1. Go to the download page and download the tobento/service-queue library, choosing the "require" download type.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// tobento/service-queue example snippets
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
// Create a job named 'sample' that carries an array payload, then check that
// the Job instance satisfies JobInterface.
$job = new Job(
name: 'sample',
payload: ['key' => 'value'],
);
var_dump($job instanceof JobInterface);
// bool(true)
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;
/**
 * Job handler that rebuilds a mail message from the job payload and sends it.
 *
 * NOTE(review): MailerInterface, MessageFactoryInterface and MessageInterface are not
 * imported by this snippet; presumably they come from a mail package - add the
 * matching `use` statements.
 */
final class Mail implements JobHandlerInterface
{
    /**
     * @param MailerInterface $mailer Sends the recreated message.
     * @param MessageFactoryInterface $messageFactory Recreates a message from the payload array.
     */
    public function __construct(
        private MailerInterface $mailer,
        private MessageFactoryInterface $messageFactory,
    ) {}

    /**
     * Recreates the message from the job payload and sends it with the mailer.
     *
     * @param JobInterface $job The job whose payload describes the message.
     * @return void
     */
    public function handleJob(JobInterface $job): void
    {
        $message = $this->messageFactory->createFromArray($job->getPayload());
        $this->mailer->send($message);
    }

    /**
     * Converts a message into the array used as job payload.
     *
     * @param MessageInterface $message
     * @return array The result of the message's jsonSerialize() call.
     */
    public static function toPayload(MessageInterface $message): array
    {
        return $message->jsonSerialize();
    }
}
use Tobento\Service\Queue\Job;
// Create a job named after the Mail handler class; the payload is built by Mail::toPayload().
// NOTE(review): $message is not defined in this snippet - presumably a MessageInterface instance.
$job = new Job(
name: Mail::class,
payload: Mail::toPayload($message),
);
use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;
/**
 * Callable job that carries an optional mail message and sends it when handled.
 *
 * NOTE(review): MailerInterface, MessageFactoryInterface and MessageInterface are not
 * imported by this snippet; presumably they come from a mail package - add the
 * matching `use` statements.
 */
final class MailJob extends CallableJob
{
    /**
     * @param null|MessageInterface $message The message to serialize as payload, if any.
     */
    public function __construct(
        private null|MessageInterface $message = null,
    ) {}

    /**
     * Recreates the message from the job payload and sends it.
     *
     * @param JobInterface $job The job whose payload describes the message.
     * @param MailerInterface $mailer Sends the recreated message.
     * @param MessageFactoryInterface $messageFactory Recreates a message from the payload array.
     * @return void
     */
    public function handleJob(
        JobInterface $job,
        MailerInterface $mailer,
        MessageFactoryInterface $messageFactory,
    ): void {
        $message = $messageFactory->createFromArray($job->getPayload());
        $mailer->send($message);
    }

    /**
     * Returns the serialized message, or an empty array when no message was given.
     *
     * @return array
     */
    public function getPayload(): array
    {
        if (is_null($this->message)) {
            return []; // or throw exception
        }

        return $this->message->jsonSerialize();
    }

    /**
     * Placeholder for template rendering; returns the same instance for chaining.
     *
     * @return static
     */
    public function renderTemplate(): static
    {
        // render template logic ...
        return $this;
    }
}
// Instantiate the callable job with a message and call its fluent renderTemplate() method.
// NOTE(review): $message is not defined in this snippet - presumably a MessageInterface instance.
$job = (new MailJob($message))
->renderTemplate();
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Attach parameters through the generic parameter() method:
// a Duration of 10 seconds and a Retry with a maximum of 2.
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Duration(seconds: 10))
->parameter(new Parameter\Retry(max: 2));
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
// Configure a job with the fluent helper methods, all chained on one job instance.
$job = (new Job(name: 'sample'))
->queue(name: 'secondary')
->data(['key' => 'value'])
->duration(seconds: 10)
->retry(max: 2)
->delay(seconds: 5)
->unique()
->priority(100)
->pushing(function() {})
->encrypt();
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\Parameter;
/**
 * Callable job that configures its own parameters when it is constructed.
 * The remaining members are elided in this example.
 */
final class SampleJob extends CallableJob
{
public function __construct()
{
// Configure parameters with the helper methods:
$this->duration(seconds: 10);
$this->retry(max: 2);
// or using its classes:
$this->parameter(new Parameter\Priority(100));
}
//...
}
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Delay parameter of 60 seconds; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Delay(seconds: 60))
// or using helper method:
->delay(seconds: 60);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Data parameter holding an array; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Data(['key' => 'value']))
// or using helper method:
->data(['key' => 'value']);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Duration parameter of 10 seconds; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Duration(seconds: 10))
// or using helper method:
->duration(seconds: 10);
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Tobento\Service\Encryption\EncrypterInterface;
// Register an EncrypterInterface entry in the container and pass the container
// to the job processor.
$container = new Container();
$container->set(EncrypterInterface::class, function() {
// create encrypter:
// NOTE(review): $encrypter is a placeholder - it is not defined inside this closure as written.
return $encrypter;
});
$jobProcessor = new JobProcessor($container);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Encrypt parameter; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Encrypt())
// or using helper method:
->encrypt();
use Tobento\Service\Queue\Parameter\Monitor;
// When the job carries a Monitor parameter, read its runtime and memory usage.
if ($job->parameters()->has(Monitor::class)) {
$monitor = $job->parameters()->get(Monitor::class);
$runtimeInSeconds = $monitor->runtimeInSeconds();
$memoryUsage = $monitor->memoryUsage();
}
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Priority parameter of 100; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Priority(100))
// or using helper method:
->priority(100);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
// Pushing parameter: registers a handler that receives the job and returns a job.
// AnyResolvableClass stands in for an additional handler argument; presumably it is
// resolved by the container - confirm.
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Pushing(
handler: function(JobInterface $job, AnyResolvableClass $foo): JobInterface {
return $job;
},
// you may set a priority. Higher gets executed first:
priority: 100, // 0 is default
))
// or using helper method:
->pushing(handler: function() {}, priority: 100);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Queue parameter naming the 'secondary' queue; the helper method is shown as an
// alternative to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Queue(name: 'secondary'))
// or using helper method:
->queue(name: 'secondary');
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Retry parameter with a maximum of 2; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Retry(max: 2))
// or using helper method:
->retry(max: 2);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// Unique parameter; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\Unique(
// A unique id. If null it uses the job id.
id: null, // null|string
))
// or using helper method:
->unique(id: null);
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;
// Register a PSR-16 CacheInterface entry in the container and pass the container
// to the job processor. The cache is an array-backed item pool wrapped by Psr6Cache.
$container = new Container();
$container->set(CacheInterface::class, function() {
// create cache:
return new Psr6Cache(
pool: new ArrayCacheItemPool(
clock: new SystemClock(),
),
namespace: 'default',
ttl: null,
);
});
$jobProcessor = new JobProcessor($container);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;
// WithoutOverlapping parameter; the helper method is shown as an alternative
// to the parameter class (as written, both calls are applied).
$job = (new Job(name: 'sample'))
->parameter(new Parameter\WithoutOverlapping(
// A unique id. If null it uses the job id.
id: null, // null|string
))
// or using helper method:
->withoutOverlapping(id: null);
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;
// Register a PSR-16 CacheInterface entry in the container and pass the container
// to the job processor. The cache is an array-backed item pool wrapped by Psr6Cache.
$container = new Container();
$container->set(CacheInterface::class, function() {
// create cache:
return new Psr6Cache(
pool: new ArrayCacheItemPool(
clock: new SystemClock(),
),
namespace: 'default',
ttl: null,
);
});
$jobProcessor = new JobProcessor($container);
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueueInterface;
/**
 * Example service that pushes a job onto an injected queue.
 */
class SomeService
{
    /**
     * Builds a 'sample' job with an array payload and pushes it onto the given queue.
     *
     * @param QueueInterface $queue The queue receiving the job.
     * @return void
     */
    public function createJob(QueueInterface $queue): void
    {
        $queue->push(new Job(
            name: 'sample',
            payload: ['key' => 'value'],
        ));
    }
}
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueException;
/**
 * Example service showing the alternative ways of pushing a job onto a named queue.
 */
class SomeService
{
    /**
     * Pushes a 'sample' job onto the 'secondary' queue using three alternative lookups.
     * As written, the alternatives run one after another.
     *
     * @param QueuesInterface $queues The queues collection to look the queue up in.
     * @return void
     */
    public function createJob(QueuesInterface $queues): void
    {
        $sampleJob = new Job(name: 'sample');

        // Alternative 1: queue() throws QueueException if the queue does not exist.
        $secondary = $queues->queue(name: 'secondary');
        $secondary->push($sampleJob);

        // Alternative 2: get() may yield null, in which case nothing is pushed.
        $maybeQueue = $queues->get(name: 'secondary');

        if ($maybeQueue !== null) {
            $maybeQueue->push($sampleJob);
        }

        // Alternative 3: check for existence before fetching the queue.
        if (! $queues->has(name: 'secondary')) {
            return;
        }

        $queues->queue(name: 'secondary')->push($sampleJob);
    }
}
use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
// Create an in-memory queue with a job processor and a priority of 100.
// NOTE(review): the name 'inmemeory' looks like a misspelling of 'inmemory' - confirm
// before referring to this queue by name elsewhere.
$queue = new InMemoryQueue(
name: 'inmemeory',
jobProcessor: $jobProcessor, // JobProcessorInterface
priority: 100,
);
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\QueueInterface;
// Create a NullQueue named 'null' with a priority of 100.
$queue = new NullQueue(
name: 'null',
priority: 100,
);
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\Storage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Storage\StorageInterface;
use Psr\Clock\ClockInterface;
// Create a storage-backed queue that uses the 'jobs' table of the given storage.
// $jobProcessor, $storage and $clock are expected to be created beforehand.
$queue = new Storage\Queue(
name: 'storage',
jobProcessor: $jobProcessor, // JobProcessorInterface
storage: $storage, // StorageInterface
clock: $clock, // ClockInterface
table: 'jobs',
priority: 100,
);
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
// Create a SyncQueue with a job processor, no event dispatcher and a priority of 100.
$queue = new SyncQueue(
name: 'sync',
jobProcessor: $jobProcessor, // JobProcessorInterface
eventDispatcher: null, // null|EventDispatcherInterface
priority: 100,
);
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\Queues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;
// Collect already created queue instances in a Queues collection.
// $queue and $anotherQueue are expected to be created beforehand.
$queues = new Queues(
$queue, // QueueInterface
$anotherQueue, // QueueInterface
);
var_dump($queues instanceof QueuesInterface);
// bool(true)
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\LazyQueues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\NullQueue;
use Psr\Container\ContainerInterface;
// Define queues by name for LazyQueues; each entry is either a factory definition,
// a closure returning a queue, or an already created queue instance.
$queues = new LazyQueues(
container: $container, // ContainerInterface
queues: [
// using a factory:
'primary' => [
// factory must implement QueueFactoryInterface
'factory' => QueueFactory::class,
'config' => [
'queue' => SyncQueue::class,
'priority' => 100,
],
],
// using a closure:
'secondary' => static function (string $name, ContainerInterface $c): QueueInterface {
// create queue ...
// NOTE(review): $queue is a placeholder - it is not defined inside this closure as written.
return $queue;
},
// or you may sometimes just create the queue (not lazy):
'null' => new NullQueue(name: 'null'),
],
);
var_dump($queues instanceof QueuesInterface);
// bool(true)
var_dump($queue instanceof QueueInterface);
// bool(true)
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;
// Create the default queue factory from a job processor.
$factory = new QueueFactory(
jobProcessor: $jobProcessor // JobProcessorInterface
);
var_dump($factory instanceof QueueFactoryInterface);
// bool(true)
use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;
// Create a queue named 'primary' from a config array that selects the queue class
// and, optionally, a priority.
$queue = $factory->createQueue(name: 'primary', config: [
// specify the queue you want to create:
'queue' => InMemoryQueue::class,
//'queue' => NullQueue::class,
//'queue' => SyncQueue::class,
// you may specify a priority:
'priority' => 200,
]);
var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
use Tobento\Service\Queue\Storage\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Database\DatabasesInterface;
use Psr\Clock\ClockInterface;
// Create the storage queue factory (Storage\QueueFactory per the import above)
// from a job processor, a clock and optional databases.
$factory = new QueueFactory(
jobProcessor: $jobProcessor, // JobProcessorInterface
clock: $clock, // ClockInterface
databases: null, // null|DatabasesInterface
);
var_dump($factory instanceof QueueFactoryInterface);
// bool(true)
use Tobento\Service\Storage\JsonFileStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;
// Create a 'primary' queue backed by JsonFileStorage, configured with a table name
// and a storage directory.
$queue = $factory->createQueue(name: 'primary', config: [
// specify the table storage:
'table' => 'queue',
// specify the storage:
'storage' => JsonFileStorage::class,
'dir' => 'home/private/storage/',
// you may specify a priority:
'priority' => 200,
]);
var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
use Tobento\Service\Storage\InMemoryStorage;
use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;
// Create a 'primary' queue backed by InMemoryStorage, configured with a table name.
$queue = $factory->createQueue(name: 'primary', config: [
// specify the table storage:
'table' => 'queue',
// specify the storage:
'storage' => InMemoryStorage::class,
// you may specify a priority:
'priority' => 200,
]);
var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;
// Create a 'primary' queue backed by a PDO storage (MySQL here, MariaDB shown as
// a commented-out alternative), configured with a table and a database name.
$queue = $factory->createQueue(name: 'primary', config: [
// specify the table storage:
'table' => 'queue',
// specify the storage:
'storage' => PdoMySqlStorage::class,
//'storage' => PdoMariaDbStorage::class,
// specify the name of the database used:
'database' => 'name',
// you may specify a priority:
'priority' => 200,
]);
var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Psr\Container\ContainerInterface;
// Create the job processor from a PSR-11 container.
$jobProcessor = new JobProcessor(
container: $container // ContainerInterface
);
var_dump($jobProcessor instanceof JobProcessorInterface);
// bool(true)
use Tobento\Service\Queue\JobHandlerInterface;
// Register a handler for jobs named 'sample'; the handler may be given as a
// class name or as a JobHandlerInterface instance.
$jobProcessor->addJobHandler(
name: 'sample',
handler: SampleHandler::class, // string|JobHandlerInterface
);
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;
/**
 * Minimal job handler skeleton; the handling logic is left to the implementer.
 */
final class SampleHandler implements JobHandlerInterface
{
/**
 * Handles the given job.
 *
 * @param JobInterface $job
 * @return void
 */
public function handleJob(JobInterface $job): void
{
// handle job
}
}
use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Tobento\Service\Queue\QueuesInterface;
// Create the default failed job handler from the queues collection.
$handler = new FailedJobHandler(
queues: $queues, // QueuesInterface
);
var_dump($handler instanceof FailedJobHandlerInterface);
// bool(true)
use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobInterface;
use Psr\Log\LoggerInterface;
/**
 * Failed job handler that reports failures to an optional PSR-3 logger.
 * Nothing is logged when no logger was provided.
 */
class LogFailedJobHandler extends FailedJobHandler
{
    /**
     * @param null|QueuesInterface $queues
     * @param null|LoggerInterface $logger Receives the error records, if set.
     */
    public function __construct(
        protected null|QueuesInterface $queues = null,
        protected null|LoggerInterface $logger = null,
    ) {}

    /**
     * Logs an error record describing the job that finally failed.
     *
     * @param JobInterface $job The failed job.
     * @param \Throwable $e The exception that caused the failure.
     * @return void
     */
    protected function finallyFailed(JobInterface $job, \Throwable $e): void
    {
        $logger = $this->logger;

        if ($logger === null) {
            return;
        }

        $message = sprintf(
            'Job %s with the id %s failed: %s',
            $job->getName(),
            $job->getId(),
            $e->getMessage()
        );

        $context = [
            'name' => $job->getName(),
            'id' => $job->getId(),
            'payload' => $job->getPayload(),
            'parameters' => $job->parameters()->jsonSerialize(),
            'exception' => $e,
        ];

        $logger->error($message, $context);
    }

    /**
     * Handle exception thrown by the worker e.g.
     *
     * @param \Throwable $e
     * @return void
     */
    public function handleException(\Throwable $e): void
    {
        $logger = $this->logger;

        if ($logger === null) {
            return;
        }

        $message = sprintf('Queue exception: %s', $e->getMessage());

        $logger->error($message, ['exception' => $e]);
    }
}
use Tobento\Service\Queue\Worker;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
// Create the worker from the queues, the job processor and the optional
// failed job handler and event dispatcher.
$worker = new Worker(
queues: $queues, // QueuesInterface
jobProcessor: $jobProcessor, // JobProcessorInterface
failedJobHandler: $failedJobHandler, // null|FailedJobHandlerInterface
eventDispatcher: $eventDispatcher, // null|EventDispatcherInterface
);
use Tobento\Service\Queue\WorkerOptions;
// Run the worker with explicit options and keep the returned status.
$status = $worker->run(
// specify the name of the queue you wish to use.
// If null, it uses all queues by its priority, highest first.
queue: 'name', // null|string
// specify the options:
options: new WorkerOptions(
// The maximum amount of RAM the worker may consume:
// NOTE(review): the unit is not shown here (presumably megabytes) - confirm.
memory: 128,
// The maximum number of seconds a worker may run:
timeout: 60,
// The number of seconds to wait in between polling the queue:
sleep: 3,
// The maximum number of jobs to run, 0 (unlimited):
maxJobs: 0,
// Indicates if the worker should stop when the queue is empty:
stopWhenEmpty: false,
),
);
// you may exit:
// The status returned by run() is passed to exit().
exit($status);
use Tobento\Service\Queue\Event;
use Tobento\Service\Queue\Parameter\Parameter;
/**
 * Minimal custom parameter: extends the base Parameter class without adding behavior.
 */
class SampleParameter extends Parameter
{
//
}
use Tobento\Service\Queue\Parameter\Parameter;
use JsonSerializable;
/**
 * Custom parameter that holds a string value and can be serialized to JSON.
 */
class SampleParameter extends Parameter implements JsonSerializable
{
    /**
     * @param string $value The value carried by the parameter.
     */
    public function __construct(
        private string $value,
    ) {}

    /**
     * Serializes the object to a value that can be serialized natively by json_encode().
     * Will be used to create the parameter by the parameters factory,
     * so the array keys must match the __construct parameters.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        $state = ['value' => $this->value];

        return $state;
    }
}
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Failable;
use Tobento\Service\Queue\JobInterface;
use Throwable;
/**
 * Custom parameter that provides a handler for failed jobs.
 */
class SampleParameter extends Parameter implements Failable
{
    /**
     * Returns the failed job handler.
     *
     * @return callable A callable pointing at processFailedJob() on this instance.
     */
    public function getFailedJobHandler(): callable
    {
        $handler = [$this, 'processFailedJob'];

        return $handler;
    }

    /**
     * Process failed job.
     *
     * @param JobInterface $job The job that failed.
     * @param Throwable $e The exception that caused the failure.
     * @param ... any parameters resolvable by your container.
     * @return void
     */
    public function processFailedJob(JobInterface $job, Throwable $e): void
    {
        // Intentionally empty: add the failure handling here.
    }
}
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Poppable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;
use JsonSerializable;
/**
 * Custom parameter that provides a handler invoked when a job is popped from the queue.
 */
class SampleParameter extends Parameter implements Poppable, JsonSerializable
{
    /**
     * Returns the popping job handler.
     *
     * @return callable A callable pointing at poppingJob() on this instance.
     */
    public function getPoppingJobHandler(): callable
    {
        $handler = [$this, 'poppingJob'];

        return $handler;
    }

    /**
     * Popping job.
     *
     * Called after the job is popped from the queue.
     * If null is returned, the job is not processed.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return null|JobInterface
     */
    public function poppingJob(JobInterface $job, QueueInterface $queue): null|JobInterface
    {
        return $job;
    }

    /**
     * Implemented so that the parameter gets stored; otherwise the popping job
     * handler is not executed.
     *
     * @return array Always empty, as this parameter has no state.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Processable;
use Tobento\Service\Queue\JobInterface;
use JsonSerializable;
/**
 * Custom parameter that provides handlers running before and after a job is processed.
 */
class SampleParameter extends Parameter implements Processable, JsonSerializable
{
    /**
     * Returns the before process job handler.
     *
     * @return null|callable A callable pointing at beforeProcessJob(), or null if not required.
     */
    public function getBeforeProcessJobHandler(): null|callable
    {
        return [$this, 'beforeProcessJob'];
        // or return null if not required
    }

    /**
     * Returns the after process job handler.
     *
     * NOTE(review): presumably declared by Processable alongside
     * getBeforeProcessJobHandler() - confirm against the interface.
     *
     * @return null|callable A callable pointing at afterProcessJob(), or null if not required.
     */
    public function getAfterProcessJobHandler(): null|callable
    {
        return [$this, 'afterProcessJob'];
        // or return null if not required
    }

    /**
     * Before process job handler.
     *
     * Target of the callable returned by getBeforeProcessJobHandler().
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function beforeProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * After process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function afterProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * Implemented so that the parameter gets stored; otherwise the handlers
     * are not executed.
     *
     * @return array Always empty, as this parameter has no state.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Pushable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;
/**
 * Custom parameter that provides a handler invoked when a job is pushed to a queue.
 */
class SampleParameter extends Parameter implements Pushable
{
    /**
     * Returns the pushing job handler.
     *
     * @return callable A callable pointing at pushingJob() on this instance.
     */
    public function getPushingJobHandler(): callable
    {
        $handler = [$this, 'pushingJob'];

        return $handler;
    }

    /**
     * Pushing job.
     *
     * Returns the given job unchanged in this example.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return JobInterface
     */
    public function pushingJob(JobInterface $job, QueueInterface $queue): JobInterface
    {
        return $job;
    }
}
use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;
/**
 * Callable job that works through a data set in chunks, re-pushing itself to the
 * queue until every chunk has been handled. Progress is kept in a Data parameter
 * attached to the job (keys: 'total', 'number', 'offset').
 */
final class ChunkableJob extends CallableJob
{
    /**
     * Handles one chunk and re-pushes the job if more items remain.
     *
     * @param JobInterface $job The job being handled; carries the chunk state.
     * @param QueuesInterface $queues Used to look up the queue for re-pushing.
     * @return void
     */
    public function handleJob(
        JobInterface $job,
        QueuesInterface $queues,
        // Repository $repository,
    ): void {
        if (! $job->parameters()->has(Parameter\Data::class)) {
            // first time running job:
            $data = new Parameter\Data([
                //'total' => $repository->count(),
                'total' => 500,
                'number' => 100,
                'offset' => 0,
            ]);
            $job->parameters()->add($data);
        } else {
            $data = $job->parameters()->get(Parameter\Data::class);
        }

        $total = $data->get(key: 'total', default: 0);
        $number = $data->get(key: 'number', default: 100);
        $offset = $data->get(key: 'offset', default: 0);

        // Handle Job:
        //$items = $repository->findAll(limit: [$number, $offset]);
        // For demo we use range. The chunk covers [$offset, $offset + $number),
        // capped at $total so the last chunk never runs past the end.
        $chunkEnd = min($offset + $number, $total);
        $items = $offset < $chunkEnd ? range($offset, $chunkEnd - 1) : [];

        foreach ($items as $item) {
            // do sth
        }

        // Update offset:
        $nextOffset = $offset + $number;
        $data->set(key: 'offset', value: $nextOffset);

        // Repush to queue if not finished. The check uses the updated offset,
        // otherwise one extra run would be queued after the last chunk.
        if ($nextOffset < $total) {
            // assumes the job carries a Queue parameter at this point - TODO confirm
            $queues->queue(
                name: $job->parameters()->get(Parameter\Queue::class)->name()
            )->push($job);
        }
    }

    /**
     * Returns the job payload; empty because the state lives in the Data parameter.
     *
     * @return array
     */
    public function getPayload(): array
    {
        return [];
    }
}