PHP code example of tinywan / redis-stream

1. Go to the download page and download the tinywan/redis-stream library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader relative to this file, so the script works no
// matter which working directory it is launched from.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

tinywan / redis-stream example snippets



// Import the queue class (the `use` keyword is required; without it this line
// is a bare constant expression, not an import).
use Tinywan\RedisStream\RedisStreamQueue;

// Obtain the queue through the library's static accessor.
$queue = RedisStreamQueue::getInstance();

// Send a message; the return value is printed as the message ID.
$messageId = $queue->send('Hello, Redis Stream!');
echo "Message ID: $messageId\n";


// Consume a message
$message = $queue->consume(function($message) {
    echo "Processing: " . $message['message'] . "\n";
    return true; // acknowledge the message
});

use Tinywan\RedisStream\RedisStreamQueue;
use Tinywan\RedisStream\Producer;
use Tinywan\RedisStream\Consumer;

// Create the queue instance
$queue = RedisStreamQueue::getInstance();

// Producer: the payload is JSON with the same 'type' and 'data' keys that the
// consumer below reads after decoding.
$producer = new Producer($queue);
$messageId = $producer->send(json_encode([
    'type' => 'email',
    'data' => 'Task data',
]), [
    'task_type' => 'email'
], 10); // delay of 10 seconds

// Consumer
$consumer = new Consumer($queue);
$consumer->run(function($message) {
    $task = json_decode($message['message'], true);

    // json_decode() yields null for a payload that is not valid JSON, so check
    // the shape before indexing into the result.
    if (!is_array($task)
        || !array_key_exists('type', $task)
        || !array_key_exists('data', $task)
    ) {
        // NOTE(review): false is assumed to mean "not acknowledged" — confirm
        // against the library's callback contract.
        return false;
    }

    // handleTask() is application code that is not part of this snippet.
    return handleTask($task['type'], $task['data']);
});

// Execute immediately
$queue->send('Immediate message');

// Delayed execution (after 30 seconds)
$queue->send('Delayed message', [], 30);

// Scheduled execution (1 hour from now)
// NOTE(review): the third argument is a relative number of seconds in the
// neighbouring calls but an absolute Unix timestamp here — confirm how send()
// tells the two apart.
$timestamp = time() + 3600;
$queue->send('Scheduled message', [], $timestamp);

// Day-scale delay (after 1 day, i.e. 86400 seconds)
$queue->send('Next day message', [], 86400);

// Replay messages: handle at most 10 of them, acknowledging automatically
$replayHandler = function ($message) {
    echo 'Replaying: ' . $message['message'] . "\n";

    return true;
};
$count = $queue->replayMessages($replayHandler, 10);

// Audit messages (read-only mode; message state is not affected)
$auditHandler = function ($message) {
    echo 'Auditing: ' . $message['message'] . "\n";

    return true;
};
$count = $queue->auditMessages($auditHandler, 20);

// Read all messages, starting from the beginning
$message = $queue->consume(null, '0-0');

// Read the newest messages
$message = $queue->consume(null, '$');

// Start reading from a specific message ID
$message = $queue->consumeFrom('1758943564547-0');

// Redis connection settings.
$redisConfig = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    'timeout' => 5, // presumably seconds — confirm against the library
];

// Queue settings.
$queueConfig = [
    'stream_name' => 'redis_stream_queue',
    'consumer_group' => 'redis_stream_group',
    'consumer_name' => 'consumer_' . getmypid(), // name includes the current process ID
    'block_timeout' => 5000, // presumably milliseconds — TODO confirm
    'retry_attempts' => 3,
    'retry_delay' => 1000, // unit not shown here — TODO confirm
    'delayed_queue_suffix' => '_delayed',
    'scheduler_interval' => 1, // unit not shown here — TODO confirm
];

// Get the status of the queue instances
$status = RedisStreamQueue::getInstancesStatus();

// Get the connection pool status
$poolStatus = $queue->getConnectionPoolStatus();

// Get delayed-queue statistics
$stats = $queue->getDelayedQueueStats();

// Run the scheduler manually
$processedCount = $queue->runDelayedScheduler(100);

// Start the scheduler (run for 60 seconds)
$queue->startDelayedScheduler(60);

// Get the queue status (reuses $status, replacing the instance status fetched above)
$status = [
    'stream_length' => $queue->getStreamLength(),
    'pending_count' => $queue->getPendingCount(),
    'delayed_count' => $queue->getDelayedQueueLength(),
];

// config/queue.php
// Fragment only: this entry belongs inside the array defined by that config file.
// It declares a 'redis-stream' connection whose queue name is read from the
// REDIS_QUEUE environment setting, falling back to 'default'.
'connections' => [
    'redis-stream' => [
        'driver' => 'redis-stream',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE', 'default'),
    ],
],

use Tinywan\RedisStream\RedisStreamQueue;
use Tinywan\RedisStream\Producer;

/**
 * Application-level helper that enqueues e-mail tasks on the Redis stream queue.
 */
class QueueService
{
    /**
     * Queue an e-mail task.
     *
     * @param mixed $to      Recipient, stored under the 'to' key of the JSON payload.
     * @param mixed $subject Subject, stored under the 'subject' key of the JSON payload.
     * @param int   $delay   Passed through as the third argument of Producer::send(); 0 by default.
     *
     * @return mixed Whatever Producer::send() returns.
     *
     * @throws \JsonException If the payload cannot be JSON-encoded.
     */
    public function sendEmail($to, $subject, $delay = 0)
    {
        $queue = RedisStreamQueue::getInstance();
        $producer = new Producer($queue);

        // Without JSON_THROW_ON_ERROR, json_encode() returns false on failure
        // (for example on invalid UTF-8) and that false would be sent as the
        // message body.
        $payload = json_encode([
            'to' => $to,
            'subject' => $subject,
        ], JSON_THROW_ON_ERROR);

        return $producer->send($payload, ['type' => 'email'], $delay);
    }
}
dockerfile
# Consumer image: PHP 8.1 CLI with the redis PECL extension enabled.
FROM php:8.1-cli
# Composer needs unzip (or git / ext-zip) to extract package archives, and the
# base image is not expected to provide any of them.
RUN apt-get update \
    && apt-get install -y --no-install-recommends unzip \
    && rm -rf /var/lib/apt/lists/*
RUN pecl install redis && docker-php-ext-enable redis
# Take the composer binary from the official composer image.
COPY --from=composer:latest /usr/bin/composer /usr/bin/composer
COPY . /app
WORKDIR /app
RUN composer install --no-dev --optimize-autoloader
# Default command: run the example consumer script.
CMD ["php", "examples/consumer.php"]
bash
# Basic example
php examples/quickstart.php

# Producer example
php examples/producer.php

# Consumer example
php examples/consumer.php

# Run the tests
./vendor/bin/phpunit