PHP code example of code-rhapsodie / dataflow-bundle

1. Go to the download page and download the code-rhapsodie/dataflow-bundle library, choosing the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Pull in the Composer autoloader shipped with the downloaded archive.
require_once 'vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

code-rhapsodie / dataflow-bundle example snippets



// ...
// Wrap a \Port stream-merge writer in the bundle's adapter, then register the adapter.
$streamWriter = new \Port\Writer\StreamMergeWriter();
$portWriterAdapter = new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter);

$builder->addWriter($portWriterAdapter);
// ...



// Bundle map: the dataflow bundle is switched on for the 'all' key.
return [
    // ...
    CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
    // ...
];


// app/AppKernel.php

/**
 * Builds the list of bundles to register, including the dataflow bundle.
 *
 * @return array The bundle instances.
 */
public function registerBundles()
{
    $bundles = [
        // ...
        new CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle(),
        // ...
    ];

    // Without this return the method yields null and the list built above is discarded.
    return $bundles;
}


namespace CodeRhapsodie\DataflowExemple\DataflowType;

use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\DataflowExemple\Reader\FileReader;
use CodeRhapsodie\DataflowExemple\Writer\FileWriter;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Example dataflow type: reads items from the 'from-file' option through the
 * injected reader and sends them to the injected writer, which targets 'to-file'.
 */
class MyFirstDataflowType extends AbstractDataflowType
{
    /** @var FileReader */
    private $myReader;

    /** @var FileWriter */
    private $myWriter;

    /**
     * @param FileReader $myReader Source of the items.
     * @param FileWriter $myWriter Destination of the items.
     */
    public function __construct(FileReader $myReader, FileWriter $myWriter)
    {
        $this->myReader = $myReader;
        $this->myWriter = $myWriter;
    }

    /**
     * Wires reader, step and writer into the builder.
     *
     * @param DataflowBuilder $builder Builder receiving the reader, the step and the writer.
     * @param array           $options Options; 'from-file' and 'to-file' are read here.
     */
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // The writer must know its destination before it is handed to the builder.
        $this->myWriter->setDestinationFilePath($options['to-file']);

        $builder
            ->setReader($this->myReader->read($options['from-file']))
            ->addStep(function ($data) use ($options) {
                // TODO : Write your code here...
                return $data;
            })
            ->addWriter($this->myWriter)
        ;
    }

    /**
     * Declares the options accepted by this dataflow type.
     *
     * 'to-file' falls back to /tmp/dataflow.csv; 'from-file' has to be supplied.
     */
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        // No default for 'from-file': a required option that has a default is
        // always considered set, so the requirement would never be enforced and
        // a null file name would reach FileReader::read(string).
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv']);
        $optionsResolver->setRequired('from-file');
    }

    /**
     * @return string Label of this dataflow type.
     */
    public function getLabel(): string
    {
        return 'My First Dataflow';
    }

    /**
     * @return iterable Aliases of this dataflow type.
     */
    public function getAliases(): iterable
    {
        return ['mfd'];
    }
}



// ...
use Symfony\Component\OptionsResolver\OptionsResolver;

class MyFirstDataflowType extends AbstractDataflowType
{
    // ...

    /**
     * Declares the options accepted by this dataflow type.
     *
     * 'to-file' falls back to /tmp/dataflow.csv; 'from-file' has to be supplied.
     */
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        // No default for 'from-file': a required option that has a default is
        // always considered set, so the requirement would never be enforced.
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv']);
        $optionsResolver->setRequired('from-file');
    }
}


// ...
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Excerpt of a dataflow type that passes its logger on to its writer.
 */
class MyDataflowType extends AbstractDataflowType
{
    // ...

    /**
     * Hands this type's logger to the writer while the dataflow is being built.
     */
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $logger = $this->logger;
        $this->myWriter->setLogger($logger);
    }
}



namespace CodeRhapsodie\DataflowExemple\Reader;

/**
 * Reads a pipe-delimited text file one line at a time.
 */
class FileReader
{
    /**
     * Yields each line of the file as the list of fields obtained by splitting on "|".
     *
     * This is a generator: the checks below and the fopen() call only run once
     * iteration starts.
     *
     * @param string $filename Path of the file to read.
     *
     * @return iterable One array of fields per line; each line is trimmed before splitting.
     *
     * @throws \Exception When the file name is empty or the file cannot be opened.
     */
    public function read(string $filename): iterable
    {
        // Compare with '' explicitly: a truthiness test would also reject a file named "0".
        if ('' === $filename) {
            throw new \Exception("The file name is not defined. Pass it to the 'read' method.");
        }

        $fh = fopen($filename, 'r');
        if (false === $fh) {
            throw new \Exception("Unable to open file '".$filename."' for read.");
        }

        try {
            while (false !== ($line = fgets($fh))) {
                yield explode('|', trim($line));
            }
        } finally {
            // Release the handle whether the loop finished, failed, or was abandoned early.
            fclose($fh);
        }
    }
}

// The reader object is invoked as a callable and its result becomes the dataflow's reader.
$builder->setReader(($this->myReader)());


//[...]
// Step 1: titles are upper-cased before export.
$builder->addStep(function ($row) {
    $row['title'] = strtoupper($row['title']);

    return $row;
});

// Step 2: asynchronous step, registered with a scale factor of 2.
$builder->addStep(function ($row): \Generator {
    // Asynchronous processing lasting one second.
    yield new \Amp\Delayed(1000);

    // Titles are lower-cased here.
    $row['title'] = strtolower($row['title']);

    return $row;
}, 2);

// Step 3: private items are not exported, so false is returned instead of the item.
$builder->addStep(function ($row) {
    return $row['private'] ? false : $row;
});
//[...]

// Adapt a \Port file writer, then register the adapter as a writer of the dataflow.
$portFileWriter = new \Port\FileWriter();
$builder->addWriter(new PortWriterAdapter($portFileWriter));


namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Writer that stores every item as one CSV line in a destination file.
 */
class FileWriter implements WriterInterface
{
    /** @var resource|null Handle opened by prepare() and released by finish(). */
    private $fh;

    /** @var string|null */
    private $path;

    /**
     * Sets the file that prepare() will open.
     *
     * @param string $path Destination file path.
     */
    public function setDestinationFilePath(string $path)
    {
        $this->path = $path;
    }

    /**
     * Opens the destination file in write mode.
     *
     * @throws \Exception When no destination was set or the file cannot be opened.
     */
    public function prepare()
    {
        if (null === $this->path) {
            throw new \Exception('Define the destination file name before use');
        }

        $handle = fopen($this->path, 'w');
        if (!$handle) {
            throw new \Exception('Unable to open in write mode the output file.');
        }

        $this->fh = $handle;
    }

    /**
     * Writes one item as a CSV line.
     *
     * @param array $item Fields of the line, passed as-is to fputcsv().
     */
    public function write($item)
    {
        fputcsv($this->fh, $item);
    }

    /**
     * Closes the destination file if prepare() opened it.
     */
    public function finish()
    {
        // Guarded so that finishing a writer that was never prepared does not fail in fclose().
        if (is_resource($this->fh)) {
            fclose($this->fh);
        }

        $this->fh = null;
    }
}

// Wrap the single-item writer in a CollectionWriter, then register the wrapper.
$collectionWriter = new CollectionWriter($mySingleItemWriter);
$builder->addWriter($collectionWriter);


namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Delegate writer that accepts the items whose first element is the string 'product'.
 */
class ProductWriter implements DelegateWriterInterface
{
    /**
     * Tells whether this writer handles the given item.
     *
     * @param mixed $item Candidate item.
     *
     * @return bool True only for an array whose first element is 'product'.
     */
    public function supports($item): bool
    {
        // reset() only accepts arrays; anything else is simply not supported.
        return is_array($item) && 'product' === reset($item);
    }

    /**
     * Nothing to set up for this example.
     */
    public function prepare()
    {
    }

    /**
     * @param mixed $item Item to write.
     */
    public function write($item)
    {
        // Process your product
    }

    /**
     * Nothing to tear down for this example.
     */
    public function finish()
    {
    }
}


namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Delegate writer that accepts the items whose first element is the string 'order'.
 */
class OrderWriter implements DelegateWriterInterface
{
    /**
     * Tells whether this writer handles the given item.
     *
     * @param mixed $item Candidate item.
     *
     * @return bool True only for an array whose first element is 'order'.
     */
    public function supports($item): bool
    {
        // reset() only accepts arrays; anything else is simply not supported.
        return is_array($item) && 'order' === reset($item);
    }

    /**
     * Nothing to set up for this example.
     */
    public function prepare()
    {
    }

    /**
     * @param mixed $item Item to write.
     */
    public function write($item)
    {
        // Process your order
    }

    /**
     * Nothing to tear down for this example.
     */
    public function finish()
    {
    }
}

    /**
     * Registers a single delegating writer that holds the product and order writers.
     */
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // Snip add reader and steps

        $delegatorWriter = new DelegatorWriter();

        // Delegates are added in this order: products first, then orders.
        foreach ([new ProductWriter(), new OrderWriter()] as $delegate) {
            $delegatorWriter->addDelegate($delegate);
        }

        $builder->addWriter($delegatorWriter);
    }