PHP code example of solophp / job-queue

1. Go to this page and download the library: Download solophp/job-queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

solophp / job-queue example snippets


use Doctrine\DBAL\DriverManager;
use Solo\JobQueue\JobQueue;
use Solo\JobQueue\RetryPolicy;
use Solo\JobQueue\Schema;
use Solo\JobQueue\Storage\DbalJobStorage;

$connection = DriverManager::getConnection([
    'driver'   => 'pdo_mysql',
    'host'     => 'localhost',
    'dbname'   => 'app',
    'user'     => 'app',
    'password' => '***',
]);

Schema::install($connection); // create the `jobs` table (idempotent)

$queue = new JobQueue(
    storage:          new DbalJobStorage($connection, 'jobs'),
    retry:            new RetryPolicy(
        maxRetries:  3,     // permanent-fail after N attempts
        lockTimeout: 600,   // seconds before a stuck job is reclaimed
        baseDelay:   60,    // seconds — first retry delay (doubles each time)
        maxDelay:    3600,  // hard cap on backoff
    ),
    deleteOnSuccess:  false, // delete successful jobs instead of keeping history
    container:        $container, // PSR-11 container, used by createFromContainer
    logger:           $logger,    // optional PSR-3 logger
    listener:         $listener,  // optional JobQueueListener for metrics / tracing
    autoReclaim:      true,       // call reclaimStuck() automatically from processJobs()
);

// Or with a preset:
$queue = new JobQueue(storage: $storage, retry: RetryPolicy::aggressive(), ...);

use Psr\Container\ContainerInterface;
use Solo\Contracts\JobQueue\JobInterface;

final class SendEmailJob implements JobInterface, \JsonSerializable
{
    private ?Mailer $mailer = null;

    public function __construct(
        private readonly string $to,
        private readonly string $subject,
        private readonly string $body,
    ) {}

    public static function createFromContainer(ContainerInterface $c, array $data): self
    {
        $job = new self($data['to'], $data['subject'], $data['body']);
        $job->mailer = $c->get(Mailer::class);
        return $job;
    }

    public function jsonSerialize(): array
    {
        return ['to' => $this->to, 'subject' => $this->subject, 'body' => $this->body];
    }

    public function handle(): void
    {
        $this->mailer->send($this->to, $this->subject, $this->body);
    }
}

$queue->push(new SendEmailJob('[email protected]', 'Welcome', 'Hi!'));

// With a type (useful for running per-queue workers)
$queue->push($job, 'email');

// Scheduled for later
$queue->push($job, 'email', scheduledAt: new DateTimeImmutable('+1 hour'));

// With an expiration — dropped if not executed in time
$queue->push($job, 'email',
    scheduledAt: new DateTimeImmutable(),
    expiresAt:   new DateTimeImmutable('+1 day'),
);

// Bulk-insert many jobs sharing the same type/schedule in a single statement
$jobs = array_map(fn(array $chunk) => new ChunkJob($chunk), array_chunk($ids, 500));
$queue->pushMany($jobs, 'storefront-resync');

$stats = $queue->getStats('storefront-resync');
// ['pending' => 7, 'in_progress' => 1, 'completed' => 192, 'failed' => 0]

$queue->processJobs(10);              // up to 10 jobs of any type
$queue->processJobs(10, 'email');     // only 'email' jobs

use Solo\JobQueue\Worker;
use Solo\JobQueue\WorkerLimits;

$worker = new Worker(
    queue:          $queue,
    batchSize:      10,
    type:           'email',  // optional — only process this type
    sleepWhenEmpty: 1,        // seconds to sleep when no jobs are ready
    limits:         new WorkerLimits(
        maxJobs:     1000,    // restart after N jobs (0 = unlimited)
        maxRuntime:  3600,    // restart after N seconds (0 = unlimited)
        maxMemoryMb: 256,     // restart if memory exceeds N MB (0 = unlimited)
    ),
    logger:         $logger,
);

$worker->run(); // blocks until SIGTERM/SIGINT, stop() called, or a limit hit

use Solo\Contracts\JobQueue\JobQueueListener;

final class MetricsListener implements JobQueueListener
{
    public function onClaimed(int $jobId, string $jobClass): void
    {
        Metrics::increment('queue.claimed', tags: ['job' => $jobClass]);
    }
    public function onCompleted(int $jobId): void
    {
        Metrics::increment('queue.completed');
    }
    public function onFailed(int $jobId, Throwable|string $error, bool $permanent): void
    {
        Metrics::increment($permanent ? 'queue.dead' : 'queue.retried');
    }
    public function onReclaimed(int $requeued, int $permanentlyFailed): void
    {
        Metrics::gauge('queue.reclaimed', $requeued + $permanentlyFailed);
    }
}

$queue = new JobQueue(..., listener: new MetricsListener());

// Inspect dead jobs without writing SQL
$failed = $queue->getFailedJobs(limit: 50, type: 'email');

// Manually re-queue a dead job (resets retry_count to 0, schedules now)
$queue->retry($jobId);

// Run reclaim on a separate cron when you set autoReclaim: false
$result = $queue->reclaimStuck();
// ['requeued' => 3, 'failed' => 1]

use Solo\JobQueue\LockGuard;

$lock = new LockGuard(__DIR__ . '/storage/locks/worker.lock');

if (!$lock->acquire()) {
    exit(0); // another worker is already running
}

$queue->processJobs(50);
// $lock->release();  // optional — destructor releases it

use Solo\AsyncEventDispatcher\{AsyncEventDispatcher, ReferenceListenerRegistry, ListenerReference};
use Solo\AsyncEventDispatcher\Adapter\SoloJobQueueAdapter;

$registry = new ReferenceListenerRegistry();
$registry->addReference(UserRegistered::class, new ListenerReference(SendWelcomeEmail::class, 'handle'));

$dispatcher = new AsyncEventDispatcher($registry, new SoloJobQueueAdapter($queue, $container));
$dispatcher->dispatch(new UserRegistered('[email protected]'));

$queue->processJobs(10, 'async_event');