1. Go to this page and download the library: Download arekxv/php-data-streamer library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// arekxv/php-data-streamer example snippets
use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\CallableHandler;
use ArekX\DataStreamer\Data\CallableParser;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamReader;
// --- Stream reader example -------------------------------------------------

// Redis connection used by the reader.
// Use \ArekX\DataStreamer\Drivers\PredisDriver instead when the Predis package is installed.
$connection = [
    'host' => '127.0.0.1',
];
$driver = new RedisDriver();
$driver->connect($connection);

// The parser turns the raw (id, type, payload) triple into a message instance.
// A single default builder is registered here; setBuilder can register one per type.
$parser = new CallableParser();
$parser->setDefaultBuilder(function (string $id, string $type, array $payload) {
    return ArrayMessage::create($type, $payload, $id);
});

// The handler receives every parsed message. This is the catch-all default;
// custom handlers can be registered per Message::getType().
$handler = new CallableHandler();
$handler->setDefaultHandler(function (ArrayMessage $message) {
    $messageId = $message->getId();
    $encodedPayload = json_encode($message->getPayload());

    echo "{$messageId}: {$encodedPayload}" . PHP_EOL;

    // Returning true reports the message as handled successfully.
    return true;
});

// Stream configuration: which stream to read and under which consumer identity.
$streamConfig = [
    'stream' => 'data-stream',
    'consumerGroup' => 'my-consumer-group',
    'consumerName' => 'my-consumer-consumer',
];
$settings = new Settings($streamConfig);

// Wire everything together into the reader.
$reader = new StreamReader($driver, $parser, $handler, $settings);

// Announce start-up, then hand control to the reader's infinite processing loop.
echo "Listening..." . PHP_EOL;
$reader->runLoop();
use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\PayloadMessageConverter;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamWriter;
// --- Stream writer example -------------------------------------------------

// Redis connection used by the writer.
// Use \ArekX\DataStreamer\Drivers\PredisDriver instead when the Predis package is installed.
$connection = [
    'host' => '127.0.0.1',
];
$driver = new RedisDriver();
$driver->connect($connection);

// Stream configuration: only the target stream name is set for writing here.
$streamConfig = [
    'stream' => 'data-stream',
];
$settings = new Settings($streamConfig);

// The converter turns a message object into an array
// suitable for sending across the data stream.
$converter = new PayloadMessageConverter();

// Build the writer from the driver, the settings and the converter.
$writer = new StreamWriter($driver, $settings, $converter);

// Compose a sample message of type "test-type" and push it onto the stream.
$payload = [
    'key' => 'value',
    'key2' => 'value2',
];
$message = ArrayMessage::create('test-type', $payload);
$writer->write($message);
Loading, please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.