1. Go to this page and download the library: Download adt/background-queue library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
adt / background-queue example snippets
$connection = [
'serverVersion' => '8.0',
'driver' => 'pdo_mysql',
'host' => $_ENV['DB_HOST'],
'port' => $_ENV['DB_PORT'],
'user' => $_ENV['DB_USER'],
'password' => $_ENV['DB_PASSWORD'],
'dbname' => $_ENV['DB_DBNAME'],
];
$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
'callbacks' => [
'processEmail' => [$mailer, 'process'],
'processEmail2' => [ // možnost specifikace jiné fronty pro tento callback
'callback' => [$mailer, 'process'],
'queue' => 'general',
],
]
'notifyOnNumberOfAttempts' => 5, // počet pokusů o zpracování záznamu před zalogováním
'tempDir' => $tempDir, // cesta pro uložení zámku proti vícenásobnému spuštění commandu
'connection' => $connection, // pole parametru predavane do Doctrine\Dbal\Connection nebo DSN
'queue' => $_ENV['PROJECT_NAME'], // název fronty, do které se ukládají a ze které se vybírají záznamy
'bulkSize' => 1, // velikost dávky při vkládání více záznamů najednou
'tableName' => 'background_job', // nepovinné, název tabulky, do které se budou ukládat jednotlivé joby
'logger' => $logger, // nepovinné, musí implementovat psr/log LoggerInterface
'onBeforeProcess' => function(array $parameters) {...}, // nepovinné
'onError' => function(Throwable $e, array $parameters) {...}, // nepovinné
'onAfterProcess' => function(array $parameters) {...}, // nepovinné
'onProcessingGetMetadata' => function(array $parameters): ?array {...}, // nepovinné
'parametersFormat' => \ADT\BackgroundQueue\Entity\BackgroundJob::PARAMETERS_FORMAT_SERIALIZE, // nepovinné, určuje v jakém formátu budou do DB ukládána data v `background_job.parameters` (@see \ADT\BackgroundQueue\Entity\BackgroundJob::setParameters)
]);
$connectionParams = [
'host' => $_ENV['RABBITMQ_HOST'],
'user' => $_ENV['RABBITMQ_USER'],
'password' => $_ENV['RABBITMQ_PASSWORD']
];
$queueParams = [
'arguments' => ['x-queue-type' => ['S', 'quorum']]
];
$manager = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager($connectionParams, $queueParams);
$producer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer();
$consumer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Consumer();
$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
...
'producer' => $producer,
'waitingJobExpiration' => 1000, // nepovinné, délka v ms, po které se job pokusí znovu provést, když čeká na dokončení předchozího
]);
namespace App\Presenters;
use ADT\BackgroundQueue\BackgroundQueue;
class Mailer
{
private BackgroundQueue $backgroundQueue
public function __construct(BackgroundQueue $backgroundQueue)
{
$this->backgroundQueue = $backgroundQueue;
}
public function send(Invoice $invoice)
{
$callbackName = 'processEmail';
$parameters = [
'to' => '[email protected]',
'subject' => 'Background queue test'
'text' => 'Anything you want.'
];
$serialGroup = 'invoice-' . $invoice->getId();
$identifier = 'sendEmail-' . $invoice->getId();
$isUnique = true; // always set to true if a callback on an entity should be performed only once, regardless of how it can happen that it is added to your queue twice
$availableAt = new \DateTimeImmutable('+1 hour'); // earliest time when the record should be processed
$this->backgroundQueue->publish($callbackName, $parameters, $serialGroup, $identifier, $isUnique, $availableAt);
}
public function process(string $to, string $subject, string $text)
{
// own implementation
}
}
public function onError(\Throwable $exception) {
// Příklad 1: Bude to neopakovatelná chyba a konzumer se před další iterací ukončí.
if (!$this->entityManager->isOpen()) {
throw new \ADT\BackgroundQueue\Exception\DieException('EM is closed.');
}
// Příklad 2: Bude se zpracovávat dle toho, co je v $exception, tedy pokud je $exception instanceof TemporaryErrorException, tak to bude opakovatelná chyba, ale konzumer se také před další iterací ukončí. Používá se například pri deadlocku.
if (!$this->entityManager->isOpen()) {
throw new \ADT\BackgroundQueue\DieException('EM is closed. Reason: ' . $exception->getMessage(), $exception->getCode(), $exception);
}
}
$this->backgroundQueueService->startBulk();
foreach ($data as $oneJobData) {
$this->backgroundQueue->publish(...);
}
$this->backgroundQueueService->endBulk();