PHP code example of beta / kafka.client

1. Go to this page and download the library: Download beta/kafka.client library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.

/* Start to develop here. Best regards */


beta / kafka.client example snippets

use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\TopicConf;
use KafkaClient\Client;

$consumerConfig = new Conf();
$consumerConfig->set('', 'myConsumerGroup'); // устанавливаем идентификатор группы потребителей сообщений
$consumerConfig->set('enable.partition.eof', 'true');

$consumer = new Consumer($consumerConfig);
$consumer->addBrokers(","); // указываем адреса для подключения к брокерам сообщений

$topicConfig = new TopicConf();
$topicConfig->set('', 100);
$topicConfig->set('', 'broker'); // механизм для хранения курсора
$topicConfig->set('auto.offset.reset', 'earliest'); // курсор по-умолчанию

$client = Client::initAsConsumer($consumer, $topicConfig);
$message = $client->getMessage(
        'partition' => 0,
        'offset' => 2,
        'timeout' => 2000
); // запрашиваем 1 сообщение из брокера
$message->getData(); // payload сообщения
$message->getOriginal(); // оригинальное сообщение RdKafka

* Перебираем новые сообщения из брокера
foreach ($client->getMessageIterator('my_topic') as $message) {
    echo $message->getData();

use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\TopicConf;
use KafkaClient\Client;

$producerConfig = new Conf();
$producerConfig->set('log_level', (string) LOG_DEBUG); // режим ведения логов
$producerConfig->set('debug', 'all');

$producer = new Producer($producerConfig);
$producer->addBrokers(","); // указываем адреса для подключения к брокерам сообщений

$client = Client::initAsProducer($producer);
    'Test message', 
        'partition' => 0, 
        'key' => 'example', 
        'headers' => ['One' => 'Two']