PHP code example of roslov / queue-bundle

1. Go to this page and download the library: Download roslov/queue-bundle library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

roslov / queue-bundle example snippets




declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Roslov\QueueBundle\Serializer\MessagePayloadSerializer;

final class UserCreatedConsumer implements ConsumerInterface
{
    public function __construct(private MessagePayloadSerializer $serializer)
    {
    }

    public function execute(AMQPMessage $msg): int
    {
        // Restore connections to DB if needed...
        // Refresh entity manager if used (`$this->em->clear()`)...

        $dto = $this->serializer->deserialize($msg->getBody());
        // `$dto` will be automatically detected based on payload type.

        // Process DTO...

        return ConsumerInterface::MSG_ACK;
    }
}



declare(strict_types=1);

namespace App\Producer;

use Roslov\QueueBundle\Producer\BaseProducer;

final class UserCreatedProducer extends BaseProducer
{
    protected function getRoutingKey(): string
    {
        return 'user-created';
    }
}



declare(strict_types=1);

namespace App\Producer;

use App\Dto\Queue\UserCreated;
use Roslov\QueueBundle\Processor\EventProcessor;
use Roslov\QueueBundle\Producer\BaseProducerFacade;

/**
 * Keeps all calls to producers.
 */
final class ProducerFacade extends BaseProducerFacade
{
    public function __construct(
        EventProcessor $eventProcessor,
        // Inject other services
    ) {
        parent::__construct($eventProcessor);
    }

    public function sendUserCreatedEvent(int $userId): void
    {
        $payload = new UserCreated();
        $payload->setId($userId);
        $this->send('user_created', $payload);
    }
}

$this->em->getConnection()->beginTransaction();
try {
    // Your code...
    $producerFacade->sendUserCreatedEvent(123); // Creating an event — the event will be stored in memory.
                                                // We cannot store it in DB right now because this code may be used in
                                                // Doctrine lifetime cycles.
    // Your code...
    $this->eventProcessor->flush(); // All events are being stored in DB.
                                    // This should be done right before committing. Otherwise, you may lose your events.
                                    // All events will be sent to RabbitMQ on kernel terminate or on message consume.
    $this->em->getConnection()->commit();
} catch (Throwable $e) {
    $this->em->getConnection()->rollBack();
    throw $e;
}



declare(strict_types=1);

namespace App\Queue;

use App\Dto\Queue\GetUserCommand;
use App\Dto\Queue\User;
use Psr\Log\LoggerInterface;
use Roslov\QueueBundle\Dto\Error;
use Roslov\QueueBundle\Exception\UnknownErrorException;
use Roslov\QueueBundle\Rpc\ClientInterface;

final class UserProvider
{
    private const EXCHANGE_NAME = 'rpc.main';

    private const USER_NOT_FOUND = 'UserNotFound';

    public function __construct(private ClientInterface $client, private LoggerInterface $logger)
    {
    }

    public function getUser(int $id): ?User
    {
        $command = new GetUserCommand();
        $command->setId($id);

        /** @var User|Error $user */
        $user = $this->client->call($command, self::EXCHANGE_NAME);

        if ($user instanceof User) {
            $this->logger->info("The details for the user with id \"$id\" have been received.");
            return $user;
        }
        if ($user instanceof Error && $user->getType() === self::USER_NOT_FOUND) {
            $this->logger->info("The user with id \"$id\" does not exist on the remote server.");
            return null;
        }
        throw new UnknownErrorException('Unknown error happened.');
    }
}



declare(strict_types=1);

namespace App\Rpc;

use App\Dto\Queue\GetUserCommand;
use InvalidArgumentException;
use Roslov\QueueBundle\Dto\Error;
use Roslov\QueueBundle\Rpc\HandlerInterface;

final class UserHandler implements HandlerInterface
{
    private const USER_NOT_FOUND = 'UserNotFound';

    public function handle(object $command): object
    {
        if (!$command instanceof GetUserCommand) {
            throw new InvalidArgumentException(sprintf(
                'Command "%s" is not supported. The handler supports "%s" only.',
                $command::class,
                GetUserCommand::class
            ));
        }

        // Search for a user
        $user = $this->findUser($command->getId()); // Your code for getting a user

        if ($user === null) {
            $error = new Error();
            $error->setType(self::USER_NOT_FOUND);
            $error->setMessage('User not found.');
            return $error;
        }
        return $user;
    }
}


final class ExceptionValidator
{
    /**
     * Returns `true` if notification about exception SHOULD BE sent.
     *
     * In this case, we notify about all exceptions except `UserNotFoundException`.
     *
     * @param \Throwable $exception The exception that must be validated
     * @return bool Validation result
     */
    public function __invoke(\Throwable $exception): bool
    {
        return !$exception instanceof \App\Exception\UserNotFoundException;
    }
}