1. Go to this page and download the library: Download arquivei/php-kafka-consumer library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
arquivei / php-kafka-consumer example snippets
use Kafka\Consumer\ConsumerBuilder;
use Kafka\Consumer\Entities\Config\Sasl;
/**
 * Invokable message handler for the example.
 *
 * Prints a start timestamp, pauses for two seconds and prints an end
 * timestamp. The received message is not inspected.
 */
class DefaultConsumer
{
    /**
     * @param string $message Payload handed to the handler; unused here.
     */
    public function __invoke(string $message): void
    {
        $timestampFormat = 'Y-m-d H:i:s';

        $startLine = 'Init: ' . date($timestampFormat) . PHP_EOL;
        echo $startLine;

        // Two-second pause between the two timestamps.
        sleep(2);

        $endLine = 'Finish: ' . date($timestampFormat) . PHP_EOL;
        echo $endLine;
    }
}
// Build a consumer with the fluent builder. Every string argument below is a
// placeholder ('broker:port', 'username', ...) to be replaced with real
// connection settings.
$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withSasl(new Sasl('username', 'password', 'mechanisms'))
    ->withCommitBatchSize(1)
    ->withSecurityProtocol('security-protocol')
    ->withHandler(new DefaultConsumer()) // or any callable
    ->build();

// Start consuming with the handler registered above.
$consumer->consume();
use Kafka\Consumer\Contracts\Consumer;
use Kafka\Consumer\Entities\Config;
use Kafka\Consumer\Entities\Config\Sasl;
/**
 * Example consumer built on the Consumer base class.
 *
 * Its handle() method only prints two timestamps separated by a two-second
 * pause; the message content is ignored.
 */
class DefaultConsumer extends Consumer
{
    /**
     * @param string $message Payload handed to the consumer; unused here.
     */
    public function handle(string $message): void
    {
        $format = 'Y-m-d H:i:s';

        $initLine = 'Init: ' . date($format) . PHP_EOL;
        echo $initLine;

        // Two-second pause between the two timestamps.
        sleep(2);

        $finishLine = 'Finish: ' . date($format) . PHP_EOL;
        echo $finishLine;
    }
}
// Build the collaborators first so the Config call below reads as a flat list.
// All string values are placeholders to be replaced with real settings.
$sasl = new Sasl('username', 'password', 'mechanisms');
$handler = new DefaultConsumer();

// Arguments are positional. NOTE(review): the meaning of the three integers
// (1, 1, 6) is not visible here; confirm against the Config constructor
// signature before changing them.
$config = new Config(
    $sasl,
    ['topic'],
    'broker:port',
    1,
    'php-kafka-consumer-group-id',
    $handler,
    'PLAINTEXT',
    'topic-dlq',
    1,
    6
);

// Create the consumer from the assembled configuration and start consuming.
$consumer = new \Kafka\Consumer\Consumer($config);
$consumer->consume();
use Kafka\Consumer\ConsumerBuilder;
// Build a consumer whose messages pass through a middleware chain.
$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withHandler(function ($message) {/** ... */})
    // You may add any number of middlewares, they will be executed in the order provided
    ->withMiddleware(function (string $rawMessage, callable $next): void {
        // First middleware: decode the raw JSON payload into an associative array.
        $decoded = json_decode($rawMessage, true);

        // json_decode() yields null for invalid JSON and a scalar for scalar
        // documents. The next middleware is typed `array $message`, so fail
        // here with a descriptive error instead of an opaque TypeError there.
        if (! is_array($decoded)) {
            throw new \UnexpectedValueException(
                'Message payload did not decode to a JSON array/object: ' . json_last_error_msg()
            );
        }

        $next($decoded);
    })
    ->withMiddleware(function (array $message, callable $next): void {
        // Second middleware: stop the chain for messages without a 'foo' key
        // (isset() also treats a null value as missing).
        if (! isset($message['foo'])) {
            return;
        }

        $next($message);
    })
    ->build();

// Start consuming.
$consumer->consume();