1. Go to this page and download the library: Download kim1ne/kafka library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
// Load Composer's autoloader relative to this file, so the script also works
// when it is started from a different working directory.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
kim1ne / kafka example snippets
use Kim1ne\InputMessage;
use Kim1ne\Kafka\KafkaConsumer;
use Kim1ne\Kafka\KafkaWorker;
use Kim1ne\Kafka\Message;
use RdKafka\Conf;
// Consumer configuration: broker list and consumer group id.
$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$conf->set('group.id', 'my-group');
// $conf->set(...) other settings

$worker = new KafkaWorker($conf);

// This statement must be terminated with a semicolon; without it the
// following `$worker` expression is a parse error.
$worker->subscribe(['my-topic']);

$worker
    ->on(function (Message $message, KafkaConsumer $consumer) {
        // Acknowledge the received message with an asynchronous commit.
        $consumer->commitAsync($message);
    })
    ->critical(function (\Throwable $throwable) {
        // Print the throwable's message in red.
        InputMessage::red('Error: ' . $throwable->getMessage());
    });

InputMessage::green('Start Worker');
$worker->run();
use Kim1ne\InputMessage;
use Kim1ne\Kafka\KafkaConsumer;
use Kim1ne\Kafka\Message;
/**
 * Starts several workers through ParallelWorkers::start(), one per topic.
 *
 * @var \RdKafka\Conf $conf Configuration passed to every worker below.
 */
\Kim1ne\Kafka\ParallelWorkers::start(
    (new \Kim1ne\Kafka\KafkaWorker($conf))
        ->subscribe(['topic-1'])
        ->on(function (Message $message, KafkaConsumer $consumer) {
            // Statements inside a closure body need a terminating semicolon.
            InputMessage::red('Message in the first worker!');
        }),
    (new \Kim1ne\Kafka\KafkaWorker($conf))
        ->subscribe(['topic-2'])
        ->on(function (Message $message, KafkaConsumer $consumer) {
            InputMessage::red('Message in the second worker!');
        }),
    // ... $workerN
);
use Kim1ne\Kafka\Message;
use Kim1ne\Kafka\KafkaConsumer;
// Callback registered through on(); it is given the Message and the
// KafkaConsumer as arguments.
$onMessage = function (Message $message, KafkaConsumer $consumer) {
    // Message!
};

$worker->on($onMessage);
use Kim1ne\Kafka\Message;
// Callback registered through error(); it is given the Message as its argument.
$onBadMessage = function (Message $message) {
    // the callback for bad message
    // $message->err !== RD_KAFKA_RESP_ERR_NO_ERROR
    // except messages with error code === RD_KAFKA_RESP_ERR__TIMED_OUT
};

$worker->error($onBadMessage);