Download the PHP package limlabs/kafka-bundle without Composer
On this page you can find all versions of the php package limlabs/kafka-bundle. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Informations about the package kafka-bundle
A Symfony Kafka Bundle
- Installation
- Usage
- Configuration
- Sending messages to kafka
- Consuming kafka topics
- Implementing a consumer
- Executing a consumer
Installation
To install the symfony kafka bundle just execute the following composer command:
Usage
Configuration
To use the kafka bundle you have to configure at least the default
connection.
Example of such a configuration:
If you have multiple specific kafka connections, you can add multiple clients
to this configuration.
Sending messages to kafka
To send message to a topic over the default configuration you can simply use the factory to get the KafkaClient
.
From the KafkaClient
you can get the producer
for your kafka connection. With this producer you can create a topic
in which you can send a message with the produce
function.
The createTopic
automatically gets an existing topic or creates a new one, if the specified topic does not exist.
Here an example of this in a symfony controller:
:warning: Don't forget the
$producer->flush()
method call. Without it, you will lose data!
To get a KafkaClient
for a specific connection, just set the name of the desired connection as a parameter in the getKafkaClient
function of the factory.
Consuming kafka topics
Implementing a consumer
To consume kafka topics, you need to setup a class which implements the KafkaConsumer
-Interface.
This interface automatically tags the class correctly for symfony DI and forces you to implement the consume
and the getConsumerConfiguration
functions.
Here is a quick example of this:
In the getConsumerConfiguration
function you have to return a ConsumerConfiguration
-Object.
This describes the specific configuration for the consumer. The array keys consumer_group
and subscribed_topics
are required.
Without these, an exception will be thrown.
You can create the configuration object like in the previous example or like this:
The consume
function gets called on each message that comes in over the specified topics.
This function has to return a ConsumerResponse
.
The following responses are valid:
The SUCCESS
response signalises the executor that everything gone well. The ERROR_DROP
response
informs the executor of an error. In this case, an error message will be logged in the terminal, but the message will be dropped if you don't take care of it yourself!
In return of ERROR_DROP
, there is ERROR_REQUEUE
, which sends the payload of the message back to the Kafka topic from which it came to prevent data loss.
:warning: The use of
ERROR_REQUEUE
can lead to infinit loops between the consumer and Kafka.
Executing a consumer
To list all via symfony DI loaded consumers, you can use the following command:
Which would return something like this:
Now to start a consumer use the following command:
If you don't give the class path as an argument, the executor will automatically start the first loaded consumer. If the consumer started correctly you will be greeted by the following message:
:tada: Congrats now is your consumer consuming messages!
All versions of kafka-bundle with dependencies
ext-rdkafka Version *
ext-pcntl Version *