PHP code example of sts-gaming-group / kafka-bundle

1. Go to this page and download the library: Download sts-gaming-group/kafka-bundle library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

sts-gaming-group / kafka-bundle example snippets




declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Consumer\Message;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Context;

class ExampleConsumer implements ConsumerInterface
{
    public const CONSUMER_NAME = 'example_consumer';

    public function consume(Message $message, Context $context): void
    {
        $data = $message->getData(); // contains denormalized data from Kafka
        $retryNo = $context->getRetryNo();  // contains retry count in case of a failure
    }

    public function handleException(\Exception $exception, Context $context): void
    {
        // log it or i.e. produce to retry topic based on type of exception
    }

    public function getName(): string
    {
        return self::CONSUMER_NAME; // consumer unique name in your project
    }
 }
 

use StsGamingGroup\KafkaBundle\Client\Consumer\Exception\RecoverableMessageException;
 


declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Traits\CommitOffsetTrait;

class ExampleConsumer implements ConsumerInterface
{
   use CommitOffsetTrait;
   
   public function consume(Message $message, Context $context): void
   {
      // process the message
      $this->commitOffset($context); // manually commits the offset
   }
}



namespace App\Decoder;

use StsGamingGroup\KafkaBundle\Configuration\ResolvedConfiguration;
use StsGamingGroup\KafkaBundle\Decoder\Contract\DecoderInterface;

class CustomDecoder implements DecoderInterface
{
    public function decode(ResolvedConfiguration $configuration, string $message)
    {
        // $configuration contains values from sts_gaming_group_kafka.yaml or CLI
        // $message contains raw value from Kafka
    }
}



declare(strict_types=1);

namespace App\Normalizer;

use StsGamingGroup\KafkaBundle\Denormalizer\Contract\DenormalizerInterface;

class CustomDenormalizer implements DenormalizerInterface
{
    public function denormalize($data): MessageDTO
    {
        // $data is an array which comes from AvroDecoder or some other registered Decoder
        $messageDTO = new MessageDTO();
        $messageDTO->setName($data['name']);

        return $messageDTO;
    }
}



...

class ExampleConsumer implements ConsumerInterface
{
    public function consume(Message $message, Context $context): void
    {
        $messageDTO = $message->getData(); // $messageDTO comes from CustomDenormalizer
    }
}



declare(strict_types=1);

namespace App\Validator;

use StsGamingGroup\KafkaBundle\Validator\Contract\ValidatorInterface;
use StsGamingGroup\KafkaBundle\Validator\Validator;

class ExampleValidator implements ValidatorInterface
{
    public function validate($decoded): bool
    {
        return !array_key_exists('foo', $decoded);
    }

    public function failureReason($decoded): string
    {
        return sprintf('Missing foo key in decoded message.');
    }
    
    public function type() : string
    {
       return Validator::PRE_DENORMALIZE_TYPE; // runs before denormalization
       // Validator::POST_DENORMALIZE_TYPE // runs after denormalization
    }
}

 ...
 
 use StsGamingGroup\KafkaBundle\Validator\Exception\ValidationException;
 
 public function handleException(\Exception $exception, Context $context)
 {
     if ($exception instanceof ValidationException) {      
         $decoded = $exception->getData();
         $this->logger->info(
             sprintf(
                 'Message has not passed validation. Id: %s  | Reason: %s', 
                 $decoded['id'],
                 $exception->getFailedReason())
         );
     }
 }

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent;
use StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent;

class ExampleConsumerEventSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents(): array
    {
        return [
            PreMessageConsumedEvent::getEventName('example_consumer') => 'onPreMessageConsumed',
            PostMessageConsumedEvent::getEventName('example_consumer') => 'onPostMessageConsumed',
            PreMessageConsumedEvent::class => 'onGlobalPreMessageConsumed',
            PostMessageConsumedEvent::class => 'onGlobalPostMessageConsumed'
        ];
    }

    public function onPreMessageConsumed(PreMessageConsumedEvent $event): void
    {
        $event->getConsumedMessages(); // number of processed messages
        $event->getConsumptionTimeMs(); // how long consumer is running
    }

    public function onPostMessageConsumed(PostMessageConsumedEvent $event): void
    {
        $event->getConsumedMessages();
        $event->getConsumptionTimeMs();
    }
}



declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Callbacks;

class ExampleConsumer implements ConsumerInterface, CallableInterface
{
    public function callbacks(): array
    {
        return [
            Callbacks::OFFSET_COMMIT_CALLBACK => static function (
                \RdKafka\KafkaConsumer $kafkaConsumer,
                int $error,
                array $partitions
            ) {
                // call some action according to i.e. error
            },
            Callbacks::LOG_CALLBACK => static function ($kafka, int $level, string $facility, string $message) {
                // log it somewhere
            }
        ];
    }
    
    // other methods
}
 
 


declare(strict_types=1);

namespace App\Producers;

class SomeEntity
{
    private int $id;
    private string $name;

    public function __construct(int $id, string $name)
    {
        $this->id = $id;
        $this->name = $name;
    }

    public function toArray(): array
    {
        return [
            'id' => $this->id,
            'name' => $this->name
        ];
    }
}



declare(strict_types=1);

namespace App\Producers;

use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;

class ExampleProducer implements ProducerInterface
{
    public function produce($data): Message
    {
        /** @var SomeEntity $data */
        return new Message(json_encode($data->toArray()), $data->getId());
        // first argument of Message is the payload as a string
        // second argument is a message key which is used to help kafka partition messages
    }

    public function supports($data): bool
    {
        // in case of many producers you should check what $data is passed here
        return $data instanceof SomeEntity;
    }
}



declare(strict_types=1);

namespace App\Command;

use StsGamingGroup\KafkaBundle\Client\Producer\ProducerClient;

class ExampleCommand extends Command
{ 
 public function __construct(ProducerClient $client, SomeEntityRepository $repository)
 {
     $this->client = $client;
     $this->repository = $repository;
 }
 
 protected function execute(InputInterface $input, OutputInterface $output): int
 {
     $someEntities = $this->repository->findAll();
     foreach ($someEntities as $entity) {
         $this->client->produce($entity);
     }

     $this->client->flush(); // call flush after produce() method has finished

     return Command::SUCCESS;
 }



declare(strict_types=1);

namespace App\Producers;

use StsGamingGroup\KafkaBundle\Client\Contract\PartitionAwareProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;

class ExampleProducer implements ProducerInterface
{
    public function produce($data): Message
    {
        /** @var SomeEntity $data */
        return new Message(json_encode($data->toArray()), $data->getId());
    }

    public function getPartition($data, ResolvedConfiguration $configuration): int
    {
        /** @var SomeEntity $data */
        return $data->getId() % 16; // calculating modulo from object id to produce to maximum of 16 partitions (0-15)
    }
}

use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;

class ExampleProducer implements ProducerInterface, CallableInterface
{
    public function callbacks(): array
    {
        // callbacks array just like in Consumer example
    }
}
 
$this->producerClient
   ->setPollingBatch(25000)   
   ->setPollingTimeoutMs(1000)
   ->setFlushTimeoutMs(500)
   ->setMaxFlushRetries(10);



declare(strict_types=1);

namespace App\Configuration;

use StsGamingGroup\KafkaBundle\Configuration\Contract\ConfigurationInterface;
use Symfony\Component\Console\Input\InputOption;

class Divisor implements ConfigurationInterface
{
    public function getName(): string
    {
        return 'divisor';
    }

    public function getMode(): int
    {
        return InputOption::VALUE_REQUIRED;
    }

    public function getDescription(): string
    {
        return 'Option description';
    }

    public function isValueValid($value): bool
    {
        return is_numeric($value) && $value > 0;
    }

    public function getDefaultValue(): int
    {
        return 1;
    }
}

class ExampleConsumer implements ConsumerInterface
{
    public const CONSUMER_NAME = 'example_consumer';

    public function consume(Message $message, Context $context): void
    {
        $divisor = $context->getValue(Divisor::NAME);
        $remainder = $context->getValue(Remainder::NAME);
        
        if ($message->getId() % $divisor !== $remainder) {
            return; // let's skip that message
        }
        
        // process message normally
    }
}