PHP code example of code-rhapsodie / dataflow-bundle
1. Go to this page and download the library: Download the code-rhapsodie/dataflow-bundle library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
code-rhapsodie / dataflow-bundle example snippets
// ...
// Wrap a Port stream-merge writer in the bundle's adapter, then register the adapter as a writer.
$streamWriter = new \Port\Writer\StreamMergeWriter();
$portAdapter = new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter);
$builder->addWriter($portAdapter);
// ...
namespace CodeRhapsodie\DataflowExemple\DataflowType;

use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\DataflowExemple\Reader\FileReader;
use CodeRhapsodie\DataflowExemple\Writer\FileWriter;
// Needed by configureOptions(); without it the type hint resolves inside this namespace.
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Example dataflow type: reads rows from the 'from-file' option with a FileReader,
 * passes them through one step and hands them to a FileWriter targeting 'to-file'.
 */
class MyFirstDataflowType extends AbstractDataflowType
{
    private $myReader;

    private $myWriter;

    /**
     * @param FileReader $myReader Source of the rows to process.
     * @param FileWriter $myWriter Destination of the processed rows.
     */
    public function __construct(FileReader $myReader, FileWriter $myWriter)
    {
        $this->myReader = $myReader;
        $this->myWriter = $myWriter;
    }

    /**
     * Wires the reader, the processing step and the writer into the builder.
     *
     * @param DataflowBuilder $builder Builder receiving the reader, step and writer.
     * @param array           $options Options; 'from-file' and 'to-file' are read here.
     */
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // The writer must know its destination before it is registered.
        $this->myWriter->setDestinationFilePath($options['to-file']);

        $builder
            ->setReader($this->myReader->read($options['from-file']))
            ->addStep(function ($data) use ($options) {
                // TODO : Write your code here...
                return $data;
            })
            ->addWriter($this->myWriter)
        ;
    }

    /**
     * Declares the options of this dataflow.
     *
     * 'from-file' deliberately has no default: giving it one (even null) would
     * satisfy setRequired() and let a null file name reach the reader.
     */
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv']);
        $optionsResolver->setRequired('from-file');
    }

    public function getLabel(): string
    {
        return 'My First Dataflow';
    }

    public function getAliases(): iterable
    {
        return ['mfd'];
    }
}
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;
class MyFirstDataflowType extends AbstractDataflowType
{
    // ...

    /**
     * Declares the options of this dataflow.
     *
     * 'to-file' falls back to /tmp/dataflow.csv. 'from-file' is required and
     * deliberately has no default: a default (even null) would satisfy
     * setRequired() and the missing option would never be reported.
     */
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv']);
        $optionsResolver->setRequired('from-file');
    }
}
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;
class MyDataflowType extends AbstractDataflowType
{
    // ...

    /**
     * Passes this dataflow type's logger on to its writer.
     */
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $writer = $this->myWriter;
        $writer->setLogger($this->logger);
    }
}
namespace CodeRhapsodie\DataflowExemple\Reader;
/**
 * Reads a text file line by line and splits each line on the '|' character.
 */
class FileReader
{
    /**
     * Lazily yields every line of the file as a list of '|'-separated fields.
     *
     * This is a generator: the checks below only run once iteration starts.
     *
     * @param string $filename Path of the file to read.
     *
     * @return iterable<int, string[]> One array of trimmed-line fields per line.
     *
     * @throws \Exception When the file name is empty or the file cannot be opened.
     */
    public function read(string $filename): iterable
    {
        // Compare with '' explicitly: a loose falsy check would also reject a file named "0".
        if ('' === $filename) {
            throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
        }

        $fh = fopen($filename, 'r');
        if (false === $fh) {
            throw new \Exception("Unable to open file '".$filename."' for read.");
        }

        try {
            while (false !== ($read = fgets($fh))) {
                yield explode('|', trim($read));
            }
        } finally {
            // Release the handle explicitly instead of relying on garbage collection.
            fclose($fh);
        }
    }
}
// Invoke the reader as a callable and use what it returns as the data source
$builder->setReader(($this->myReader)());
//[...]
$builder->addStep(function ($item) {
    // Titles are changed to all caps before export
    $item['title'] = strtoupper($item['title']);

    return $item;
});
// asynchronous step with 2 scale factor
$builder->addStep(function ($item): \Generator {
    yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long

    // Titles are changed to lowercase before export
    $item['title'] = strtolower($item['title']);

    return $item;
}, 2);
$builder->addStep(function ($item) {
    // Private items are not exported
    if ($item['private']) {
        return false;
    }

    return $item;
});
//[...]
namespace CodeRhapsodie\DataflowExemple\Writer;

// Namespace casing matches the bundle ("DataflowBundle") so case-sensitive autoloading finds it.
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Writes every item as one CSV row into a destination file.
 *
 * Expected call order: setDestinationFilePath(), prepare(), write() per item, finish().
 */
class FileWriter implements WriterInterface
{
    /** @var resource|null Handle on the destination file; null while no file is open */
    private $fh;

    /** @var string|null */
    private $path;

    /**
     * @param string $path Path of the file that prepare() will open for writing.
     */
    public function setDestinationFilePath(string $path) {
        $this->path = $path;
    }

    /**
     * Opens the destination file in write mode.
     *
     * @throws \Exception When no destination path was set or the file cannot be opened.
     */
    public function prepare()
    {
        if (null === $this->path) {
            throw new \Exception('Define the destination file name before use');
        }

        $handle = fopen($this->path, 'w');
        if (false === $handle) {
            throw new \Exception('Unable to open in write mode the output file.');
        }

        // Only keep a handle that is really open, so finish() can tell the two states apart.
        $this->fh = $handle;
    }

    /**
     * Appends one item to the file as a CSV row.
     *
     * @param array $item Fields of the row.
     *
     * @throws \Exception When the file is not open or the row cannot be written.
     */
    public function write($item)
    {
        if (!is_resource($this->fh)) {
            throw new \Exception('The output file is not open. Call prepare() before write().');
        }

        if (false === fputcsv($this->fh, $item)) {
            throw new \Exception('Unable to write to the output file.');
        }
    }

    /**
     * Closes the destination file. Does nothing when no file is open.
     */
    public function finish()
    {
        if (is_resource($this->fh)) {
            fclose($this->fh);
        }

        $this->fh = null;
    }
}
namespace CodeRhapsodie\DataflowExemple\Writer;

// The class implements DelegateWriterInterface, so that is the name that must be imported.
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Delegate writer that accepts items whose first element is the string 'product'.
 */
class ProductWriter implements DelegateWriterInterface
{
    /**
     * Tells whether this writer handles the given item.
     *
     * @param mixed $item Item to inspect.
     *
     * @return bool True when $item is an array whose first element is 'product'.
     */
    public function supports($item): bool
    {
        // reset() only accepts arrays; anything else is simply not supported.
        return \is_array($item) && 'product' === reset($item);
    }

    public function prepare()
    {
    }

    public function write($item)
    {
        // Process your product
    }

    public function finish()
    {
    }
}
namespace CodeRhapsodie\DataflowExemple\Writer;

// The class implements DelegateWriterInterface, so that is the name that must be imported.
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

/**
 * Delegate writer that accepts items whose first element is the string 'order'.
 */
class OrderWriter implements DelegateWriterInterface
{
    /**
     * Tells whether this writer handles the given item.
     *
     * @param mixed $item Item to inspect.
     *
     * @return bool True when $item is an array whose first element is 'order'.
     */
    public function supports($item): bool
    {
        // reset() only accepts arrays; anything else is simply not supported.
        return \is_array($item) && 'order' === reset($item);
    }

    public function prepare()
    {
    }

    public function write($item)
    {
        // Process your order
    }

    public function finish()
    {
    }
}
/**
 * Registers a single DelegatorWriter that has a ProductWriter and an
 * OrderWriter, added in that order, as its delegates.
 */
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
    // Snip add reader and steps
    $dispatcher = new DelegatorWriter();

    foreach ([new ProductWriter(), new OrderWriter()] as $delegate) {
        $dispatcher->addDelegate($delegate);
    }

    $builder->addWriter($dispatcher);
}
Loading please wait ...
Before you can download the PHP files, the dependencies must be resolved. This can take a few minutes. Please be patient.