PHP code example of martinhej / robocloud

1. Go to this page and download the library: Download martinhej/robocloud library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

martinhej / robocloud example snippets



use robocloud\Event\KinesisProducerErrorConsoleLogger;
use robocloud\Kinesis\Client\Producer;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;

// Define the Kinesis stream name.
$stream_name = 'robocloud';

// Get an event dispatcher instance.
$event_dispatcher = new EventDispatcher();

// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));

// Add error processors. A few simple ones that passivly log errors
// are available in robocloud/Event. To provide more robust error
// processing like requeuing failed messages you need to provide
// your own.
$event_dispatcher->addSubscriber(new KinesisProducerErrorConsoleLogger());

// Now get the message facory that will create and validate messages
// for you.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);

// Use the message factory to set the message data.
$message_factory->setMessageData([
    'version' => 'v_0_1',
    'roboId' => 'lost',
    'purpose' => 'buddy.find',
    'data' => [
        'reason' => 'line_follower.line.lost',
    ],
]);

// Create the actual message that will be sent to Kinesis.
// This will throw exception if message data validation
// fails or if schema files could not be found.
$message = $message_factory->createMessage();

// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');

// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();

// Create the Producer instance.
$producer = new Producer(
    $kinesis_factory->getKinesisClient('AKIAJG2QTSBDKBFNACDA', 'Pg2c2AzMfY/5koj6b0IO3GgOvgF/m5nUDayjBOh/'),
    $stream_name,
    $message_factory,
    $event_dispatcher,
    $cache
);

// Add the message and push it to the stream.
$producer->add($message);
var_dump(array_map(function($result) {
    return (string) $result;
}, $producer->pushAll()));



// Define the Kinesis stream name.
use robocloud\Event\KinesisConsumerErrorConsoleLogger;
use robocloud\Kinesis\Client\Consumer;
use robocloud\Kinesis\Client\ConsumerRecovery;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use robocloud\MessageProcessing\Backend\KeepInMemoryBackend;
use robocloud\MessageProcessing\Filter\KeepAllFilter;
use robocloud\MessageProcessing\Processor\DefaultProcessor;
use robocloud\MessageProcessing\Transformer\KeepOriginalTransformer;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;

// Define the Kinesis stream name.
$stream_name = 'robocloud';

// Create event dispatcher instance.
$event_dispatcher = new EventDispatcher();

// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));

// Add error handler(s).
$event_dispatcher->addSubscriber(new KinesisConsumerErrorConsoleLogger());

// Create filter instance that will be used to filter out only those messages
// that you are interested in.
$filter = new KeepAllFilter();
// The transformer layer is responsible for extracting and processing the
// message data into a form that is expected by your backend.
$keep_original_transformer = new KeepOriginalTransformer();
// Finally provide your backend that will finish the message processing.
$keep_in_memory_backend = new KeepInMemoryBackend();

// Add the message processor as the subscriber that will be used
// during consuming to process the messages.
$event_dispatcher->addSubscriber(new DefaultProcessor($filter, $keep_original_transformer, $keep_in_memory_backend));

// Get the message factory that will be used for creating the message objects
// from the data pulled from Kinesis.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);

// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');

// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();

// Provide the recovery object used to store last read position.
$consumer_recovery = new ConsumerRecovery($stream_name, 'Shard-000001', '/tmp/consumer_recovery.rec');

// Instantiate the consumer and consume messages from Kinesis stream.
$consumer = new Consumer(
    $kinesis_factory->getKinesisClient('AKIAINK5P33X2KBK2RAQ', 'EuUdvE7WW0SKaEpGWMWHvN5M+gIjGaoLAVTYzzhV'),
    $message_factory,
    $event_dispatcher,
    $cache,
    $consumer_recovery
);

// One process
$consumer->consume(0);

// Print the messages to see what we pulled from the stream.
var_dump($keep_in_memory_backend->flush());

// Note that this example is very trivial not providing any real functionality.
// To get better idea on how to use message processor see other filter,
// transformer and backend classes in the MessageProcessing namespace.