PHP code example of micromus / kafka-bus

1. Go to this page and download the library: Download micromus/kafka-bus library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

micromus / kafka-bus example snippets




use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Config\KafkaConnectionConfig;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandler;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRoutes;
use Micromus\KafkaBus\Consumers\Router\Route as ConsumerRoute;
use Micromus\KafkaBus\Producers\Messages\ProducerMessage;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

icRegistry->get('products'), new PrintHandler())),
    new Bus\Listeners\Workers\Options(additionalOptions: $consumeOptions)
);

$workerRegistry = (new Bus\Listeners\Workers\WorkerRegistry())
    ->add($worker);

// Configure how to route producer messages to topics
$routes = (new Bus\Publishers\Router\PublisherRoutes())
    ->add(new Bus\Publishers\Router\Route(ProducerMessage::class, $topicRegistry->get('products')));

// Kafka connection(s)
$connectionRegistry = new ConnectionRegistry(
    new DriverRegistry(),
    ['default' => new KafkaConnectionConfig('127.0.0.1:29092')]
);

// Create Bus
$publisherFactory = new Bus\Publishers\PublisherFactory(
    new ProducerStreamFactory(),
    $routes
);

$listenerFactory = new Bus\Listeners\ListenerFactory(
    new ConsumerStreamFactory(new MessageHandlerFactory()),
    $workerRegistry
);

$bus = new Bus(
    new Bus\ThreadRegistry(
        $connectionRegistry,
        new Bus\ThreadFactory($listenerFactory, $publisherFactory)
    ),
    'default' // default connection name
);

// Produce a message
$bus->publish(new ProducerMessage(payload: 'test-message', headers: ['foo' => 'bar']));

// Consume in the same process (or run it separately)
pcntl_async_signals(true);
$listener = $bus->listener('default-listener');
pcntl_signal(SIGINT, fn () => $listener->forceStop());
$listener->listen();