Download the PHP package sts-gaming-group/kafka-bundle without Composer

On this page you can find all versions of the PHP package sts-gaming-group/kafka-bundle. You can download and install these versions without Composer. Dependencies are resolved automatically.

FAQ

After the download, include the autoloader once with require_once('vendor/autoload.php'); and then import the classes you need with use statements.

If you use only one package, a project is not needed. But if you use more than one package, it is not possible to import the classes with use statements without a project.

In general, it is recommended to always use a project to download your libraries, because an application normally needs more than one library.
Some PHP packages are not free to download and are therefore hosted in private repositories. In this case, credentials are needed to access such packages. If a package comes from a private repository, please use the auth.json textarea to insert the credentials.

  • Some hosting environments offer no terminal or SSH access, so Composer cannot be used there.
  • Using Composer is sometimes complicated, especially for beginners.
  • Composer needs a lot of resources, which are sometimes not available on simple web hosting.
  • If you are using private repositories, you don't need to share your credentials. You can set up everything on our site and then provide a simple download link to your team members.
  • Simplify your Composer build process. Use our command line tool to download the vendor folder as a binary. This makes your build process faster, and you don't need to expose your credentials for private repositories.

Information about the package kafka-bundle

kafka-bundle

Technology stack

Quick start

If you wish to install it in your Symfony project:

Example project

If you want to test out capabilities of this bundle in a Symfony project, please refer to https://github.com/sts-gaming-group/kafka-bundle-app project which ships with kafka-bundle and docker-compose file for convenience.

Basic Configuration

  1. Add sts_gaming_group_kafka.yaml to the config folder at config/packages/sts_gaming_group_kafka.yaml or to an environment-specific folder, e.g. config/packages/prod/sts_gaming_group_kafka.yaml
  2. Add configuration to sts_gaming_group_kafka.yaml, for example:

  3. Most of the time you will want to keep your Kafka configuration in the YAML file, but you can also pass configuration directly on the CLI, for example:

Currently, options passed on the CLI only work for consumers run with the kafka:consumers:consume command.

The configurations are resolved at runtime. The priority is as follows:

Consuming messages

  1. Create consumer

  2. If the configuration is correct (proper broker, topic and most likely schema registry), you should be able to run your consumer and receive messages

Retrying failed messages

To trigger a backoff retry, your consumer should throw RecoverableMessageException in its consume method. You also have to configure a few retry options in sts_gaming_group_kafka.yaml

With such a configuration you will receive the same message at most 4 times (first consumption + 3 retries). Before the first retry, there will be a 300 ms delay (retry_delay). Before the second retry, there will be a 900 ms delay (retry_delay * retry_multiplier). Before the third retry, the next step would be 900 * 3 = 2700 ms, which exceeds the cap, so the delay is 2500 ms (max_retry_delay). Remember to commit offsets in Kafka for a permanently failed message (in case enable_auto_commit is set to false).
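The delay sequence described above can be reproduced with a short sketch. The helper name retryDelays is hypothetical and is not part of kafka-bundle. The growth rule (multiply the previous uncapped delay, then cap at max_retry_delay) is an assumption that matches the three values quoted in the text, not a confirmed description of the bundle's internals.

```php
<?php
declare(strict_types=1);

/**
 * Returns the delay in milliseconds before each retry.
 * Hypothetical helper for illustration; not part of kafka-bundle.
 *
 * @return int[]
 */
function retryDelays(int $maxRetries, int $retryDelay, int $retryMultiplier, int $maxRetryDelay): array
{
    $delays = [];
    $delay = $retryDelay;
    for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
        // Every delay is capped at max_retry_delay.
        $delays[] = min($delay, $maxRetryDelay);
        // Assumed rule: the next delay grows from the previous uncapped value.
        $delay *= $retryMultiplier;
    }

    return $delays;
}

// 3 retries, retry_delay 300, retry_multiplier 3, max_retry_delay 2500
echo implode(', ', retryDelays(3, 300, 3, 2500)), "\n"; // prints "300, 900, 2500"
```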

Any uncaught exception in your consumer will shut down the consumer.

Handling offsets

By default, the option enable.auto.commit is set to true. In that case, offsets are committed automatically to the Kafka brokers after a message is consumed. The frequency with which offsets are committed is set by the option auto.commit.interval.ms (defaults to 50ms). This means that every 50ms Librdkafka (the library that manages Kafka underneath the PHP process) sends the currently stored offset to the Kafka Broker. It also means that if you consume a message and your PHP process dies after 49ms, the offset will not be committed, and after restarting the consumer you will receive the same message again. Such a situation is very unlikely but may happen.

Kafka guarantees at-least-once delivery per message, per topic, per consumer group.id. One implication of this behavior is that if an offset is not committed to the Broker, Kafka will send you the same message again. It is up to the developer to handle such cases.

One approach to gain more control over offset commits is to handle them manually by setting enable.auto.commit to false. You can then use the CommitOffsetTrait::commitOffset() method to send the current offset to the Broker.

Manually committing offsets gives you almost 100% confidence that you will not receive the same message again. There is, however, still a small chance that the offset will not be saved to the Broker, for example when there is a network issue. Again, it is up to the developer to handle such cases (probably with a try...catch block while committing offsets).
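A minimal sketch of the try...catch idea follows. commitWithRetry is a hypothetical helper, not part of kafka-bundle, and the sketch assumes that a failed commit surfaces as a thrown exception, which the text above does not confirm. The $commit callable stands in for whatever performs the commit in your consumer.

```php
<?php
declare(strict_types=1);

/**
 * Calls $commit and retries when it throws.
 * Returns true once a commit succeeds, false if every attempt failed.
 * Hypothetical helper for illustration; not part of kafka-bundle.
 */
function commitWithRetry(callable $commit, int $maxAttempts = 3): bool
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $commit();

            return true;
        } catch (\Throwable $e) {
            // A real consumer would use its own logger here.
            error_log(sprintf('Offset commit attempt %d failed: %s', $attempt, $e->getMessage()));
        }
    }

    return false;
}
```

Inside a consumer that uses CommitOffsetTrait, the call could look like commitWithRetry(fn () => $this->commitOffset($context)). If it returns false, the message may be delivered again after a restart, so the consumer still needs to tolerate duplicates.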

There is, however, one big downside to manual commits: they are slow. The reason is that commits have to be done inside your PHP process and therefore block your main thread. Each commit may take e.g. about 40-50 ms, which in the case of Kafka is incredibly slow. You can pass true as the second argument, as in $this->commitOffset($context, true); manual commits will then be handled asynchronously and will be much faster. But again, if your PHP process dies while committing, some offsets may not be sent to the Broker (almost the same story as when enable.auto.commit is set to true and your process dies).

Given the situations above, it is recommended to keep the enable.auto.commit option set to true and handle possible duplicated messages inside your application.
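One way to handle duplicates is to remember which message positions were already processed and skip redeliveries. The sketch below is an assumption-laden illustration: ProcessedOffsets is a hypothetical class, not part of kafka-bundle, and it keeps its state in memory only.

```php
<?php
declare(strict_types=1);

/**
 * Remembers which (topic, partition, offset) positions were already processed.
 * Hypothetical in-memory sketch; not part of kafka-bundle.
 */
final class ProcessedOffsets
{
    /** @var array<string, true> */
    private array $seen = [];

    /** Returns true the first time a position is seen, false for a redelivery. */
    public function markIfNew(string $topic, int $partition, int $offset): bool
    {
        $key = sprintf('%s:%d:%d', $topic, $partition, $offset);
        if (isset($this->seen[$key])) {
            return false;
        }
        $this->seen[$key] = true;

        return true;
    }
}
```

Because the duplicates described above appear after a process restart, an in-memory array is not enough in practice. The same check would need a persistent store, for example a database table with a unique key on the position or on a business identifier carried by the message.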

Decoders

Decoders are meant to turn raw Kafka data (JSON, Avro, plain text or anything else) into a PHP array (or actually any format you'd like). There are three decoders available:

By default, this package uses AvroDecoder and requires a schema_registry configuration. Schema registry should be a URL containing schema versions of consumed messages.

You can also implement your own decoder by implementing DecoderInterface
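The decoding logic inside a custom decoder could look like the sketch below, which turns a raw JSON payload into a PHP array. It is written as a plain function because the method signature of DecoderInterface is not shown on this page; decodeJsonPayload is a hypothetical name, and wiring it into the interface is left to the reader.

```php
<?php
declare(strict_types=1);

/**
 * Turns a raw JSON payload into a PHP array.
 * Hypothetical helper for illustration; not part of kafka-bundle.
 *
 * @throws \JsonException            when the payload is not valid JSON
 * @throws \UnexpectedValueException when the payload is valid JSON but not an object or array
 */
function decodeJsonPayload(string $payload): array
{
    // JSON_THROW_ON_ERROR makes malformed input fail loudly instead of returning null.
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        throw new \UnexpectedValueException('Expected a JSON object or array.');
    }

    return $data;
}
```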

Register it in your configuration

Denormalizers

You may also want to denormalize the message into some kind of DTO or any other object you wish. By default, this bundle does not denormalize the message into any object and passes you an array (which comes from the AvroDecoder).

Your denormalizers must implement DenormalizerInterface, which requires you to implement the denormalize method. The return value may be of any kind.
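As an illustration of what a denormalize method might build, the sketch below maps a decoded array to a small DTO. TicketDto and its fields are hypothetical and exist only for this example; the signature of denormalize itself is not shown on this page, so the mapping is written as a static factory instead.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical DTO for illustration; not part of kafka-bundle.
 */
final class TicketDto
{
    public int $id;
    public string $status;

    public function __construct(int $id, string $status)
    {
        $this->id = $id;
        $this->status = $status;
    }

    /** Builds the DTO from a decoded message array, failing loudly on missing fields. */
    public static function fromArray(array $data): self
    {
        foreach (['id', 'status'] as $field) {
            if (!array_key_exists($field, $data)) {
                throw new \InvalidArgumentException("Missing field: $field");
            }
        }

        return new self((int) $data['id'], (string) $data['status']);
    }
}
```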

Register it in your configuration:

Receive it in your consumer:

Validators

After or before denormalization, you may want to validate whether a given object should be passed to your consumer. You may want, for example, to filter out incomplete data that came from the Broker.

  1. Create validator

Register it in your configuration:

You may have multiple validators attached to one consumer. The priority of called validators is exactly how you defined them in sts_gaming_group_kafka.yaml - so in this case ExampleValidator is called first, and then SomeOtherValidator is called.

If a validator returns false, an instance of ValidatorException is thrown.

Offset for a message which has not passed validation is committed automatically.
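The ordering rule above can be pictured with a small stand-alone sketch. It does not use the bundle's validator classes: validators are modelled as plain callables keyed by name, and firstRejectingValidator is a hypothetical function that only mirrors the described behavior (run in configured order, stop at the first validator that returns false).

```php
<?php
declare(strict_types=1);

/**
 * Runs validators in the given order and stops at the first one that rejects.
 * Returns the name of the rejecting validator, or null if all of them pass.
 * Hypothetical stand-in for illustration; not part of kafka-bundle.
 *
 * @param array<string, callable> $validators name => callable returning bool
 * @param mixed                   $decoded    decoded (or denormalized) message
 */
function firstRejectingValidator(array $validators, $decoded): ?string
{
    foreach ($validators as $name => $validate) {
        if ($validate($decoded) === false) {
            return $name;
        }
    }

    return null;
}

// Order matters: ExampleValidator runs before SomeOtherValidator.
$validators = [
    'ExampleValidator'   => fn ($data) => isset($data['id']),
    'SomeOtherValidator' => fn ($data) => $data['id'] > 0,
];
var_dump(firstRejectingValidator($validators, ['id' => 3])); // NULL, both validators pass
```

In the bundle itself, a rejection is reported by throwing ValidatorException rather than by returning a name.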

Events

The consumer dispatches events using the symfony/event-dispatcher component, which is an optional dependency:

Only for currently running consumer:

Global event for all consumers:

As the names suggest, the first event is dispatched before the message is consumed, and the second event is dispatched just after the message has been consumed (the retry mechanism is not taken into account; the message needs to be processed fully for the event to be dispatched). You can hook into these events using a Symfony event subscriber/listener, e.g.

Kafka Callbacks

Librdkafka (the C/C++ library used underneath PHP) provides several callbacks that you can use in different situations (consuming/producing/error handling/logging). Your consumer must implement CallableInterface, which requires you to define a callbacks method. This method should return an array of the callbacks you wish to handle yourself.

Producing Messages

  1. To produce messages you must configure a few options in sts_gaming_group_kafka.yaml:

  2. Create the data object you want to work on (e.g. some entity or DTO)

  3. Create a producer which will work on your data object and create a Message for Kafka

  4. Push the message by calling ProducerClient::produce(), e.g. somewhere in your Command class

  5. To produce a message to a specific partition, your Producer can implement PartitionAwareProducerInterface

  6. You can also set a callbacks array on the Producer, for example to check whether messages were sent successfully. Your producer class should implement CallableInterface.

  7. Other options that can be configured for ProducerClient at runtime:

Custom configurations

Sometimes you may wish to pass some additional options to your Consumer object. You may add your own configuration:

Custom options may only be passed on the CLI

You will receive the option in the consume method, and you may take action accordingly.

The example above shows how you could scale up your application by executing e.g. 4 consumers/commands with different remainders and group ids. You may have to resort to such tactics if your topic has only one partition and there is no other way to scale up your consumer.
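The remainder tactic can be sketched as follows. Each consumer runs in its own group, so each one receives every message from the single partition and keeps only the messages whose identifier leaves its assigned remainder. ownsMessage and the divisor parameter are hypothetical names chosen for this illustration, and the sketch assumes messages carry a non-negative integer identifier.

```php
<?php
declare(strict_types=1);

/**
 * Decides whether the consumer instance configured with $remainder should
 * handle the message. Assumes a non-negative integer message id.
 * Hypothetical helper for illustration; not part of kafka-bundle.
 */
function ownsMessage(int $messageId, int $divisor, int $remainder): bool
{
    if ($divisor < 1 || $remainder < 0 || $remainder >= $divisor) {
        throw new \InvalidArgumentException('Require 0 <= remainder < divisor.');
    }

    return $messageId % $divisor === $remainder;
}

// With 4 consumers, message 10 is handled only by the consumer with remainder 2.
var_dump(ownsMessage(10, 4, 2)); // bool(true)
var_dump(ownsMessage(10, 4, 3)); // bool(false)
```

Every message is still delivered to all 4 consumers, so this spreads processing work, not network traffic.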

Showing current consumer/producer configuration

You can show the current configuration that will be passed to a consumer by running the following command:

You can show the producers' configuration by running:

License

This package is distributed under the MIT license. Please refer to LICENSE.md for more details.


All versions of kafka-bundle with dependencies

Requires:

  • php: >=7.4
  • ext-rdkafka: *
  • ext-json: *
  • symfony/config: ^4.3.3|^5.0|^6.0
  • symfony/dependency-injection: ^4.3.3|^5.2|^6.0
  • symfony/http-kernel: ^4.3|^5.2.1|^6.0
  • symfony/console: ^4.0|^5.2.0|^6.0
  • symfony/options-resolver: ~4.3|^5.2|^6.0
  • flix-tech/avro-serde-php: ^2.0