PHP code example of anktx / kafka-client

1. Go to the download page for the anktx/kafka-client library and download it. Choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php:
    
        
<?php
// Pull in the autoloader from the extracted vendor/ directory.
require_once 'vendor/autoload.php';

// Start developing below this line. Best regards, https://php-download.com/

    

anktx / kafka-client example snippets


use Anktx\Kafka\Client\Config\ProducerConfig;
use Anktx\Kafka\Client\KafkaProducer;
use Anktx\Kafka\Client\Message\KafkaProducerMessage;

// Producer settings. Only `brokers` is passed here; the commented-out named
// arguments below are optional extras that can be enabled as needed.
$producerConfig = new ProducerConfig(
    brokers: 'kafka:9092',
    // Optional arguments:
    // queueBufferingMaxKBytes: 2048,
    // batchSize: 1024,
    // lingerMs: 10,
    // compressionType: CompressionType::snappy,
    // isDebug: true,
    // logger: new \Psr\Log\NullLogger(),
);

$kafkaProducer = new KafkaProducer($producerConfig);

// The message to publish: a topic and a body, plus optional routing details.
$outgoingMessage = new KafkaProducerMessage(
    topic: 'topic',
    body: 'message body',
    // Optional arguments:
    // partition: 1,
    // key: 'key',
    // headers: ['name' => 'value'],
);

$kafkaProducer->produce($outgoingMessage);

// Flush the producer once the message has been handed over.
$kafkaProducer->flush();



use Anktx\Kafka\Client\Config\ConsumerConfig;
use Anktx\Kafka\Client\KafkaConsumer;
use Anktx\Kafka\Client\Subscription\Subscription;
use Anktx\Kafka\Client\Subscription\SubscriptionList;

// Consumer settings. `brokers`, `groupId` and `instanceId` are passed here;
// the commented-out named arguments below are optional extras.
$consumerConfig = new ConsumerConfig(
    brokers: 'kafka:9092',
    groupId: 'groupId',
    instanceId: '1',
    // Optional arguments:
    // offsetReset: OffsetReset::latest,
    // autoCommitMs: 1000,
    // sessionTimeoutMs: 10000,
    // isDebug: true,
    // logger: new \Psr\Log\NullLogger(),
);

$kafkaConsumer = new KafkaConsumer($consumerConfig);

// Topics to read from. A subscription may also name a specific partition,
// as in the commented-out entry.
$subscriptions = new SubscriptionList(
    new Subscription(topic: 'topic1'),
    // new Subscription(topic: 'topic2', partition: 1),
);

$kafkaConsumer->subscribe($subscriptions);

// Number of messages this example reads before stopping.
$messagesToConsume = 100;

// Count from zero and increment after each pass so the body runs exactly
// $messagesToConsume times. The earlier `while (++$i < $messagesToConsume)`
// form incremented before comparing and therefore stopped one message short.
for ($i = 0; $i < $messagesToConsume; $i++) {
    // NOTE(review): assumes consume() always hands back a message object;
    // confirm whether it can return null (e.g. on timeout) and guard if so.
    $message = $kafkaConsumer->consume();

    // Show the payload and its headers.
    echo $message->body;
    print_r($message->headers);

    // Placeholder for application logic; replace with your own handler.
    do_some_processing($message->body);

    // Commit only after processing has finished for this message.
    $kafkaConsumer->commit($message);
}