PHP code example of imiphp / imi-kafka

1. Go to this page and download the library: Download imiphp/imi-kafka library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

imiphp / imi-kafka example snippets


[
    'components'    =>  [
        // 引入组件
        'Kafka'   =>  'Imi\Kafka',
    ],
]

[
    'pools'    =>    [
        'kafka'    => [
            'sync'    => [
                'pool'    => [
                    'class'        => \Imi\Kafka\Pool\KafkaSyncPool::class,
                    'config'       => [
                        'maxResources'    => 10,
                        'minResources'    => 0,
                    ],
                ],
                'resource'    => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId'          => 'test',
                    // 其它配置请参考:https://github.com/longyan/phpkafka/blob/master/doc/consumer.md#%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
                ],
            ],
            'async'    => [
                'pool'    => [
                    'class'        => \Imi\Kafka\Pool\KafkaCoroutinePool::class,
                    'config'       => [
                        'maxResources'    => 10,
                        'minResources'    => 1,
                    ],
                ],
                'resource'    => [
                    'bootstrapServers' => KAFKA_BOOTSTRAP_SERVERS,
                    'groupId'          => 'test',
                ],
            ],
        ],
    ]
]

[
    'beans' =>  [
        'Kafka'  =>  [
            'defaultPoolName'   =>  'kafka',
        ],
    ],
]

use Imi\Kafka\Pool\KafkaPool;
use longlang\phpkafka\Producer\ProduceMessage;

// 获取生产者对象
$producer = KafkaPool::getInstance();

// 发送
$producer->send('主题 Topic', '消息内容');
// send 方法定义
// public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null, ?int $brokerId = null): void

// 批量发送
$producer->sendBatch([
    new ProduceMessage($topic, 'v1', 'k1'),
    new ProduceMessage($topic, 'v2', 'k2'),
]);
// sendBatch 方法定义
// public function sendBatch(ProduceMessage[] $messages, ?int $brokerId = null): void



namespace ImiApp\Kafka\Test;

use Imi\Bean\Annotation\Bean;
use Imi\Kafka\Annotation\Consumer;
use Imi\Kafka\Base\BaseConsumer;
use Imi\Redis\Redis;
use longlang\phpkafka\Consumer\ConsumeMessage;

/**
 * @Bean("TestConsumer")
 * @Consumer(topic="queue-imi-1", groupId="test-consumer")
 */
class TestConsumer extends BaseConsumer
{
    /**
     * 消费任务
     */
    protected function consume(ConsumeMessage $message): void
    {
        $messageValue = $message->getValue();
    }
}



namespace ImiApp\Process;

use Imi\Aop\Annotation\Inject;
use Imi\App;
use Imi\Kafka\Contract\IConsumer;
use Imi\Process\Annotation\Process;
use Imi\Process\BaseProcess;

/**
 * @Process(name="TestProcess")
 */
class TestProcess extends BaseProcess
{
    /**
     * @Inject("TestConsumer")
     *
     * @var \ImiApp\Kafka\Test\TestConsumer
     */
    protected $testConsumer;

    public function run(\Swoole\Process $process): void
    {
        $this->runConsumer($this->testConsumer);
        \Swoole\Coroutine::yield();
    }

    private function runConsumer(IConsumer $consumer): void
    {
        go(function () use ($consumer) {
            try
            {
                $consumer->run();
            }
            catch (\Throwable $th)
            {
                /** @var \Imi\Log\ErrorLog $errorLog */
                $errorLog = App::getBean('ErrorLog');
                $errorLog->onException($th);
                sleep(3);
                $this->runConsumer($consumer);
            }
        });
    }
}

[
    'components'    =>  [
        'Kafka'  =>  'Imi\Kafka',
    ],
    'beans' =>  [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                // 加入队列消费进程,非必须,你也可以自己写进程消费
                'QueueConsumer',
            ],
        ],
        'imiQueue'  => [
            // 默认队列
            'default'   => 'QueueTest1',
            // 队列列表
            'list'  => [
                // 队列名称
                'QueueTest1' => [
                    // 使用的队列驱动
                    'driver'        => 'KafkaQueueDriver',
                    // 消费协程数量
                    'co'            => 1,
                    // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                    'process'       => 1,
                    // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                    'timespan'      => 0.1,
                    // 进程分组名称
                    'processGroup'  => 'a',
                    // 自动消费
                    'autoConsumer'  => true,
                    // 消费者类
                    'consumer'      => 'QueueTestConsumer',
                    // 驱动类所需要的参数数组
                    'config'        => [
                        // Kafka 连接池名称
                        'poolName' => 'kafka',
                        // 分组ID,如果不传或为null则使用连接池中的配置
                        'groupId'  => 'g1',
                    ],
                ],
            ],
        ],
    ]
]