1. Go to this page and download the library: Download micromus/kafka-bus library. Choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add the following code to index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
micromus / kafka-bus example snippets
use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Config\KafkaConnectionConfig;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandler;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRoutes;
use Micromus\KafkaBus\Consumers\Router\Route as ConsumerRoute;
use Micromus\KafkaBus\Producers\Messages\ProducerMessage;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;
icRegistry->get('products'), new PrintHandler())),
new Bus\Listeners\Workers\Options(additionalOptions: $consumeOptions)
);
// Collect the worker in a registry; the registry is handed to the
// ListenerFactory further down. add() is used fluently, so its return value
// is what ends up in $workerRegistry.
$workerRegistry = new Bus\Listeners\Workers\WorkerRegistry();
$workerRegistry = $workerRegistry->add($worker);
// Publisher routing: messages of class ProducerMessage are routed to the
// topic registered under the name 'products'.
$routes = new Bus\Publishers\Router\PublisherRoutes();
$productsTopic = $topicRegistry->get('products');
$routes = $routes->add(
    new Bus\Publishers\Router\Route(ProducerMessage::class, $productsTopic)
);
// Kafka connection(s), keyed by connection name.
$driverRegistry = new DriverRegistry();
$connectionConfigs = [
    'default' => new KafkaConnectionConfig('127.0.0.1:29092'),
];
$connectionRegistry = new ConnectionRegistry($driverRegistry, $connectionConfigs);
// Assemble the Bus step by step.

// Publishing side: producer streams plus the publisher routes.
$publisherFactory = new Bus\Publishers\PublisherFactory(new ProducerStreamFactory(), $routes);

// Listening side: consumer streams (with a message handler factory) plus the
// registered workers.
$consumerStreamFactory = new ConsumerStreamFactory(new MessageHandlerFactory());
$listenerFactory = new Bus\Listeners\ListenerFactory($consumerStreamFactory, $workerRegistry);

// Threads are built from both factories and resolved against the connections.
$threadFactory = new Bus\ThreadFactory($listenerFactory, $publisherFactory);
$threadRegistry = new Bus\ThreadRegistry($connectionRegistry, $threadFactory);

// The second argument is the default connection name.
$bus = new Bus($threadRegistry, 'default');
// Produce a message: build it first, then hand it to the bus.
$message = new ProducerMessage(payload: 'test-message', headers: ['foo' => 'bar']);
$bus->publish($message);
// Consume in the same process (or run it separately).
// Async signal dispatch is switched on before the handler is installed.
pcntl_async_signals(true);

// NOTE(review): 'default-listener' is presumably the worker name declared
// earlier in the script — confirm against the worker definition.
$listener = $bus->listener('default-listener');

// On SIGINT (Ctrl+C) ask the listener to stop.
$stopOnInterrupt = function () use ($listener) {
    return $listener->forceStop();
};
pcntl_signal(SIGINT, $stopOnInterrupt);

$listener->listen();
Loading, please wait...
Before you can download the PHP files, the dependencies must be resolved. This can take a few minutes. Please be patient.