PHP code example of gensart-x / bullmq-php

1. Go to this page and download the library: Download gensart-x/bullmq-php library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

gensart-x / bullmq-php example snippets


use BullMQ\Queue;

// Create a queue with default connection (localhost:6379)
$queue = new Queue('my-queue');

// Or with custom Redis connection
$queue = new Queue('my-queue', [
    'connection' => [
        'host' => 'redis.example.com',
        'port' => 6379,
        'password' => 'your-password',
    ],
]);

use BullMQ\Queue;

$queue = new Queue('email-queue');

// Add a simple job
$job = $queue->add('send-email', [
    'to' => '[email protected]',
    'subject' => 'Welcome!',
    'body' => 'Thanks for signing up.',
]);

echo "Job added with ID: " . $job->id . "\n";

// Delayed job (delay in milliseconds)
$job = $queue->add('reminder', $data, [
    'delay' => 60000, // Process after 60 seconds
]);

// Priority job (lower number = higher priority)
$job = $queue->add('urgent', $data, [
    'priority' => 1,
]);

// Custom job ID
$job = $queue->add('process-order', $data, [
    'jobId' => 'order-' . $orderId,
]);

// Job with retry settings
$job = $queue->add('flaky-operation', $data, [
    'attempts' => 3,
    'backoff' => [
        'type' => 'exponential',
        'delay' => 1000,
    ],
]);

// Job with removal policy
$job = $queue->add('task', $data, [
    'removeOnComplete' => true,
    'removeOnFail' => 100, // Keep last 100 failed jobs
]);

// LIFO (Last In, First Out) - process newest jobs first
$job = $queue->add('task', $data, [
    'lifo' => true,
]);

// Custom timestamp (defaults to current time)
$job = $queue->add('task', $data, [
    'timestamp' => (int)(microtime(true) * 1000),
]);

$jobs = $queue->addBulk([
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
]);

// Get a specific job
$job = $queue->getJob('job-id');
if ($job) {
    echo "Job name: " . $job->name . "\n";
    echo "Job data: " . json_encode($job->data) . "\n";
    echo "Job state: " . $queue->getJobState($job->id) . "\n";
}

// Get jobs by state
$waitingJobs = $queue->getWaiting(0, 10);
$activeJobs = $queue->getActive(0, 10);
$delayedJobs = $queue->getDelayed(0, 10);
$completedJobs = $queue->getCompleted(0, 10);
$failedJobs = $queue->getFailed(0, 10);

// Get job counts
$counts = $queue->getJobCounts();
echo "Waiting: " . $counts['waiting'] . "\n";
echo "Active: " . $counts['active'] . "\n";
echo "Delayed: " . $counts['delayed'] . "\n";
echo "Completed: " . $counts['completed'] . "\n";
echo "Failed: " . $counts['failed'] . "\n";

// Get count for specific types
$pending = $queue->getJobCountByTypes('waiting', 'delayed');
echo "Pending jobs: " . $pending . "\n";

// Get counts grouped by priority
$priorityCounts = $queue->getCountsPerPriority([0, 1, 2, 3]);
echo "Priority 0: " . $priorityCounts[0] . "\n";
echo "Priority 1: " . $priorityCounts[1] . "\n";

// Pause the queue
$queue->pause();
echo "Queue paused: " . ($queue->isPaused() ? 'yes' : 'no') . "\n";

// Resume the queue
$queue->resume();

// Remove a specific job
$removed = $queue->remove('job-id');

// Clean old jobs (grace period in milliseconds)
$cleaned = $queue->clean(
    grace: 3600000,  // 1 hour
    limit: 100,
    type: 'completed'
);

// Retry failed jobs with options
$queue->retryJobs([
    'count' => 100,      // Max jobs to retry per iteration
    'state' => 'failed', // State to retry from: 'failed' or 'completed'
    'timestamp' => time() * 1000, // Only retry jobs before this timestamp
]);

// Promote delayed jobs (move to waiting)
$queue->promoteJobs(['count' => 100]);

// Drain the queue (remove all waiting jobs)
$queue->drain();

// Obliterate the queue (remove everything)
$queue->obliterate(['force' => true]);

// Add a child job with a parent
$childJob = $queue->add('child-task', $childData, [
    'parent' => [
        'id' => 'parent-job-id',
        'queue' => 'bull:parent-queue',
    ],
]);

// Using a connection array
$queue = new Queue('my-queue', [
    'connection' => [
        'host' => 'localhost',
        'port' => 6379,
        'database' => 0,
        'password' => null,
        'username' => null,
    ],
]);

// Using a Redis URI
$queue = new Queue('my-queue', [
    'connection' => 'redis://user:password@localhost:6379/0',
]);

// Sharing a connection
use BullMQ\RedisConnection;

$connection = new RedisConnection([
    'host' => 'localhost',
    'port' => 6379,
]);

$queue1 = new Queue('queue-1', ['connection' => $connection]);
$queue2 = new Queue('queue-2', ['connection' => $connection]);

// Custom prefix
$queue = new Queue('my-queue', [
    'prefix' => 'myapp',
]);

use BullMQ\Queue;

try {
    $queue = new Queue('my-queue');
    $job = $queue->add('task', $data);
} catch (\RuntimeException $e) {
    echo "Error: " . $e->getMessage() . "\n";
}