PHP code example of reactphp-x / queue

1. Go to this page and download the library: Download reactphp-x/queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

reactphp-x / queue example snippets


use React\EventLoop\Loop;
use Clue\React\Redis\RedisClient;
use ReactphpX\Queue\Queue;

$loop = Loop::get();
$redis = new RedisClient('redis://127.0.0.1:6379');
$queue = new Queue($redis, 'example');

// 入队操作
$queue->enqueue(['task' => 'task1', 'data' => 'some data'])->then(function () {
    echo "Task1 入队成功\n";
});

// 获取队列大小
$queue->size()->then(function ($size) {
    echo "当前队列大小: {$size}\n";
});

// 出队操作
$queue->dequeue()->then(function ($data) {
    echo "出队数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
});

// 阻塞式出队(等待5秒)
$queue->blockingDequeue(5)->then(function ($data) {
    if ($data) {
        echo "获取到数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
    } else {
        echo "等待超时,没有数据\n";
    }
});

// 向不同优先级的队列添加任务
$queue->enqueue($task, 'high');
$queue->enqueue($task, 'medium');
$queue->enqueue($task, 'low');

// 按优先级处理任务
$queue->dequeue('high');
$queue->dequeue('medium');
$queue->dequeue('low');

use ReactphpX\Queue\JobManager;
use ReactphpX\Queue\Storage\RedisStorageDriver;

$storage = new RedisStorageDriver($redis);
$jobManager = new JobManager($storage, $queue);

// 推送带状态跟踪的任务
$jobManager->pushJob('job-1', function () {
    return "Job completed";
}, 'default', true);

// 获取任务状态
$jobManager->getAllJobs()->then(function ($jobs) {
    foreach ($jobs as $jobId => $job) {
        echo "Job ID: $jobId, Status: {$job['status']}\n";
    }
});

use ReactphpX\Queue\Consumer;

$consumer = new Consumer($queue);

// 注册任务处理器
$consumer->consume(function ($data) {
    echo "Processing task: " . json_encode($data) . "\n";
    // 处理任务逻辑
    return true;
});

use ReactphpX\Queue\ArrayQueue;

$queue = new ArrayQueue();

$queue->enqueue('task1')->then(function () {
    echo "Task added\n";
});

$queue->dequeue()->then(function ($data) {
    echo "Processing: $data\n";
});