1. Go to this page and download the library: Download martinhej/robocloud library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
martinhej / robocloud example snippets
use robocloud\Event\KinesisProducerErrorConsoleLogger;
use robocloud\Kinesis\Client\Producer;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;
// Define the Kinesis stream name.
$stream_name = 'robocloud';
// Get an event dispatcher instance.
$event_dispatcher = new EventDispatcher();
// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));
// Add error processors. A few simple ones that passivly log errors
// are available in robocloud/Event. To provide more robust error
// processing like requeuing failed messages you need to provide
// your own.
$event_dispatcher->addSubscriber(new KinesisProducerErrorConsoleLogger());
// Now get the message facory that will create and validate messages
// for you.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);
// Use the message factory to set the message data.
$message_factory->setMessageData([
'version' => 'v_0_1',
'roboId' => 'lost',
'purpose' => 'buddy.find',
'data' => [
'reason' => 'line_follower.line.lost',
],
]);
// Create the actual message that will be sent to Kinesis.
// This will throw exception if message data validation
// fails or if schema files could not be found.
$message = $message_factory->createMessage();
// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');
// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();
// Create the Producer instance.
$producer = new Producer(
$kinesis_factory->getKinesisClient('AKIAJG2QTSBDKBFNACDA', 'Pg2c2AzMfY/5koj6b0IO3GgOvgF/m5nUDayjBOh/'),
$stream_name,
$message_factory,
$event_dispatcher,
$cache
);
// Add the message and push it to the stream.
$producer->add($message);
var_dump(array_map(function($result) {
return (string) $result;
}, $producer->pushAll()));
// Define the Kinesis stream name.
use robocloud\Event\KinesisConsumerErrorConsoleLogger;
use robocloud\Kinesis\Client\Consumer;
use robocloud\Kinesis\Client\ConsumerRecovery;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use robocloud\MessageProcessing\Backend\KeepInMemoryBackend;
use robocloud\MessageProcessing\Filter\KeepAllFilter;
use robocloud\MessageProcessing\Processor\DefaultProcessor;
use robocloud\MessageProcessing\Transformer\KeepOriginalTransformer;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;
// Define the Kinesis stream name.
$stream_name = 'robocloud';
// Create event dispatcher instance.
$event_dispatcher = new EventDispatcher();
// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));
// Add error handler(s).
$event_dispatcher->addSubscriber(new KinesisConsumerErrorConsoleLogger());
// Create filter instance that will be used to filter out only those messages
// that you are interested in.
$filter = new KeepAllFilter();
// The transformer layer is responsible for extracting and processing the
// message data into a form that is expected by your backend.
$keep_original_transformer = new KeepOriginalTransformer();
// Finally provide your backend that will finish the message processing.
$keep_in_memory_backend = new KeepInMemoryBackend();
// Add the message processor as the subscriber that will be used
// during consuming to process the messages.
$event_dispatcher->addSubscriber(new DefaultProcessor($filter, $keep_original_transformer, $keep_in_memory_backend));
// Get the message factory that will be used for creating the message objects
// from the data pulled from Kinesis.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);
// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');
// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();
// Provide the recovery object used to store last read position.
$consumer_recovery = new ConsumerRecovery($stream_name, 'Shard-000001', '/tmp/consumer_recovery.rec');
// Instantiate the consumer and consume messages from Kinesis stream.
$consumer = new Consumer(
$kinesis_factory->getKinesisClient('AKIAINK5P33X2KBK2RAQ', 'EuUdvE7WW0SKaEpGWMWHvN5M+gIjGaoLAVTYzzhV'),
$message_factory,
$event_dispatcher,
$cache,
$consumer_recovery
);
// One process
$consumer->consume(0);
// Print the messages to see what we pulled from the stream.
var_dump($keep_in_memory_backend->flush());
// Note that this example is very trivial not providing any real functionality.
// To get better idea on how to use message processor see other filter,
// transformer and backend classes in the MessageProcessing namespace.
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.