PHP code example of andreagroferreira / redis-stream

1. Go to the download page for the andreagroferreira/redis-stream library and download it, choosing the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php:
    
        
<?php
// Load Composer's autoloader. Anchoring the path to __DIR__ makes the lookup
// independent of the current working directory and of include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

andreagroferreira / redis-stream example snippets


use WizardingCode\RedisStream\Exceptions\ConnectionException;
use WizardingCode\RedisStream\Exceptions\PublishException;

// Publish an event and react to the failure modes separately.
// The package-specific exceptions are caught first; the final clause is the
// catch-all for everything else.
try {
    $producer->publish('user.created', $userData);
} catch (ConnectionException $e) {
    // Handle connection issues
    Log::error("Redis connection error: {$e->getMessage()}");
    // Maybe retry or queue for later
} catch (PublishException $e) {
    // Handle publishing issues
    Log::error("Failed to publish message: {$e->getMessage()}");
} catch (\Exception $e) {
    // Handle other errors.
    // The leading backslash matters: inside a namespaced file an unqualified
    // `Exception` resolves to `CurrentNamespace\Exception` and would never match.
    Log::error("Unexpected error: {$e->getMessage()}");
}



// Publishing messages
// The class name is fully qualified (leading backslash) so this snippet also
// resolves correctly when pasted into a namespaced file.
$producer = app(\WizardingCode\RedisStream\RedisStreamProducer::class);

// First argument is the event name, second is the payload array.
// NOTE(review): the return value is presumably the ID of the new stream entry — confirm against the package docs.
$messageId = $producer->publish('user.created', [
    'user_id' => 1234,
    'name' => 'John Doe',
    'email' => 'john.doe@example.com',
]);

// Consuming messages (in a console command or job)
// The class name is fully qualified so it resolves from inside any namespace.
$consumer = app(\WizardingCode\RedisStream\RedisStreamConsumer::class);
$consumer->consume(function ($data, $messageId) {
    // Fall back to defaults so a message without these keys is routed to the
    // unknown-event handler instead of raising "Undefined array key" warnings.
    $event = $data['event'] ?? 'unknown';
    $payload = $data['payload'] ?? [];

    // Process the message: route by event name, anything unrecognised goes
    // to the fallback handler.
    match ($event) {
        'user.created' => $this->processUserCreation($payload),
        'user.updated' => $this->processUserUpdate($payload),
        default => $this->processUnknownEvent($event, $payload),
    };

    // Message is auto-acknowledged if no exception is thrown
});



namespace App\Handlers;

/**
 * Example handler that routes a stream message to one method per event type.
 */
class MyStreamHandler
{
    /**
     * Handle a single message.
     *
     * @param array  $data      Message body; the 'event' and 'payload' keys are read.
     * @param string $messageId Identifier of the message (not used by this handler).
     */
    public function handle(array $data, string $messageId): void
    {
        // Default missing keys. Without this, an absent 'event' would reach
        // handleUnknown() as null and raise a TypeError instead of being
        // reported as an unknown event.
        $event = $data['event'] ?? 'unknown';
        $payload = $data['payload'] ?? [];

        // Your custom handling logic
        logger()->info("Processing event: {$event}");

        // Process based on event type
        match ($event) {
            'order.created' => $this->processOrder($payload),
            'payment.completed' => $this->processPayment($payload),
            default => $this->handleUnknown($event, $payload),
        };
    }

    /**
     * Handle an 'order.created' event.
     *
     * @param array $data Payload of the event.
     */
    protected function processOrder(array $data): void
    {
        // Process order logic
    }

    /**
     * Handle a 'payment.completed' event.
     *
     * @param array $data Payload of the event.
     */
    protected function processPayment(array $data): void
    {
        // Process payment logic
    }

    /**
     * Fallback for event names with no dedicated handler; logs a warning.
     *
     * @param string $event Name of the unrecognised event.
     * @param array  $data  Payload of the event (not used here).
     */
    protected function handleUnknown(string $event, array $data): void
    {
        logger()->warning("Unknown event type: {$event}");
    }
}

// Publish several messages with a single publishBatch() call.
// The leading backslash makes the class name fully qualified; without it the
// name is resolved relative to the current namespace and the lookup fails in
// namespaced code.
$producer = app(\WizardingCode\RedisStream\RedisStreamProducer::class);

// Each entry pairs an event name with its payload.
$messages = [
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 1, 'name' => 'User 1'],
    ],
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 2, 'name' => 'User 2'],
    ],
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 3, 'name' => 'User 3'],
    ],
];

// NOTE(review): presumably returns one message ID per entry — confirm against the package docs.
$messageIds = $producer->publishBatch($messages);

// Stream maintenance and named producers.
// The class name is fully qualified (leading backslash) so it resolves
// correctly from inside any namespace.
$producer = app(\WizardingCode\RedisStream\RedisStreamProducer::class);

// Trim to approximately 10,000 items (fast)
$deleted = $producer->trim(10000);

// Trim to exactly 10,000 items (slower)
$deleted = $producer->trim(10000, true);

// Get a specific producer for a named stream
$ordersProducer = app('redis_stream.producer.stream_orders');
$ordersProducer->publish('order.created', ['order_id' => 12345]);

/**
 * Dispatch one stream message to the handler for its event type.
 *
 * A missing 'event' key is treated as 'unknown' and a missing 'payload' as an
 * empty array. Any exception raised by a handler is logged and then rethrown.
 *
 * @param array  $data      Message body; the 'event' and 'payload' keys are read.
 * @param string $messageId Identifier of the message, used in log lines and
 *                          passed to the unknown-event handler.
 *
 * @throws \Exception Whatever the selected handler throws, after logging it.
 */
protected function handleMessage(array $data, string $messageId): void
{
    $eventName = $data['event'] ?? 'unknown';
    $body = $data['payload'] ?? [];

    Log::info("Processing {$eventName} message {$messageId}");

    try {
        // Strict comparison on the event name; only the matching arm is evaluated.
        match ($eventName) {
            'order.created' => $this->processNewOrder($body),
            'order.updated' => $this->processOrderUpdate($body),
            'order.cancelled' => $this->processOrderCancellation($body),
            default => $this->handleUnknownEvent($eventName, $body, $messageId),
        };
    } catch (\Exception $failure) {
        Log::error("Failed to process {$eventName} message {$messageId}: {$failure->getMessage()}");

        // Propagate so the consumer can apply its own retry handling.
        throw $failure;
    }
}

/**
 * Handle an 'order.created' event; invoked from handleMessage() with the
 * message payload. Placeholder: the body is intentionally left empty here.
 *
 * @param array $orderData Payload of the 'order.created' message.
 */
private function processNewOrder(array $orderData): void
{
    // Process new order logic
}
# Shell commands (bash)

# Install the PHP Redis Extension
pecl install redis
# Add "extension=redis.so" to your php.ini

# Publish the resources registered by the package's service provider
php artisan vendor:publish --provider="WizardingCode\RedisStream\RedisStreamServiceProvider"

# Run the redis-stream:orders Artisan command
php artisan redis-stream:orders