PHP code example of tobento / service-queue

1. Go to this page and download the library: Download tobento/service-queue library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

tobento / service-queue example snippets


use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = new Job(
    name: 'sample',
    payload: ['key' => 'value'],
);

var_dump($job instanceof JobInterface);
// bool(true)

use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

/**
 * Job handler that rebuilds a mail message from the job payload and sends it.
 */
final class Mail implements JobHandlerInterface
{
    /**
     * @param MailerInterface $mailer Sends the rebuilt message.
     * @param MessageFactoryInterface $messageFactory Rebuilds the message from the payload array.
     */
    public function __construct(
        private MailerInterface $mailer,
        private MessageFactoryInterface $messageFactory,
    ) {}

    /**
     * Recreates the message from the job payload and hands it to the mailer.
     *
     * @param JobInterface $job
     * @return void
     */
    public function handleJob(JobInterface $job): void
    {
        $message = $this->messageFactory->createFromArray($job->getPayload());

        $this->mailer->send($message);
    }

    /**
     * Converts a message into the array stored as job payload.
     *
     * @param MessageInterface $message
     * @return array
     */
    public static function toPayload(MessageInterface $message): array
    {
        return $message->jsonSerialize();
    }
}

use Tobento\Service\Queue\Job;

$job = new Job(
    name: Mail::class,
    payload: Mail::toPayload($message),
);

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;

/**
 * Callable job that carries a mail message as payload and sends it when handled.
 */
final class MailJob extends CallableJob
{
    /**
     * @param null|MessageInterface $message The message to queue; may be null,
     *                                       in which case the payload is empty.
     */
    public function __construct(
        private null|MessageInterface $message = null,
    ) {}

    /**
     * Rebuilds the message from the job payload and sends it.
     *
     * @param JobInterface $job
     * @param MailerInterface $mailer
     * @param MessageFactoryInterface $messageFactory
     * @return void
     */
    public function handleJob(
        JobInterface $job,
        MailerInterface $mailer,
        MessageFactoryInterface $messageFactory,
    ): void {
        $message = $messageFactory->createFromArray($job->getPayload());

        $mailer->send($message);
    }

    /**
     * Returns the serialized message, or an empty array when no message was given.
     *
     * @return array
     */
    public function getPayload(): array
    {
        if ($this->message === null) {
            return []; // or throw exception
        }

        return $this->message->jsonSerialize();
    }

    /**
     * Placeholder for template rendering; returns the same instance for chaining.
     *
     * @return static
     */
    public function renderTemplate(): static
    {
        // render template logic ...
        return $this;
    }
}

$job = (new MailJob($message))
    ->renderTemplate();

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    ->parameter(new Parameter\Retry(max: 2));

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = (new Job(name: 'sample'))
    ->queue(name: 'secondary')
    ->data(['key' => 'value'])
    ->duration(seconds: 10)
    ->retry(max: 2)
    ->delay(seconds: 5)
    ->unique()
    ->priority(100)
    ->pushing(function() {})
    ->encrypt();

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\Parameter;

/**
 * Callable job that configures its default parameters in the constructor.
 */
final class SampleJob extends CallableJob
{
    public function __construct()
    {
        // Defaults set through the helper methods:
        $this->duration(seconds: 10);
        $this->retry(max: 2);

        // or using its classes:
        $this->parameter(new Parameter\Priority(100));
    }

    //...
}

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Delay(seconds: 60))
    // or using helper method:
    ->delay(seconds: 60);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Data(['key' => 'value']))
    // or using helper method:
    ->data(['key' => 'value']);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    // or using helper method:
    ->duration(seconds: 10);

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Tobento\Service\Encryption\EncrypterInterface;

// Register an encrypter under EncrypterInterface in the container that is
// handed to the job processor.
$container = new Container();
$container->set(EncrypterInterface::class, function() {
    // create encrypter:
    // NOTE(review): $encrypter is a placeholder; it is not defined inside this
    // closure and must be created (or captured) here.
    return $encrypter;
});

$jobProcessor = new JobProcessor($container);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Encrypt())
    // or using helper method:
    ->encrypt();

use Tobento\Service\Queue\Parameter\Monitor;

if ($job->parameters()->has(Monitor::class)) {
    $monitor = $job->parameters()->get(Monitor::class);
    $runtimeInSeconds = $monitor->runtimeInSeconds();
    $memoryUsage = $monitor->memoryUsage();
}

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Priority(100))
    // or using helper method:
    ->priority(100);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
    
$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Pushing(
        handler: function(JobInterface $job, AnyResolvableClass $foo): JobInterface {
            return $job;
        },
        
        // you may set a priority. Higher gets executed first:
        priority: 100, // 0 is default
    ))
    
    // or using helper method:
    ->pushing(handler: function() {}, priority: 100);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Queue(name: 'secondary'))
    // or using helper method:
    ->queue(name: 'secondary');

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Retry(max: 2))
    // or using helper method:
    ->retry(max: 2);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Unique(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->unique(id: null);

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\WithoutOverlapping(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->withoutOverlapping(id: null);

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueueInterface;

/**
 * Example service pushing a job onto an injected queue.
 */
class SomeService
{
    /**
     * Builds the "sample" job and pushes it onto the given queue.
     *
     * @param QueueInterface $queue
     * @return void
     */
    public function createJob(QueueInterface $queue): void
    {
        $queue->push(new Job(
            name: 'sample',
            payload: ['key' => 'value'],
        ));
    }
}

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueException;

/**
 * Example service showing alternative ways to push a job onto a named queue.
 */
class SomeService
{
    /**
     * Demonstrates three alternative calls for pushing onto the "secondary" queue.
     * As written, every variant runs, so the job is pushed once per variant
     * that succeeds; pick one of them in real code.
     *
     * @param QueuesInterface $queues
     * @return void
     */
    public function createJob(QueuesInterface $queues): void
    {
        $job = new Job(name: 'sample');
        
        $queues->queue(name: 'secondary')->push($job);
        // throws QueueException if the queue does not exist.
        
        // or: the nullsafe call pushes only when get() returns a queue
        $queues->get(name: 'secondary')?->push($job);
        
        // or you may check whether the queue exists first:
        if ($queues->has(name: 'secondary')) {
            $queues->queue(name: 'secondary')->push($job);
        }
    }
}

use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$queue = new InMemoryQueue(
    name: 'inmemeory',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\QueueInterface;

$queue = new NullQueue(
    name: 'null',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\Storage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Storage\StorageInterface;
use Psr\Clock\ClockInterface;

$queue = new Storage\Queue(
    name: 'storage',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    storage: $storage, // StorageInterface
    clock: $clock, // ClockInterface
    table: 'jobs',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$queue = new SyncQueue(
    name: 'sync',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    eventDispatcher: null, // null|EventDispatcherInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\Queues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;

$queues = new Queues(
    $queue, // QueueInterface
    $anotherQueue, // QueueInterface
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\LazyQueues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\NullQueue;
use Psr\Container\ContainerInterface;

$queues = new LazyQueues(
    container: $container, // ContainerInterface
    queues: [
        // using a factory:
        'primary' => [
            // factory must implement QueueFactoryInterface
            'factory' => QueueFactory::class,
            'config' => [
                'queue' => SyncQueue::class,
                'priority' => 100,
            ],
        ],
        
        // using a closure:
        'secondary' => static function (string $name, ContainerInterface $c): QueueInterface {
            // create queue ...
            return $queue;
        },
        
        // or you may sometimes just create the queue (not lazy):
        'null' => new NullQueue(name: 'null'),
    ],
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)

use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor // JobProcessorInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)

use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the queue you want to create:
    'queue' => InMemoryQueue::class,
    //'queue' => NullQueue::class,
    //'queue' => SyncQueue::class,
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

use Tobento\Service\Queue\Storage\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Database\DatabasesInterface;
use Psr\Clock\ClockInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor, // JobProcessorInterface
    clock: $clock, // ClockInterface
    databases: null, // null|DatabasesInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)

use Tobento\Service\Storage\JsonFileStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => JsonFileStorage::class,
    'dir' => 'home/private/storage/',
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

use Tobento\Service\Storage\InMemoryStorage;
use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => InMemoryStorage::class,
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => PdoMySqlStorage::class,
    //'storage' => PdoMariaDbStorage::class,
    
    // specify the name of the database used:
    'database' => 'name',
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Psr\Container\ContainerInterface;

$jobProcessor = new JobProcessor(
    container: $container // ContainerInterface
);

var_dump($jobProcessor instanceof JobProcessorInterface);
// bool(true)

use Tobento\Service\Queue\JobHandlerInterface;

$jobProcessor->addJobHandler(
    name: 'sample',
    handler: SampleHandler::class, // string|JobHandlerInterface
);

use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

/**
 * Minimal job handler skeleton.
 */
final class SampleHandler implements JobHandlerInterface
{
    /**
     * Handles the given job.
     *
     * @param JobInterface $job
     * @return void
     */
    public function handleJob(JobInterface $job): void
    {
        // handle job
    }
}

use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Tobento\Service\Queue\QueuesInterface;

$handler = new FailedJobHandler(
    queues: $queues, // QueuesInterface
);

var_dump($handler instanceof FailedJobHandlerInterface);
// bool(true)

use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobInterface;
use Psr\Log\LoggerInterface;

/**
 * Failed job handler that reports failures to an optional PSR-3 logger.
 * Without a logger both hooks do nothing.
 */
class LogFailedJobHandler extends FailedJobHandler
{
    /**
     * @param null|QueuesInterface $queues
     * @param null|LoggerInterface $logger Receives the error records; may be null.
     */
    public function __construct(
        protected null|QueuesInterface $queues = null,
        protected null|LoggerInterface $logger = null,
    ) {}

    /**
     * Logs a job failure together with the job's name, id, payload and parameters.
     *
     * @param JobInterface $job
     * @param \Throwable $e
     * @return void
     */
    protected function finallyFailed(JobInterface $job, \Throwable $e): void
    {
        // The nullsafe call short-circuits: without a logger neither the
        // message nor the context array is built.
        $this->logger?->error(
            sprintf('Job %s with the id %s failed: %s', $job->getName(), $job->getId(), $e->getMessage()),
            [
                'name' => $job->getName(),
                'id' => $job->getId(),
                'payload' => $job->getPayload(),
                'parameters' => $job->parameters()->jsonSerialize(),
                'exception' => $e,
            ]
        );
    }

    /**
     * Handle exception thrown by the worker e.g.
     *
     * @param \Throwable $e
     * @return void
     */
    public function handleException(\Throwable $e): void
    {
        $this->logger?->error(
            sprintf('Queue exception: %s', $e->getMessage()),
            ['exception' => $e]
        );
    }
}

use Tobento\Service\Queue\Worker;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$worker = new Worker(
    queues: $queues, // QueuesInterface
    jobProcessor: $jobProcessor, // JobProcessorInterface
    failedJobHandler: $failedJobHandler, // null|FailedJobHandlerInterface
    eventDispatcher: $eventDispatcher, // null|EventDispatcherInterface
);

use Tobento\Service\Queue\WorkerOptions;

$status = $worker->run(
    // specify the name of the queue you wish to use.
    // If null, it uses all queues by its priority, highest first.
    queue: 'name', // null|string
    
    // specify the options:
    options: new WorkerOptions(
        // The maximum amount of RAM the worker may consume:
        memory: 128,
        
        // The maximum number of seconds a worker may run:
        timeout: 60,
        
        // The number of seconds to wait in between polling the queue:
        sleep: 3,
        
        // The maximum number of jobs to run, 0 (unlimited):
        maxJobs: 0,
        
        // Indicates if the worker should stop when the queue is empty:
        stopWhenEmpty: false,
    ),
);

// you may exit:
exit($status);

use Tobento\Service\Queue\Event;

use Tobento\Service\Queue\Parameter\Parameter;

/**
 * Minimal custom parameter: extends the base Parameter without adding behavior.
 */
class SampleParameter extends Parameter
{
    // no additional state or hooks
}

use Tobento\Service\Queue\Parameter\Parameter;
use JsonSerializable;

/**
 * Custom parameter carrying a single string value that can be serialized to JSON.
 */
class SampleParameter extends Parameter implements JsonSerializable
{
    /**
     * @param string $value
     */
    public function __construct(
        private string $value,
    ) {}
    
    /**
     * Serializes the object to a value that can be serialized natively by json_encode().
     * Will be used to create the parameter by the parameters factory,
     * so the array keys must match the __construct parameters.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        return ['value' => $this->value];
    }
}

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Failable;
use Tobento\Service\Queue\JobInterface;
use Throwable;

/**
 * Custom parameter that supplies a handler for failed jobs.
 */
class SampleParameter extends Parameter implements Failable
{
    /**
     * Returns the failed job handler.
     *
     * @return callable The processFailedJob() method of this instance.
     */
    public function getFailedJobHandler(): callable
    {
        return [$this, 'processFailedJob'];
    }
    
    /**
     * Process failed job.
     *
     * @param JobInterface $job
     * @param Throwable $e
     * @param ... any parameters resolvable by your container.
     * @return void
     */
    public function processFailedJob(JobInterface $job, Throwable $e): void
    {
        // handle the failed job here
    }
}

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Poppable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;
use JsonSerializable;

/**
 * Custom parameter that supplies a handler run when a job is popped from a queue.
 */
class SampleParameter extends Parameter implements Poppable, JsonSerializable
{
    /**
     * Returns the popping job handler.
     *
     * @return callable The poppingJob() method of this instance.
     */
    public function getPoppingJobHandler(): callable
    {
        return [$this, 'poppingJob'];
    }
    
    /**
     * Popping job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return null|JobInterface
     */
    public function poppingJob(JobInterface $job, QueueInterface $queue): null|JobInterface
    {
        // Called after the job is popped from the queue.
        // If null is returned, the job is not processed.
        return $job;
    }
    
    /**
     * Implemented so that the parameter gets stored with the job.
     * Otherwise the popping job handler is not executed.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Processable;
use Tobento\Service\Queue\JobInterface;
use JsonSerializable;

/**
 * Custom parameter that supplies handlers run before and after a job is processed.
 *
 * NOTE(review): assumes Processable declares both getBeforeProcessJobHandler()
 * and getAfterProcessJobHandler() — confirm against the interface.
 */
class SampleParameter extends Parameter implements Processable, JsonSerializable
{
    /**
     * Returns the after process job handler.
     *
     * @return null|callable The afterProcessJob() method of this instance,
     *                       or return null if no handler is needed.
     */
    public function getAfterProcessJobHandler(): null|callable
    {
        return [$this, 'afterProcessJob'];
    }

    /**
     * Returns the before process job handler.
     *
     * @return null|callable The beforeProcessJob() method of this instance,
     *                       or return null if no handler is needed.
     */
    public function getBeforeProcessJobHandler(): null|callable
    {
        return [$this, 'beforeProcessJob'];
    }

    /**
     * After process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function afterProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * Before process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function beforeProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * Implemented so that the parameter gets stored with the job.
     * Otherwise the handlers are not executed.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Pushable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;

/**
 * Custom parameter that supplies a handler for jobs being pushed to a queue.
 */
class SampleParameter extends Parameter implements Pushable
{
    /**
     * Returns the pushing job handler.
     *
     * @return callable The pushingJob() method of this instance.
     */
    public function getPushingJobHandler(): callable
    {
        return [$this, 'pushingJob'];
    }
    
    /**
     * Pushing job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return JobInterface The job to continue with; this sample returns it unchanged.
     */
    public function pushingJob(JobInterface $job, QueueInterface $queue): JobInterface
    {
        return $job;
    }
}

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;

/**
 * Job that works through a data set in chunks, re-pushing itself until done.
 *
 * Progress ('total', 'number' per chunk, 'offset') is kept in a Data parameter
 * on the job.
 */
final class ChunkableJob extends CallableJob
{
    /**
     * Processes one chunk and re-pushes the job while items remain.
     *
     * @param JobInterface $job
     * @param QueuesInterface $queues Used to push the job again for the next chunk.
     * @return void
     */
    public function handleJob(
        JobInterface $job,
        QueuesInterface $queues,
        // Repository $repository,
    ): void {
        if (! $job->parameters()->has(Parameter\Data::class)) {
            // first time running job:
            $data = new Parameter\Data([
                //'total' => $repository->count(),
                'total' => 500,
                'number' => 100,
                'offset' => 0,
            ]);
            
            $job->parameters()->add($data);
        } else {
            $data = $job->parameters()->get(Parameter\Data::class);
        }
        
        $total = $data->get(key: 'total', default: 0);
        $number = $data->get(key: 'number', default: 100);
        $offset = $data->get(key: 'offset', default: 0);
        
        // Offset at which the next chunk starts.
        $nextOffset = $offset + $number;
        
        // Handle Job:
        //$items = $repository->findAll(limit: [$number, $offset]);
        // For demo we use range, limited to the current chunk and capped at the total.
        $chunkEnd = min($nextOffset, $total);
        $items = $offset < $chunkEnd ? range($offset, $chunkEnd - 1) : [];
        
        foreach($items as $item) {
            // do sth
        }
        
        // Update offset:
        $data->set(key: 'offset', value: $nextOffset);
        
        // Repush to queue if not finished. Compare the NEXT offset so that no
        // empty run follows the last chunk; a non-positive chunk size would
        // never make progress, so it is not re-pushed either.
        if ($number > 0 && $nextOffset < $total) {
            $queues->queue(
                name: $job->parameters()->get(Parameter\Queue::class)->name()
            )->push($job);
        }
    }
    
    /**
     * This job needs no payload; its state lives in the Data parameter.
     *
     * @return array
     */
    public function getPayload(): array
    {
        return [];
    }    
}
__construct

php app queue:work

php app queue:work --queue=primary

php app queue:clear

php app queue:clear --queue=primary --queue=secondary