PHP code example of sts-gaming-group / kafka-bundle
1. Go to this page and download the library: Download sts-gaming-group/kafka-bundle library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
sts-gaming-group / kafka-bundle example snippets
declare(strict_types=1);
namespace App\Consumers;
use StsGamingGroup\KafkaBundle\Client\Consumer\Message;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Context;
class ExampleConsumer implements ConsumerInterface
{
public const CONSUMER_NAME = 'example_consumer';
public function consume(Message $message, Context $context): void
{
$data = $message->getData(); // contains denormalized data from Kafka
$retryNo = $context->getRetryNo(); // contains retry count in case of a failure
}
public function handleException(\Exception $exception, Context $context): void
{
// log it or i.e. produce to retry topic based on type of exception
}
public function getName(): string
{
return self::CONSUMER_NAME; // consumer unique name in your project
}
}
use StsGamingGroup\KafkaBundle\Client\Consumer\Exception\RecoverableMessageException;
declare(strict_types=1);
namespace App\Consumers;
use StsGamingGroup\KafkaBundle\Client\Traits\CommitOffsetTrait;
class ExampleConsumer implements ConsumerInterface
{
use CommitOffsetTrait;
public function consume(Message $message, Context $context): void
{
// process the message
$this->commitOffset($context); // manually commits the offset
}
}
namespace App\Decoder;
use StsGamingGroup\KafkaBundle\Configuration\ResolvedConfiguration;
use StsGamingGroup\KafkaBundle\Decoder\Contract\DecoderInterface;
class CustomDecoder implements DecoderInterface
{
public function decode(ResolvedConfiguration $configuration, string $message)
{
// $configuration contains values from sts_gaming_group_kafka.yaml or CLI
// $message contains raw value from Kafka
}
}
declare(strict_types=1);
namespace App\Normalizer;
use StsGamingGroup\KafkaBundle\Denormalizer\Contract\DenormalizerInterface;
class CustomDenormalizer implements DenormalizerInterface
{
public function denormalize($data): MessageDTO
{
// $data is an array which comes from AvroDecoder or some other registered Decoder
$messageDTO = new MessageDTO();
$messageDTO->setName($data['name']);
return $messageDTO;
}
}
...
class ExampleConsumer implements ConsumerInterface
{
public function consume(Message $message, Context $context): void
{
$messageDTO = $message->getData(); // $messageDTO comes from CustomDenormalizer
}
}
declare(strict_types=1);
namespace App\Validator;
use StsGamingGroup\KafkaBundle\Validator\Contract\ValidatorInterface;
use StsGamingGroup\KafkaBundle\Validator\Validator;
class ExampleValidator implements ValidatorInterface
{
public function validate($decoded): bool
{
return !array_key_exists('foo', $decoded);
}
public function failureReason($decoded): string
{
return sprintf('Missing foo key in decoded message.');
}
public function type() : string
{
return Validator::PRE_DENORMALIZE_TYPE; // runs before denormalization
// Validator::POST_DENORMALIZE_TYPE // runs after denormalization
}
}
...
use StsGamingGroup\KafkaBundle\Validator\Exception\ValidationException;
public function handleException(\Exception $exception, Context $context)
{
if ($exception instanceof ValidationException) {
$decoded = $exception->getData();
$this->logger->info(
sprintf(
'Message has not passed validation. Id: %s | Reason: %s',
$decoded['id'],
$exception->getFailedReason())
);
}
}
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent;
use StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent;
class ExampleConsumerEventSubscriber implements EventSubscriberInterface
{
public static function getSubscribedEvents(): array
{
return [
PreMessageConsumedEvent::getEventName('example_consumer') => 'onPreMessageConsumed',
PostMessageConsumedEvent::getEventName('example_consumer') => 'onPostMessageConsumed',
PreMessageConsumedEvent::class => 'onGlobalPreMessageConsumed',
PostMessageConsumedEvent::class => 'onGlobalPostMessageConsumed'
];
}
public function onPreMessageConsumed(PreMessageConsumedEvent $event): void
{
$event->getConsumedMessages(); // number of processed messages
$event->getConsumptionTimeMs(); // how long consumer is running
}
public function onPostMessageConsumed(PostMessageConsumedEvent $event): void
{
$event->getConsumedMessages();
$event->getConsumptionTimeMs();
}
}
declare(strict_types=1);
namespace App\Consumers;
use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Callbacks;
class ExampleConsumer implements ConsumerInterface, CallableInterface
{
public function callbacks(): array
{
return [
Callbacks::OFFSET_COMMIT_CALLBACK => static function (
\RdKafka\KafkaConsumer $kafkaConsumer,
int $error,
array $partitions
) {
// call some action according to i.e. error
},
Callbacks::LOG_CALLBACK => static function ($kafka, int $level, string $facility, string $message) {
// log it somewhere
}
];
}
// other methods
}
declare(strict_types=1);
namespace App\Producers;
class SomeEntity
{
private int $id;
private string $name;
public function __construct(int $id, string $name)
{
$this->id = $id;
$this->name = $name;
}
public function toArray(): array
{
return [
'id' => $this->id,
'name' => $this->name
];
}
}
declare(strict_types=1);
namespace App\Producers;
use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;
class ExampleProducer implements ProducerInterface
{
public function produce($data): Message
{
/** @var SomeEntity $data */
return new Message(json_encode($data->toArray()), $data->getId());
// first argument of Message is the payload as a string
// second argument is a message key which is used to help kafka partition messages
}
public function supports($data): bool
{
// in case of many producers you should check what $data is passed here
return $data instanceof SomeEntity;
}
}
declare(strict_types=1);
namespace App\Command;
use StsGamingGroup\KafkaBundle\Client\Producer\ProducerClient;
class ExampleCommand extends Command
{
public function __construct(ProducerClient $client, SomeEntityRepository $repository)
{
$this->client = $client;
$this->repository = $repository;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$someEntities = $this->repository->findAll();
foreach ($someEntities as $entity) {
$this->client->produce($entity);
}
$this->client->flush(); // call flush after produce() method has finished
return Command::SUCCESS;
}
declare(strict_types=1);
namespace App\Producers;
use StsGamingGroup\KafkaBundle\Client\Contract\PartitionAwareProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;
class ExampleProducer implements ProducerInterface
{
public function produce($data): Message
{
/** @var SomeEntity $data */
return new Message(json_encode($data->toArray()), $data->getId());
}
public function getPartition($data, ResolvedConfiguration $configuration): int
{
/** @var SomeEntity $data */
return $data->getId() % 16; // calculating modulo from object id to produce to maximum of 16 partitions (0-15)
}
}
use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
class ExampleProducer implements ProducerInterface, CallableInterface
{
public function callbacks(): array
{
// callbacks array just like in Consumer example
}
}
declare(strict_types=1);
namespace App\Configuration;
use StsGamingGroup\KafkaBundle\Configuration\Contract\ConfigurationInterface;
use Symfony\Component\Console\Input\InputOption;
class Divisor implements ConfigurationInterface
{
public function getName(): string
{
return 'divisor';
}
public function getMode(): int
{
return InputOption::VALUE_REQUIRED;
}
public function getDescription(): string
{
return 'Option description';
}
public function isValueValid($value): bool
{
return is_numeric($value) && $value > 0;
}
public function getDefaultValue(): int
{
return 1;
}
}
class ExampleConsumer implements ConsumerInterface
{
public const CONSUMER_NAME = 'example_consumer';
public function consume(Message $message, Context $context): void
{
$divisor = $context->getValue(Divisor::NAME);
$remainder = $context->getValue(Remainder::NAME);
if ($message->getId() % $divisor !== $remainder) {
return; // let's skip that message
}
// process message normally
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.