PHP code example of myvon / reactphp-rdkafka

1. Go to this page and download the library: Download myvon/reactphp-rdkafka library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

myvon / reactphp-rdkafka example snippets


use Myvon\Kafka\Configuration;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);

use Myvon\Kafka\Consumer;

$consumer = new Consumer($configuration->consumer());

$stream = $consumer->start(['topic']);

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Consumer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$consumer = new Consumer($configuration->consumer());
$stream = $consumer->start(['topic']);

$stream->on('data', function($data) {
    $topic = $data['topic'];
    $message = $data['payload'];
    
    //... do whatever you want here 
});


$stream->on("error", function(Exception $exception) {
    $errorStr = $exception->getMessage();
    $errorCode = $exception->getCode();
    // handle the error here
});

$consumer->setConsumeTimeout(1000); // 1 second

$consumer->setTimerPeriod(0.1); // 100 ms

$kafkaConsumer = $consumer->getConsumer();

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['topicName']);

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic']);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic'], false);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

$configuration = new Configuration('appName', ['127.0.0.1:9092'], ['enable.partition.eof' => 'false']);

$loop = new AnotherLoopInstance();
$producer = new Producer($configuration->producer(), $loop);
$consumer = new Consumer($configuration->consumer(), $loop);