PHP code example of adt / background-queue

1. Go to this page and download the library: Download adt/background-queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

adt / background-queue example snippets


$connection = [
	'serverVersion' => '8.0',
	'driver' => 'pdo_mysql',
	'host' => $_ENV['DB_HOST'],
	'port' => $_ENV['DB_PORT'],
	'user' => $_ENV['DB_USER'],
	'password' => $_ENV['DB_PASSWORD'],
	'dbname' => $_ENV['DB_DBNAME'],
];

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	'callbacks' => [
		'processEmail' => [$mailer, 'process'],
		'processEmail2' => [ // možnost specifikace jiné fronty pro tento callback
			'callback' => [$mailer, 'process'],
			'queue' => 'general',
		],
	]
	'notifyOnNumberOfAttempts' => 5, // počet pokusů o zpracování záznamu před zalogováním
	'tempDir' => $tempDir, // cesta pro uložení zámku proti vícenásobnému spuštění commandu
	'connection' => $connection, // pole parametru predavane do Doctrine\Dbal\Connection nebo DSN
	'queue' => $_ENV['PROJECT_NAME'], // název fronty, do které se ukládají a ze které se vybírají záznamy
	'bulkSize' => 1, // velikost dávky při vkládání více záznamů najednou
	'tableName' => 'background_job', // nepovinné, název tabulky, do které se budou ukládat jednotlivé joby
	'logger' => $logger, // nepovinné, musí implementovat psr/log LoggerInterface
	'onBeforeProcess' => function(array $parameters) {...}, // nepovinné
	'onError' => function(Throwable $e, array $parameters) {...},  // nepovinné
	'onAfterProcess' => function(array $parameters) {...}, // nepovinné
	'onProcessingGetMetadata' => function(array $parameters): ?array {...}, // nepovinné
	'parametersFormat' => \ADT\BackgroundQueue\Entity\BackgroundJob::PARAMETERS_FORMAT_SERIALIZE, // nepovinné, určuje v jakém formátu budou do DB ukládána data v `background_job.parameters` (@see \ADT\BackgroundQueue\Entity\BackgroundJob::setParameters)
]);

$connectionParams = [
    'host' => $_ENV['RABBITMQ_HOST'],
    'user' => $_ENV['RABBITMQ_USER'],
    'password' => $_ENV['RABBITMQ_PASSWORD']
];
$queueParams = [
    'arguments' => ['x-queue-type' => ['S', 'quorum']]
];

$manager = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager($connectionParams, $queueParams);
$producer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer();
$consumer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Consumer();

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'producer' => $producer,
	'waitingJobExpiration' => 1000, // nepovinné, délka v ms, po které se job pokusí znovu provést, když čeká na dokončení předchozího
]);

namespace App\Presenters;

use ADT\BackgroundQueue\BackgroundQueue;

class Mailer
{
    private BackgroundQueue $backgroundQueue

    public function __construct(BackgroundQueue $backgroundQueue)
    {
        $this->backgroundQueue = $backgroundQueue;
    }

	public function send(Invoice $invoice) 
	{
		$callbackName = 'processEmail';
		$parameters = [
			'to' => '[email protected]',
			'subject' => 'Background queue test'
			'text' => 'Anything you want.'
		];
		$serialGroup = 'invoice-' . $invoice->getId();
		$identifier = 'sendEmail-' . $invoice->getId();
		$isUnique = true; // always set to true if a callback on an entity should be performed only once, regardless of how it can happen that it is added to your queue twice
		$availableAt = new \DateTimeImmutable('+1 hour'); // earliest time when the record should be processed

		$this->backgroundQueue->publish($callbackName, $parameters, $serialGroup, $identifier, $isUnique, $availableAt);
	}
	
	public function process(string $to, string $subject, string $text) 
	{
	    // own implementation
	}
}

public function onError(\Throwable $exception) {

	// Příklad 1: Bude to neopakovatelná chyba a konzumer se před další iterací ukončí.
	if (!$this->entityManager->isOpen()) {
		throw new \ADT\BackgroundQueue\Exception\DieException('EM is closed.');
	}

	// Příklad 2: Bude se zpracovávat dle toho, co je v $exception, tedy pokud je $exception instanceof TemporaryErrorException, tak to bude opakovatelná chyba, ale konzumer se také před další iterací ukončí. Používá se například pri deadlocku.
	if (!$this->entityManager->isOpen()) {
		throw new \ADT\BackgroundQueue\DieException('EM is closed. Reason: ' . $exception->getMessage(), $exception->getCode(), $exception);
	}
}

$this->backgroundQueueService->startBulk();
foreach ($data as $oneJobData) {
    $this->backgroundQueue->publish(...);
}
$this->backgroundQueueService->endBulk();
json
{
  "conflict": {
    "php-amqplib/php-amqplib": "<3.0.0 || >=4.0.0"
  }
}
Dockerfile
docker-php-ext-install sockets

composer 

'notRealActual' => memory_get_usage(),
'realActual' => memory_get_usage(true),
'notRealPeak' => memory_get_peak_usage(),
'realPeak' => memory_get_peak_usage(true),

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
	...
]);

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
	'callbacks' => [
		'email' => [$mailer, 'process'], // záznamy budou mít prioritu 10
		'aclRecalculation' => [
			'callback' => [$aclService, 'process'],
			'priority' => 20,
		],
		'dataImporting' => [
			'callback' => [$apiService, 'process'],
			'priority' => 30,
		],
	],
	...
]);

$priority = null; // aplikuje se priorita 10 z nastavení pro callback
if ($isNewsletter) {
	$priority = 25;
}
$this->backgroundQueue->publish('email', $parameters, $serialGroup, $identifier, $isUnique, $availableAt, $priority);