PHP code example of tobento / service-read-write

1. Go to this page and download the library: Download tobento/service-read-write library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

tobento / service-read-write example snippets


use Tobento\Service\ReadWrite\Modifier;
use Tobento\Service\ReadWrite\Processor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

$reader = new Reader\CsvStream(
    // PSR-7 StreamInterface
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);

$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

$modifiers = new Modifier\Modifiers(
    new Modifier\ColumnMap(['title' => 'headline']),
);

$processor = new Processor\TimeBudgetProcessor(
    timeBudget: 10,
    modifiers: $modifiers
);

$result = $processor->process(reader: $reader, writer: $writer);

print_r($result->timeline());

use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader;

// Create a PSR-7 stream from a local file
$stream = new Psr17Factory()->createStreamFromFile('/data/input.csv');

// Initialize the CSV reader
$reader = new Reader\CsvStream(
    stream: $stream,
    delimiter: ',', // optional, defaults to ','
    enclosure: '"', // optional, defaults to '"'
    escape: '\\', // optional, defaults to '\'
);

// Read the first 5 rows starting at offset 0
foreach ($reader->read(offset: 0, limit: 5) as $row) {
    echo $row->key() . ': ' . json_encode($row->all()) . PHP_EOL;
}

if ($reader->isFinished()) {
    echo 'Reached end of CSV at offset: ' . $reader->currentOffset();
}

use Tobento\Service\ReadWrite\Reader\IterableReader;

$data = [
    ['title' => 'Hello'],
    ['title' => 'World'],
];

$reader = new IterableReader($data);

foreach ($reader->read() as $row) {
    echo $row->key() . ': ' . $row->get('title') . PHP_EOL;
}

echo 'Finished? ' . ($reader->isFinished() ? 'yes' : 'no') . PHP_EOL;
echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;

use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader\JsonStream;

$stream = new Psr17Factory()->createStreamFromFile('/data/input.json');
$reader = new JsonStream($stream);

// Read the first 3 rows
foreach ($reader->read(offset: 0, limit: 3) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo "Skipped row: " . $row->reason() . PHP_EOL;
    } else {
        print_r($row->all());
    }
}

echo 'Offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of JSON stream';
}

use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader\NdJsonStream;

$stream = new Psr17Factory()->createStreamFromFile('/data/input.ndjson');
$reader = new NdJsonStream(stream: $stream);

// Read the first 2 rows starting at offset 0
foreach ($reader->read(offset: 0, limit: 2) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo 'Skipped row: ' . $row->reason() . PHP_EOL;
    } else {
        echo json_encode($row->all()) . PHP_EOL;
    }
}

echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of NDJSON stream';
}

use Tobento\Service\ReadWrite\Reader\RepositoryReader;
use Tobento\Service\Repository\ReadRepositoryInterface;

// Create reader with a query
$reader = new RepositoryReader(
    // ReadRepositoryInterface instance used as the data source
    repository: $repository,
    
    // Filtering conditions applied before reading
    where: [],
    
    // Sorting rules
    orderBy: [],
    
    // Optional callable to convert objects into arrays
    objectToArray: function (object $entity): array {
        // Custom conversion logic for domain objects
        return [
            'id'   => $entity->id(),
            'name' => $entity->name(),
            'role' => $entity->role(),
        ];
    },
    
    // Number of rows sampled for columnsPreview()
    previewRows: 3,
);

// Read the first 2 rows
foreach ($reader->read(offset: 0, limit: 2) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo 'Skipped row: ' . $row->reason() . PHP_EOL;
    } else {
        print_r($row->all());
    }
}

echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of active users';
}

use Tobento\Service\ReadWrite\Reader\StorageReader;
use Tobento\Service\Storage\StorageInterface;

// Create reader with a query
$reader = new StorageReader(
    // StorageInterface instance used as the data source
    storage: $storage,
    
    // Table name to read from
    table: 'products',
    
    // Optional query callable applied before reading
    query: function (StorageInterface $t): void {
        $t->where('price', '>', 10)->order('price', 'asc');
    },
    
    // Number of rows sampled for columnsPreview()
    previewRows: 3,
);

// Read the first 2 rows
foreach ($reader->read(offset: 0, limit: 2) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo 'Skipped row: ' . $row->reason() . PHP_EOL;
    } else {
        print_r($row->all());
    }
}

echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of filtered products';
}

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\CsvResource;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

// Create a file resource
$resource = new LocalFile('/data/output.csv');

// Initialize the CSV writer
$writer = new CsvResource(
    resource: $resource,
    delimiter: ',', // optional
    enclosure: '"', // optional
    escape: '\\',  // optional
    writeBom: true // optional
);

// Set mode (overwrite or append)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\PdfResource;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;
use Tobento\Service\View\ViewInterface;

// Create a file resource
$resource = new LocalFile('/data/report.html');

// Initialize the HTML writer with a template and template data
$writer = new HtmlResource(
    resource: $resource,
    view: $view, // ViewInterface see view service.
    templateName: 'html/export-table',
    templateData: [
        'title' => 'Product Report',
        'generated_at' => date('Y-m-d'),
    ],
);

// Start writing
$writer->start();

// Write rows (these will be available as $rows in the template)
$writer->write(new Row(key: 1, attributes: ['name' => 'Apple', 'price' => 2.50]));
$writer->write(new Row(key: 2, attributes: ['name' => 'Banana', 'price' => 1.20]));

// Finish writing and generate the HTML
$writer->finish();

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title><?= $view->esc($title ?? 'Export') 

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\JsonResource;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

// Create a file resource
$resource = new LocalFile('/data/output.json');

// Initialize the JSON writer
$writer = new JsonResource($resource);

// Set mode (overwrite or append)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\NdJsonResource;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

// Create a file resource
$resource = new LocalFile('/data/output.ndjson');

// Initialize the NDJSON writer
$writer = new NdJsonResource($resource);

// Start writing
$writer->start();

// Write rows (each will be a separate line)
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\NullWriter;

// Initialize the NullWriter
$writer = new NullWriter();

// (Optional) Set mode if your application uses writer modes
$writer->mode(Mode::Overwrite);

// Start writing (no-op)
$writer->start();

// Write rows (no-op)
$writer->write(new Row(
    key: 1,
    attributes: ['title' => 'Hello', 'status' => 'Draft']
));

$writer->write(new Row(
    key: 2,
    attributes: ['title' => 'World', 'status' => 'Published']
));

// Finish writing (no-op)
$writer->finish();

use Tobento\Service\Pdf\Enums\Orientation;
use Tobento\Service\Pdf\Pdf;
use Tobento\Service\Pdf\PdfGenerator;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\PdfResource;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

// Create a file resource
$resource = new LocalFile('/data/report.pdf');

// Create a PDF generator: see pdf service.
$pdfGenerator = new PdfGenerator();

// Initialize the PDF writer with a template and template data
$writer = new PdfResource(
    resource: $resource,
    pdfGenerator: $pdfGenerator,
    templateName: 'pdf/export-table',
    templateData: [
        'title' => 'Product Report',
        'description' => 'Generated on ' . date('Y-m-d'),
    ],
    pdf: new Pdf()->orientation(Orientation::LANDSCAPE),
);

// Start writing
$writer->start();

// Write rows (these will be available as $rows in the template)
$writer->write(new Row(key: 1, attributes: ['name' => 'Apple', 'price' => 2.50]));
$writer->write(new Row(key: 2, attributes: ['name' => 'Banana', 'price' => 1.20]));

// Finish writing and generate the PDF
$writer->finish();

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title><?= $view->esc($title ?? 'Export') 

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\RepositoryWriter;
use Tobento\Service\Repository\WriteRepositoryInterface;

// Example repository implementing WriteRepositoryInterface
$repository = new MyRepository();

// Initialize the repository writer
$writer = new RepositoryWriter(
    repository: $repository,
    
    // Optional: specify the identifier column (defaults to 'id')
    idName: 'id',
    
    // Optional: define the columns supported by the repository
    columns: ['title', 'status', 'created_at'],
    
    // Optional: provide representative preview values for each column
    columnsPreview: [
        'title'  => 'Lorem',
        'status' => 'Draft | Pending',
    ],
);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['id' => 1, 'title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Writer\RepositoryWriter;
use Tobento\Service\Repository\WriteRepositoryInterface;

// Custom writer callback
$customWriter = function(RowInterface $row, WriteRepositoryInterface $repo): void {
    $attributes = $row->all();
    // Always create new entries, ignore id
    $repo->create($attributes);
};

$writer = new RepositoryWriter(
    repository: $repository,
    writer: $customWriter
);

$writer->start();
$writer->write(new Row(key: 1, attributes: ['id' => 99, 'title' => 'Force Create']));
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\StorageWriter;
use Tobento\Service\Storage\StorageInterface;

// Example storage implementing StorageInterface
$storage = new MyStorage();

// Initialize the storage writer
$writer = new StorageWriter(
    storage: $storage,
    
    // Optional: specify the identifier column (defaults to 'id')
    idName: 'id',
    
    // Optional: define the columns supported by the repository
    columns: ['title', 'status', 'created_at'],
    
    // Optional: provide representative preview values for each column
    columnsPreview: [
        'title'  => 'Lorem',
        'status' => 'Draft | Pending',
    ],
);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['id' => 1, 'title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Writer\StorageWriter;
use Tobento\Service\Storage\StorageInterface;

// Custom writer callback
$customWriter = function(RowInterface $row, StorageInterface $storage): void {
    $attributes = $row->all();
    // Always create new entries, ignore id
    $storage->insert($attributes);
};

$writer = new StorageWriter(
    storage: $storage,
    writer: $customWriter
);

$writer->start();
$writer->write(new Row(key: 1, attributes: ['id' => 99, 'title' => 'Force Create']));
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

// Create a file resource
$resource = new LocalFile('/data/feed.xml');

// Initialize the XML writer
$writer = new XmlResource(
    resource: $resource,
    rootElement: 'products',
    rowElement: 'product',
    rowWrapper: null, // optional
    rootAttributes: ['xmlns:g' => 'http://base.google.com/ns/1.0'], // optional
    xmlVersion: '1.0', // optional
    encoding: 'UTF-8' // optional
);

// Set mode (overwrite or finalize)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: [
    '@id' => '123',
    'title' => 'Red Shoes',
    'price' => '49.99',
    'tags' => [
        'tag' => ['fashion', 'shoes'], // repeated elements
    ],
]));

$writer->write(new Row(key: 2, attributes: [
    '@id' => '124',
    'title' => 'Blue Shirt',
    'price' => '29.99',
]));

// Finish writing
$writer->mode(Mode::Finalize);
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

$resource = new LocalFile('/data/atom.xml');

$writer = new XmlResource(
    resource: $resource,
    rootElement: 'feed',
    rowElement: 'entry',
    rowWrapper: null,
    rootAttributes: [
        'xmlns' => 'http://www.w3.org/2005/Atom',
    ]
);

$writer->mode(Mode::Overwrite);
$writer->start();

$writer->write(new Row(1, [
    'title' => 'Hello World',
    'id' => 'urn:uuid:123',
    'updated' => '2025-01-01T12:00:00Z',
    'link' => [
        '@href' => 'https://example.com/hello',
    ],
    'content' => 'This is an Atom entry.',
]));

$writer->mode(Mode::Finalize);
$writer->finish();

use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;

$resource = new LocalFile('/data/google-shopping.xml');

$writer = new XmlResource(
    resource: $resource,
    rootElement: 'rss',
    rowElement: 'item',
    rowWrapper: 'channel',
    rootAttributes: [
        'version' => '2.0',
        'xmlns:g' => 'http://base.google.com/ns/1.0',
    ]
);

$writer->mode(Mode::Overwrite);
$writer->start();

$writer->write(new Row(1, [
    'g:id' => 'SKU123',
    'g:title' => 'Red Shoes',
    'g:description' => 'Comfortable running shoes.',
    'g:link' => 'https://example.com/red-shoes',
    'g:image_link' => 'https://example.com/red-shoes.jpg',
    'g:price' => '49.99 USD',
    'g:availability' => 'in stock',
]));

$writer->mode(Mode::Finalize);
$writer->finish();

use Tobento\Service\FileStorage\NullStorage;
use Tobento\Service\FileStorage\StorageInterface;
use Tobento\Service\ReadWrite\Exception\WriterException;
use Tobento\Service\ReadWrite\Writer\Resource\FileStorage;

// Create a storage (NullStorage for demo)
$storage = new NullStorage(name: 'null');

// Initialize the FileStorage resource
$resource = new FileStorage(storage: $storage, filename: 'output.csv');

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close and commit to storage
$resource->close();

use Tobento\Service\ReadWrite\Writer\Resource\InMemory;
use Tobento\Service\ReadWrite\Exception\WriterException;

// Initialize the in-memory resource
$resource = new InMemory();

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close resource
$resource->close();

// Retrieve buffered content
echo $resource->getContent();

use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;
use Tobento\Service\ReadWrite\Exception\WriterException;

// Initialize the LocalFile resource
$resource = new LocalFile(filename: '/data/output.csv', mode: 'w');

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close resource
$resource->close();

use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

// Always apply:
new ApplyModifiersIf(true, $modifier);

// Never apply:
new ApplyModifiersIf(false, $modifier);

use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

new ApplyModifiersIf('country', $modifier);

use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

// Apply if field is missing:
new ApplyModifiersIf(
    ['field' => 'postal_code', 'missing' => true],
    $modifier
);

// Apply if field equals a specific value:
new ApplyModifiersIf(
    ['field' => 'country', 'equals' => 'CH'],
    $modifier
);

use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;
use Tobento\Service\ReadWrite\RowInterface;

new ApplyModifiersIf(
    fn(RowInterface $row): bool => $row->get('country') === 'CH',
    $modifier
);

use Tobento\Service\ReadWrite\Modifier\CallableModifier;
use Tobento\Service\ReadWrite\ReaderInterface;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\WriterInterface;

// Define a custom modifier
$modifier = new CallableModifier(function(RowInterface $row, ReaderInterface $reader, WriterInterface $writer): RowInterface {
    $attributes = $row->all();
    $attributes['title'] = strtoupper($attributes['title'] ?? '');
    
    return new Row(
        key: $row->key(),
        attributes: $attributes
    );
});

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

use Tobento\Service\ReadWrite\Modifier\ColumnMap;
use Tobento\Service\ReadWrite\Row\Row;

// Define a column mapping
$modifier = new ColumnMap([
    'title' => 'name',
    'status' => 'state',
]);

// Example row
$row = new Row(
    key: 1,
    attributes: [
        'title' => 'Hello',
        'status' => 'Draft',
        'ignored' => 'Not mapped',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// ['name' => 'Hello', 'state' => 'Draft']

use Tobento\Service\ReadWrite\Modifier\CombineFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new CombineFields(
    fields: ['first_name', 'last_name', 'age'],
    into: 'summary',
    separator: ' ',
    removeSourceFields: true
);

$row = new Row(1, [
    'first_name' => 'John',
    'last_name' => 'Doe',
    'age' => 42,
]);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['summary' => 'John Doe 42']

use Tobento\Service\ReadWrite\Modifier\Compute;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Compute(
    field: 'full_name',
    computeFn: function (RowInterface $row) {
        return trim(
            $row->get('first_name', '') . ' ' .
            $row->get('last_name', '')
        );
    }
);

$row = new Row(
    key: 1,
    attributes: [
        'first_name' => 'John',
        'last_name'  => 'Doe',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'first_name' => 'John',
//     'last_name'  => 'Doe',
//     'full_name'  => 'John Doe',
// ]

use Tobento\Service\ReadWrite\Modifier\DefaultValue;
use Tobento\Service\ReadWrite\Row\Row;

// Define default values
$modifier = new DefaultValue([
    'status' => 'Draft',
    'created_at' => date('Y-m-d'),
]);

// Example row with missing and empty attributes
$row = new Row(
    key: 1,
    attributes: [
        'title' => 'Hello',
        'status' => '',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//   'title' => 'Hello',
//   'status' => 'Draft', // replaced empty string
//   'created_at' => '2025-12-24',
// ]

use Tobento\Service\Encryption\EncrypterInterface;
use Tobento\Service\ReadWrite\Modifier\Encrypt;
use Tobento\Service\ReadWrite\Row\Row;

// $encrypter is an instance of EncrypterInterface
$modifier = new Encrypt(
    fields: ['api.key', 'api.secret'],
    encrypter: $encrypter,
);

$row = new Row(
    key: 1,
    attributes: [
        'api' => [
            'key' => 'my-api-key',
            'secret' => 'super-secret',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//   'api' => [
//     'key' => 'ENCRYPTED',
//     'secret' => 'ENCRYPTED',
//   ],
// ]

use Tobento\Service\ReadWrite\Modifier\FilterFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new FilterFields(['id', 'email']);

$row = new Row(
    key: 1,
    attributes: [
        'id' => 1,
        'email' => '[email protected]',
        'password' => 'secret',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'id' => 1,
//     'email' => '[email protected]',
// ]

use Tobento\Service\ReadWrite\Modifier\Format;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Format(
    field: 'email',
    formatter: function ($value, RowInterface $row) {
        return strtolower(trim((string)$value));
    }
);

$row = new Row(
    key: 1,
    attributes: [
        'email' => '  [email protected]  ',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'email' => '[email protected]',
// ]

use Tobento\Service\ReadWrite\Modifier\Hash;
use Tobento\Service\ReadWrite\Row\Row;

// Hash a password using PHP's password_hash
$modifier = new Hash(
    fields: 'password',
    hasher: fn($value) => password_hash($value, PASSWORD_DEFAULT),
);

$row = new Row(
    key: 1,
    attributes: ['password' => 'secret123']
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//   'password' => '$2y$10$....' // hashed value
// ]

use Tobento\Service\ReadWrite\Modifier\Lookup;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Lookup(
    field: 'category',
    into: 'category_id',
    lookup: [
        'Books' => 1,
        42 => 'Answer',
        3.14 => 'Pi',
    ],
    removeSourceField: true
);

$row = new Row(1, ['category' => 42]);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['category_id' => 'Answer']

$modifier = new Lookup(
    field: 'category',
    into: 'category_id',
    lookup: fn($value) => CategoryRepository::findIdByName($value),
    removeSourceField: true
);

$row = new Row(1, ['category' => 'Books']);
$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['category_id' => 1]

use Tobento\Service\ReadWrite\Modifier\Mask;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Mask(
    fields: ['user.email', 'user.phone']
);

$row = new Row(
    key: 1,
    attributes: [
        'user' => [
            'email' => '[email protected]',
            'phone' => '1234567890',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//   'user' => [
//     'email' => 'j***@example.com',
//     'phone' => '1********0',
//   ],
// ]

use Tobento\Service\ReadWrite\Modifier\Mask;

// Custom masking: always replace the value with six asterisks
$modifier = new Mask(
    fields: 'phone',
    masker: fn(mixed $value): string => '******',
);

use Tobento\Service\ReadWrite\Modifier\Redact;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Redact(
    fields: ['user.password', 'user.token']
);

$row = new Row(
    key: 1,
    attributes: [
        'user' => [
            'password' => 'secret123',
            'token' => 'abc123xyz',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//   'user' => [
//     'password' => null,
//     'token' => null,
//   ],
// ]

use Tobento\Service\ReadWrite\Modifier\Redact;

// Replace values with a fixed string instead of null
$modifier = new Redact(
    fields: 'api.key',
    replacement: 'REDACTED',
);

use Tobento\Service\ReadWrite\Modifier\RemoveFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new RemoveFields(['password', 'debug']);

$row = new Row(
    key: 1,
    attributes: [
        'id' => 1,
        'email' => '[email protected]',
        'password' => 'secret',
        'debug' => 'x',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'id' => 1,
//     'email' => '[email protected]',
// ]

use Tobento\Service\ReadWrite\Modifier\Replace;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Replace(
    fields: ['status'],
    replacements: [
        'N/A' => null,
        'yes' => true,
        'no'  => false,
    ],
    strict: true,
);

$row = new Row(
    key: 1,
    attributes: [
        'status' => 'yes',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'status' => true,
// ]

use Tobento\Service\ReadWrite\Modifier\Replace;

$modifier = new Replace(
    fields: 'comment',
    replacements: [
        'foo' => 'bar',
    ],
    strict: false,
);

use Tobento\Service\ReadWrite\Modifier\Replace;

$modifier = new Replace(
    fields: 'email',
    replacements: [
        null => 'unknown',
    ],
    strict: true,
    forceNullReplacement: true,
);

use Tobento\Service\ReadWrite\Modifier\Sanitize;
use Tobento\Service\Sanitizer\Sanitizer;
use Tobento\Service\ReadWrite\Row\Row;

$sanitizer = new Sanitizer();

// Define sanitation rules
$modifier = new Sanitize([
    'title' => 'strip_tags|trim',
    'published_at' => 'date:Y-m-d:d.m.Y',
], $sanitizer);

$row = new Row(
    key: 1,
    attributes: [
        'title' => '<h1>Hello</h1>   ',
        'published_at' => '24.12.2025',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//   'title' => 'Hello',
//   'published_at' => '2025-12-24', // normalized
// ]

use Tobento\Service\ReadWrite\Modifier\SkipIf;

// Skip all rows:
new SkipIf(true, 'Skipping all rows');

// Skip no rows:
new SkipIf(false);

use Tobento\Service\ReadWrite\Modifier\SkipIf;

new SkipIf('email', 'Email is 

use Tobento\Service\ReadWrite\Modifier\SkipIf;

// Skip if field is missing or empty:
new SkipIf(
    ['field' => 'username', 'missing' => true],
    'Username is missing'
);

// Skip if field equals a specific value:
new SkipIf(
    ['field' => 'status', 'equals' => 'N/A'],
    'Status is N/A'
);

// Skip if field equals zero:
new SkipIf(
    ['field' => 'age', 'equals' => 0],
    'Age cannot be zero'
);

use Tobento\Service\ReadWrite\Modifier\SkipIf;
use Tobento\Service\ReadWrite\RowInterface;

new SkipIf(
    fn(RowInterface $row): bool => $row->get('age') < 18,
    'User is under 18'
);

use Tobento\Service\ReadWrite\Modifier\Split;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Split(
    field: 'full_name',
    into: ['first_name', 'last_name'],
    separator: ' ',
    removeSourceField: true
);

$row = new Row(1, ['full_name' => 'John Doe']);
$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['first_name' => 'John', 'last_name' => 'Doe']

use Tobento\Service\ReadWrite\Modifier\Trim;
use Tobento\Service\ReadWrite\Row\Row;

// Define which attributes to trim
$modifier = new Trim(['title', 'status']);

// Example row
$row = new Row(
    key: 1,
    attributes: [
        'title' => '  Hello World  ',
        'status' => ' Draft ',
        'description' => '   untouched   ',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//   'title' => 'Hello World',
//   'status' => 'Draft',
//   'description' => '   untouched   ', // unchanged
// ]

use Tobento\Service\ReadWrite\Modifier\Unique;
use Tobento\Service\ReadWrite\Row\Row;

// Ensure email is unique across the import
$modifier = new Unique(
    fields: 'email',
    lookup: null,      // use in-memory uniqueness
    onFail: 'skip',    // or 'fail'
);

$row1 = new Row(
    key: 1,
    attributes: ['email' => '[email protected]']
);

$row2 = new Row(
    key: 2,
    attributes: ['email' => '[email protected]']
);

// First row passes
$modifier->modify($row1, $reader, $writer);

// Second row is skipped:
$modified = $modifier->modify($row2, $reader, $writer);

// Result:
// SkipRow {
//   key: 2,
//   attributes: ['email' => '[email protected]'],
//   reason: 'Duplicate value for unique field "email": [email protected]'
// }

use Tobento\Service\ReadWrite\Modifier\Unique;
use Tobento\Service\ReadWrite\RowInterface;

$modifier = new Unique(
    fields: 'sku',
    lookup: fn(string $field, mixed $value, RowInterface $row): bool =>
        $productRepository->skuExists($value),
    onFail: 'fail'
);

use Tobento\Service\ReadWrite\Modifier\Validation;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\Validation\Validator;

$validator = new Validator();

// Define validation rules
$modifier = new Validation(
    rules: [
        'email' => ',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Result when validation fails and onFail = 'skip':
// SkipRow {
//   key: 1,
//   attributes: [
//     'email' => 'not-an-email',
//     'age'   => '17',
//     'name'  => 'John Doe',
//   ],
//   reason: 'Validation failed: [error] The email must be a valid email address (email) [error] Must be at least 18 (age)'
// }

// If onFail = 'fail', a ModifyErrorsException is thrown instead.

use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Modifier\Modifiers;
use Tobento\Service\ReadWrite\Processor\Processor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

// Reader and writer
$reader = new Reader\CsvStream(
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);
$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

// Create modifiers (empty for this example)
$modifiers = new Modifiers();

// Create processor
$processor = new Processor(
    modifiers: $modifiers,
    resultHandler: null,
);

// Process rows starting from the reader's current offset
$result = $processor->process(
    reader: $reader,
    writer: $writer,
    offset: $reader->currentOffset(),
    limit: null,
);

// Inspect result
echo $result->successfulRows(); // e.g. 42

use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Modifier\Modifiers;
use Tobento\Service\ReadWrite\Processor\TimeBudgetProcessor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

// Reader and writer
$reader = new Reader\CsvStream(
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);
$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

// Create modifiers (empty for this example)
$modifiers = new Modifiers();

// Create processor with a 20-second time budget
$processor = new TimeBudgetProcessor(
    timeBudget: 20,
    modifiers: $modifiers,
    resultHandler: null,
);

// Process rows starting from the reader's current offset
$result = $processor->process(
    reader: $reader,
    writer: $writer,
    offset: $reader->currentOffset(),
    limit: null,
);

// Inspect result
echo $result->successfulRows();

use Psr\Log\LoggerInterface;
use Tobento\Service\ReadWrite\ResultHandlerInterface;
use Tobento\Service\ReadWrite\ResultInterface;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Row\SkippableInterface;
use Throwable;

class LoggingResultHandler implements ResultHandlerInterface
{
    public function __construct(
        private LoggerInterface $logger,
    ) {}

    public function handleRowSuccess(RowInterface $row): void
    {
        $this->logger->info('Row processed successfully', [
            'row' => $row,
        ]);
    }

    public function handleRowSkip(SkippableInterface $row): void
    {
        $this->logger->notice('Row skipped', [
            'row' => $row,
        ]);
    }

    public function handleRowFailure(RowInterface $row, Throwable $exception): void
    {
        $this->logger->error('Row processing failed', [
            'row'       => $row,
            'exception' => $exception,
        ]);
    }

    public function handleResult(ResultInterface $result): void
    {
        $this->logger->info('Processing finished', [
            'successful' => $result->successfulRows(),
            'failed'     => $result->failedRows(),
            'skipped'    => $result->skippedRows(),
            'runtime'    => $result->runtimeInSeconds(),
        ]);
    }
}

$result = $processor->process($reader, $writer);

echo $result->successfulRows();   // e.g. 42
echo $result->failedRows();       // e.g. 3
echo $result->runtimeInSeconds(); // e.g. 1.52

[
    'started_at' => '2025-01-01T12:00:00+00:00',
    'finished_at' => '2025-01-01T12:00:01+00:00',
    'runtime_seconds' => 1.0,
    'rows' => [
        'successful' => 42,
        'failed' => 3,
        'skipped' => 1,
        'total' => 46,
    ],
]

use Psr\EventDispatcher\EventDispatcherInterface;
use Tobento\Service\ReadWrite\Event;
use Tobento\Service\ReadWrite\ProcessorInterface;
use Tobento\Service\ReadWrite\Result;

class ImportRunner
{
    public function __construct(
        private ProcessorInterface $processor,
        private EventDispatcherInterface $events,
    ) {}

    public function run($reader, $writer): void
    {
        // Dispatch: processing started
        $this->events->dispatch(new Event\ProcessStarted(
            result: new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $this->processor->modifiers(),
            ),
        ));

        try {
            // Run the processor
            $result = $this->processor->process(
                reader: $reader,
                writer: $writer,
                offset: $reader->currentOffset()
            );

            // Dispatch: partial progress (optional)
            $this->events->dispatch(new Event\PartialProcess($result));

            // Dispatch: processing completed
            if ($reader->isFinished()) {
                $this->events->dispatch(new Event\ProcessCompleted($result));
            }

        } catch (\Throwable $e) {

            // Build a minimal failure result snapshot
            $failureResult = new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $this->processor->modifiers(),
                meta: ['failed' => true],
            );

            // Dispatch: processing failed
            $this->events->dispatch(new Event\ProcessFailed(
                result: $failureResult,
                exception: $e,
            ));

            throw $e;
        }
    }
}


use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Tobento\Service\ReadWrite\Event;
use Tobento\Service\ReadWrite\Processor\TimeBudgetProcessor;
use Tobento\Service\ReadWrite\RegistriesInterface;
use Tobento\Service\ReadWrite\Result;
use Tobento\Service\ReadWrite\ResultHandlerInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;

class TimeBudgetJobHandler implements JobHandlerInterface
{
    public function __construct(
        protected ContainerInterface $container,
        protected RegistriesInterface $registries,
        protected JobRepositoryInterface $jobRepository,
        protected QueuesInterface $queues,
        protected null|ResultHandlerInterface $resultHandler = null,
        protected null|EventDispatcherInterface $eventDispatcher = null,
    ) {}

    public function handleJob(JobInterface $job): void
    {
        $id = (int)($job->getPayload()['job_id'] ?? 0);

        if (is_null($jobEntity = $this->jobRepository->findById($id))) {
            // Optional: dispatch job-not-found event
            return;
        }
        
        // Resolve reader
        $readerRegistry = $this->registries->get($jobEntity->readerId());
        if (is_null($readerRegistry)) {
            return;
        }
        $reader = $readerRegistry->createReader($this->container, $jobEntity);

        // Resolve writer + modifiers
        $writerRegistry = $this->registries->get($jobEntity->writerId());
        if (is_null($writerRegistry)) {
            return;
        }
        $writer = $writerRegistry->createWriter($this->container, $jobEntity);
        $modifiers = $writerRegistry->createModifiers($this->container, $jobEntity);

        // Job data (offset)
        $data = $job->parameters()->get(Parameter\Data::class)
            ?? new Parameter\Data(['offset' => 0]);

        if (! $job->parameters()->has(Parameter\Data::class)) {
            $job->parameters()->add($data);
        }

        $offset = $data->get('offset', 0);

        // Optional: dispatch start event
        $this->eventDispatcher?->dispatch(new Event\ProcessStarted(
            result: new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $modifiers,
            ),
        ));

        try {
            // Run processor with time budget
            $processor = new TimeBudgetProcessor(
                timeBudget: $job->getPayload()['timeBudget'] ?? 20,
                modifiers: $modifiers,
                resultHandler: $this->resultHandler,
            );

            $result = $processor->process(
                reader: $reader,
                writer: $writer,
                offset: $offset,
            );

            $this->eventDispatcher?->dispatch(new Event\PartialProcess($result));

            // Requeue if not finished
            if (! $reader->isFinished()) {
                $data->set('offset', $reader->currentOffset());
                $job->parameters()->add($data);

                $this->queues
                    ->queue($job->parameters()->get(Parameter\Queue::class)->name())
                    ->push($job);

                return;
            }

            // Completed
            $this->eventDispatcher?->dispatch(new Event\ProcessCompleted($result));

        } catch (\Throwable $e) {

            $failureResult = new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $modifiers,
                meta: ['failed' => true],
            );

            $this->eventDispatcher?->dispatch(new Event\ProcessFailed(
                result: $failureResult,
                exception: $e,
            ));

            throw $e;
        }
    }
}