PHP code example of andreagroferreira / redis-stream
1. Go to this page and download the library: Download andreagroferreira/redis-stream library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
andreagroferreira / redis-stream example snippets
use WizardingCode\RedisStream\Exceptions\ConnectionException;
use WizardingCode\RedisStream\Exceptions\PublishException;
try {
$producer->publish('user.created', $userData);
} catch (ConnectionException $e) {
// Handle connection issues
Log::error("Redis connection error: " . $e->getMessage());
// Maybe retry or queue for later
} catch (PublishException $e) {
// Handle publishing issues
Log::error("Failed to publish message: " . $e->getMessage());
} catch (Exception $e) {
// Handle other errors
Log::error("Unexpected error: " . $e->getMessage());
}
// Publishing messages
$producer = app(WizardingCode\RedisStream\RedisStreamProducer::class);
$messageId = $producer->publish('user.created', [
'user_id' => 1234,
'name' => 'John Doe',
'email' => '[email protected]'
]);
// Consuming messages (in a console command or job)
$consumer = app(WizardingCode\RedisStream\RedisStreamConsumer::class);
$consumer->consume(function($data, $messageId) {
$event = $data['event'];
$payload = $data['payload'];
// Process the message
match($event) {
'user.created' => $this->processUserCreation($payload),
'user.updated' => $this->processUserUpdate($payload),
default => $this->processUnknownEvent($event, $payload)
};
// Message is auto-acknowledged if no exception is thrown
});
namespace App\Handlers;
class MyStreamHandler
{
public function handle(array $data, string $messageId): void
{
$event = $data['event'];
$payload = $data['payload'];
// Your custom handling logic
logger()->info("Processing event: {$event}");
// Process based on event type
match($event) {
'order.created' => $this->processOrder($payload),
'payment.completed' => $this->processPayment($payload),
default => $this->handleUnknown($event, $payload)
};
}
protected function processOrder(array $data): void
{
// Process order logic
}
protected function processPayment(array $data): void
{
// Process payment logic
}
protected function handleUnknown(string $event, array $data): void
{
logger()->warning("Unknown event type: {$event}");
}
}
$producer = app(WizardingCode\RedisStream\RedisStreamProducer::class);
// Trim to approximately 10,000 items (fast)
$deleted = $producer->trim(10000);
// Trim to exactly 10,000 items (slower)
$deleted = $producer->trim(10000, true);
// Get a specific producer for a named stream
$ordersProducer = app('redis_stream.producer.stream_orders');
$ordersProducer->publish('order.created', ['order_id' => 12345]);
protected function handleMessage(array $data, string $messageId): void
{
$event = $data['event'] ?? 'unknown';
$payload = $data['payload'] ?? [];
Log::info("Processing {$event} message {$messageId}");
try {
// Handle different event types
match ($event) {
'order.created' => $this->processNewOrder($payload),
'order.updated' => $this->processOrderUpdate($payload),
'order.cancelled' => $this->processOrderCancellation($payload),
default => $this->handleUnknownEvent($event, $payload, $messageId),
};
} catch (\Exception $e) {
Log::error("Failed to process {$event} message {$messageId}: " . $e->getMessage());
throw $e; // Rethrow to let the consumer handle retries
}
}
private function processNewOrder(array $orderData): void
{
// Process new order logic
}
bash
# Install the PHP Redis Extension
pecl install redis
# Add "extension=redis.so" to your php.ini