PHP code example of daalvand / kafka
1. Go to this page and download the library: Download daalvand/kafka library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
daalvand / kafka example snippets
$app->register(Daalvand\Kafka\KafkaServiceProvider::class);
$app->configure("kafka");
use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;
$producer = Producer::withAdditionalBroker('localhost:9092')
->build();
$message = (new ProducerMessage('topic-name', 0))
->withKey('test-key')
->withBody('some test message payload')
->withHeaders(['header' => 'value']);
$producer->produce($message);
$producer->flush(-1);
use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerConsumeException;
use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;
$consumer = Consumer::withAdditionalConfig([
'compression.codec' => 'lz4',
'auto.commit.interval.ms' => 500
])
->withAdditionalBroker('kafka:9092')
->withConsumerGroup('testGroup')
->withAdditionalSubscription('test-topic')
->build();
$consumer->subscribe();
while (true) {
try {
$message = $consumer->consume();
// your business logic
$consumer->commit($message);
} catch (ConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (ConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (ConsumerConsumeException $e) {
// Failed
}
}