1. Go to this page and download the library: Download anktx/kafka-client library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
anktx / kafka-client example snippets
use Anktx\Kafka\Client\Config\ProducerConfig;
use Anktx\Kafka\Client\KafkaProducer;
use Anktx\Kafka\Client\Message\KafkaProducerMessage;
$kafkaProducer = new KafkaProducer(
new ProducerConfig(
brokers: 'kafka:9092',
/* >>> the rest are optional <<<
queueBufferingMaxKBytes: 2048,
batchSize: 1024,
lingerMs: 10,
compressionType: CompressionType::snappy,
isDebug: true,
logger: new \Psr\Log\NullLogger(),
*/
)
);
$kafkaProducer->produce(
new KafkaProducerMessage(
topic: 'topic',
body: 'message body',
/* >>> the rest are optional <<<
partition: 1,
key: 'key',
headers: ['name' => 'value'],
*/
)
);
$kafkaProducer->flush();
use Anktx\Kafka\Client\Config\ConsumerConfig;
use Anktx\Kafka\Client\KafkaConsumer;
use Anktx\Kafka\Client\Subscription\Subscription;
use Anktx\Kafka\Client\Subscription\SubscriptionList;
$kafkaConsumer = new KafkaConsumer(
new ConsumerConfig(
brokers: 'kafka:9092',
groupId: 'groupId',
instanceId: '1',
/* >>> the rest are optional <<<
offsetReset: OffsetReset::latest,
autoCommitMs: 1000,
sessionTimeoutMs: 10000,
isDebug: true,
logger: new \Psr\Log\NullLogger(),
*/
)
);
$kafkaConsumer->subscribe(
new SubscriptionList(
new Subscription(topic: 'topic1'),
// new Subscription(topic: 'topic2', partition: 1),
),
);
$messagesToConsume = 100;
$i = 0;
while (++$i < $messagesToConsume) {
$message = $kafkaConsumer->consume();
echo $message->body;
print_r($message->headers);
do_some_processing($message->body);
$kafkaConsumer->commit($message);
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.