1. Go to this page and download the library: Download simpod/kafka library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
simpod / kafka example snippets
declare(strict_types=1);
namespace Your\AppNamespace;
use RdKafka\Message;
use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
final class ExampleConsumer
{
public function run(): void
{
$kafkaConsumer = new KafkaConsumer($this->getConfig(), Logger::get());
$kafkaConsumer->subscribe(['topic1']);
$kafkaConsumer->start(
120 * 1000,
static function (Message $message) use ($kafkaConsumer) : void {
// Process message here
$kafkaConsumer->commit($message); // Autocommit is disabled
}
);
}
private function getConfig(): ConsumerConfig
{
$config = new ConsumerConfig();
$config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092');
$config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
$config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname());
$config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
$config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name');
return $config;
}
}
declare(strict_types=1);
namespace Your\AppNamespace;
use RdKafka\Message;
use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
use SimPod\Kafka\Clients\Consumer\ConsumerRecords;
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
final class ExampleBatchConsumer
{
public function run(): void
{
$kafkaConsumer = new KafkaConsumer($this->getConfig());
$kafkaConsumer->subscribe(['topic1']);
$kafkaConsumer->startBatch(
200000,
120 * 1000,
static function (Message $message): void {
// Process record
},
static function (ConsumerRecords $consumerRecords) use ($kafkaConsumer) : void {
// Process records batch
$kafkaConsumer->commit($consumerRecords->getLast());
}
);
}
private function getConfig(): ConsumerConfig
{
$config = new ConsumerConfig();
$config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092');
$config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
$config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname());
$config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
$config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name');
return $config;
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.