PHP code example of daalvand / kafka

1. Go to the download page and download the library ("Download daalvand/kafka library"). Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader. Anchoring the path on __DIR__ makes the lookup
// independent of the current working directory and of include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

daalvand / kafka example snippets



 // Register the package's service provider with the application.
 // $app is defined outside this snippet (presumably the framework's application
 // instance in its bootstrap file — confirm against your project).
 $app->register(Daalvand\Kafka\KafkaServiceProvider::class);



// Ask the application to set up the configuration named "kafka".
// NOTE(review): presumably this loads a config/kafka.php file — confirm for your framework.
$app->configure("kafka");


use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;

// Configure a producer builder for the broker at localhost:9092, then build the producer.
$producerBuilder = Producer::withAdditionalBroker('localhost:9092');
$producer = $producerBuilder->build();

// Assemble the message one step at a time. The first constructor argument is the
// topic name; the second (0) is presumably the target partition — confirm.
$message = new ProducerMessage('topic-name', 0);
$message = $message->withKey('test-key');
$message = $message->withBody('some test message payload');
$message = $message->withHeaders(['header' => 'value']);

// Hand the message to the producer, then flush.
// NOTE(review): -1 is passed as the flush timeout; in librdkafka a negative timeout
// means "wait indefinitely" — confirm this wrapper forwards it unchanged.
$producer->produce($message);
$producer->flush(-1);



use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerConsumeException;
use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;

// Build a consumer for group "testGroup" that connects to the broker at kafka:9092
// and has "test-topic" added as a subscription.
// NOTE(review): librdkafka documents compression.codec as a producer-side property,
// so it likely has no effect on a consumer — confirm before copying this setting.
$consumer = Consumer::withAdditionalConfig([
        'compression.codec'       => 'lz4',
        'auto.commit.interval.ms' => 500,
    ])
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

// Poll forever; each iteration attempts to consume one message.
while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        // The commit runs only if everything above completed without throwing.
        $consumer->commit($message);
    } catch (ConsumerTimeoutException $e) {
        // no messages were read in a given time
    } catch (ConsumerEndOfPartitionException $e) {
        // only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerConsumeException $e) {
        // Record the failure instead of swallowing it silently, then keep polling.
        error_log(sprintf('Kafka consume failed: %s', $e->getMessage()));
    }
}