PHP code example of imiphp / imi-kafka
1. Go to this page and download the library: Download imiphp/imi-kafka library. Choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add this code to index.php:
<?php
// Load Composer's autoloader. The path is anchored to this file's directory
// (__DIR__) so the include resolves the same way regardless of the current
// working directory or the configured include_path.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
imiphp / imi-kafka example snippets
[
    'components' => [
        // Import the component
        'Kafka' => 'Imi\Kafka',
    ],
]
[
    'pools' => [
        // Pool definitions grouped under the name 'kafka'
        'kafka' => [
            // Entry keyed 'sync', backed by KafkaSyncPool
            'sync' => [
                'pool' => [
                    'class' => \Imi\Kafka\Pool\KafkaSyncPool::class,
                    'config' => [
                        'maxResources' => 10,
                        'minResources' => 0,
                    ],
                ],
                'resource' => [
                    // NOTE(review): KAFKA_BOOTSTRAP_SERVERS is not defined in this snippet;
                    // presumably a constant supplied by the application. Confirm.
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId' => 'test',
                    // For other options see: https://github.com/longyan/phpkafka/blob/master/doc/consumer.md#%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
                ],
            ],
            // Entry keyed 'async', backed by KafkaCoroutinePool
            'async' => [
                'pool' => [
                    'class' => \Imi\Kafka\Pool\KafkaCoroutinePool::class,
                    'config' => [
                        'maxResources' => 10,
                        'minResources' => 1,
                    ],
                ],
                'resource' => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId' => 'test',
                ],
            ],
        ],
    ]
]
[
    'beans' => [
        // Settings for the 'Kafka' bean
        'Kafka' => [
            // Matches the 'kafka' key used in the pools example
            'defaultPoolName' => 'kafka',
        ],
    ],
]
use Imi\Kafka\Pool\KafkaPool;
use longlang\phpkafka\Producer\ProduceMessage;
// Topic used by every example below. It is defined once so that the
// sendBatch() example no longer reads an undefined $topic variable.
$topic = '主题 Topic';
// Obtain the producer object
$producer = KafkaPool::getInstance();
// Send a single message
$producer->send($topic, '消息内容');
// Signature of send:
// public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null, ?int $brokerId = null): void
// Send several messages in one call
$producer->sendBatch([
    new ProduceMessage($topic, 'v1', 'k1'),
    new ProduceMessage($topic, 'v2', 'k2'),
]);
// Signature of sendBatch:
// public function sendBatch(ProduceMessage[] $messages, ?int $brokerId = null): void
namespace ImiApp\Kafka\Test;
use Imi\Bean\Annotation\Bean;
use Imi\Kafka\Annotation\Consumer;
use Imi\Kafka\Base\BaseConsumer;
use Imi\Redis\Redis;
use longlang\phpkafka\Consumer\ConsumeMessage;
/**
 * Example consumer declared for the topic and group named in the annotations below.
 *
 * @Bean("TestConsumer")
 * @Consumer(topic="queue-imi-1", groupId="test-consumer")
 */
class TestConsumer extends BaseConsumer
{
    /**
     * Consume task: handles one message passed in as a ConsumeMessage.
     */
    protected function consume(ConsumeMessage $message): void
    {
        // Read the message value; the example stops here and does nothing further with it.
        $payload = $message->getValue();
    }
}
namespace ImiApp\Process;
use Imi\Aop\Annotation\Inject;
use Imi\App;
use Imi\Kafka\Contract\IConsumer;
use Imi\Process\Annotation\Process;
use Imi\Process\BaseProcess;
/**
 * Example process that runs the injected consumer inside a coroutine and
 * restarts it after a failure.
 *
 * @Process(name="TestProcess")
 */
class TestProcess extends BaseProcess
{
    /**
     * Consumer instance injected under the name given in the annotation below.
     *
     * @Inject("TestConsumer")
     *
     * @var \ImiApp\Kafka\Test\TestConsumer
     */
    protected $testConsumer;

    /**
     * Process entry point: starts the consumer coroutine, then suspends.
     */
    public function run(\Swoole\Process $process): void
    {
        $this->runConsumer($this->testConsumer);
        // Suspend the current coroutine after the consumer coroutine has been started.
        \Swoole\Coroutine::yield();
    }

    /**
     * Runs the consumer in a new coroutine. When the consumer throws, the
     * error is logged, the coroutine waits three seconds, and the consumer is
     * started again through a fresh call to this method.
     */
    private function runConsumer(IConsumer $consumer): void
    {
        go(function () use ($consumer) {
            try
            {
                $consumer->run();
            }
            catch (\Throwable $th)
            {
                /** @var \Imi\Log\ErrorLog $errorLog */
                $errorLog = App::getBean('ErrorLog');
                $errorLog->onException($th);
                // Coroutine-aware sleep: suspends only this coroutine. The global
                // sleep() blocks the whole process unless Swoole runtime hooks
                // are enabled.
                \Swoole\Coroutine::sleep(3);
                $this->runConsumer($consumer);
            }
        });
    }
}
[
    'components' => [
        'Kafka' => 'Imi\Kafka',
    ],
    'beans' => [
        'AutoRunProcessManager' => [
            'processes' => [
                // Add the queue consumer process; optional, you can also write your own consuming process
                'QueueConsumer',
            ],
        ],
        'imiQueue' => [
            // Default queue
            'default' => 'QueueTest1',
            // Queue list
            'list' => [
                // Queue name
                'QueueTest1' => [
                    // Queue driver to use
                    'driver' => 'KafkaQueueDriver',
                    // Number of consumer coroutines
                    'co' => 1,
                    // Number of consumer processes; may be affected by process grouping,
                    // in which case the largest process count configured in the same group applies
                    'process' => 1,
                    // Interval between pop attempts in the consume loop, in seconds
                    // (only effective when a consumer class is used)
                    'timespan' => 0.1,
                    // Process group name
                    'processGroup' => 'a',
                    // Consume automatically
                    'autoConsumer' => true,
                    // Consumer class
                    'consumer' => 'QueueTestConsumer',
                    // Array of parameters required by the driver class
                    'config' => [
                        // Kafka connection pool name
                        'poolName' => 'kafka',
                        // Group ID; if omitted or null, the connection pool's configuration is used
                        'groupId' => 'g1',
                    ],
                ],
            ],
        ],
    ]
]