PHP code example of jardisadapter / messaging

1. Go to this page and download the library: Download jardisadapter/messaging library. Choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php:
    
        
<?php
// Load Composer's autoloader. Anchoring the path to __DIR__ resolves it relative
// to this file, so the include does not depend on the current working directory
// or on include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

jardisadapter / messaging example snippets


use JardisAdapter\Messaging\MessagePublisher;
use JardisAdapter\Messaging\MessageConsumer;
use JardisAdapter\Messaging\Factory\ConnectionFactory;
use JardisAdapter\Messaging\Factory\PublisherFactory;
use JardisAdapter\Messaging\Factory\ConsumerFactory;
use JardisAdapter\Messaging\Handler\CallbackHandler;

// Factories used throughout the examples: one builds connections, the other
// two build the publisher-side and consumer-side transports.
$connFactory = new ConnectionFactory();
$pubFactory = new PublisherFactory();
$conFactory = new ConsumerFactory();

// A single Redis connection, shared by the publisher and the consumer below.
$redisConn = $connFactory->redis('localhost', 6379);

// Publish an order payload to 'orders'.
$publisher = new MessagePublisher(
    $pubFactory->redis($redisConn),
);
$orderPayload = ['order_id' => 42, 'total' => 99.99];
$publisher->publish('orders', $orderPayload);

// Consume from 'orders' with a callback-based handler.
$consumer = new MessageConsumer(
    $conFactory->redis($redisConn),
);
$acknowledgeOrder = function (string|array $message, array $metadata): bool {
    // $message = ['order_id' => 42, 'total' => 99.99]  (auto-deserialized)
    // Return value: true = ACK, false = reject/requeue.
    return true;
};
$consumer->consume('orders', new CallbackHandler($acknowledgeOrder));

// Two independent Redis connections: a primary and a secondary host.
$primary = $connFactory->redis('redis-primary');
$secondary = $connFactory->redis('redis-secondary');

// The publisher receives both transports, in priority order.
$publisher = new MessagePublisher(
    // tried first
    $pubFactory->redis($primary),
    // fallback if primary fails
    $pubFactory->redis($secondary),
);

$orderPayload = ['order_id' => 42];
$publisher->publish('orders', $orderPayload);

// Connection used by every variant in this snippet.
$redisConn = $connFactory->redis('localhost', 6379);

// Variant 1 - Pub/Sub (default)
$publisher = new MessagePublisher(
    $pubFactory->redis($redisConn),
);

// Variant 2 - Streams (enabled through the useStreams named argument)
$publisher = new MessagePublisher(
    $pubFactory->redis($redisConn, useStreams: true),
);

// Consumer groups (Streams only)
$consumer = new MessageConsumer(
    $conFactory->redis($redisConn, useStreams: true),
);
$consumerGroupOptions = [
    // auto-created if missing
    'group' => 'order-processors',
    'consumer' => 'worker-1',
    // NOTE(review): 'block' looks like a blocking timeout in milliseconds and
    // 'count' like a per-read message limit - confirm against the library docs.
    'block' => 5000,
    'count' => 1,
];
// $handler is not defined in this snippet; it must be supplied by the caller.
$consumer->consume('orders', $handler, $consumerGroupOptions);

// Producer side: connect to the broker and publish an invoice with a 'key' option.
$kafkaConn = $connFactory->kafka('kafka:9092');
$publisher = new MessagePublisher(
    $pubFactory->kafka($kafkaConn),
);
$invoicePayload = ['invoice_id' => 7];
$publishOptions = ['key' => 'partition-key'];
$publisher->publish('invoices', $invoicePayload, $publishOptions);

// Consumer side (groupId is part of connection)
$kafkaConsumerConn = $connFactory->kafkaConsumer('kafka:9092', 'invoice-processor');
$consumer = new MessageConsumer(
    $conFactory->kafka($kafkaConsumerConn),
);
// $handler is not defined in this snippet; it must be supplied by the caller.
$consumer->consume('invoices', $handler);

// One RabbitMQ connection (host, port, user, password) used for both directions.
$rabbitConn = $connFactory->rabbitMq('localhost', 5672, 'guest', 'guest');

// Publish an 'order.created' message.
$publisher = new MessagePublisher(
    $pubFactory->rabbitMq($rabbitConn),
);
$orderCreatedPayload = ['orderId' => 42];
$publisher->publish('order.created', $orderCreatedPayload);

// Consume 'order.created'; the consumer transport is built with 'order-queue'.
$consumer = new MessageConsumer(
    $conFactory->rabbitMq($rabbitConn, 'order-queue'),
);
// $handler is not defined in this snippet; it must be supplied by the caller.
$consumer->consume('order.created', $handler);

use JardisAdapter\Messaging\Config\DatabaseTransportOptions;

// Database-backed connection built from a DSN plus credentials.
$dbConn = $connFactory->database('mysql:host=localhost;dbname=app', 'user', 'pass');

// Transport settings shared by the publisher and the consumer.
$options = new DatabaseTransportOptions(
    table: 'domain_events',
    // soft delete (default)
    deleteAfterProcessing: false,
    pollingIntervalMs: 1000,
    batchSize: 10,
    maxAttempts: 3,
);

$publisher = new MessagePublisher(
    $pubFactory->database($dbConn, $options),
);
$consumer = new MessageConsumer(
    $conFactory->database($dbConn, $options),
);

// Point-to-Point (default): one consumer per event
// ($handler is not defined in this snippet; it must be supplied by the caller.)
$consumer->consume('OrderCreated', $handler);

// Fan-Out: multiple consumer groups process the same event
foreach (['email-service', 'pdf-service'] as $group) {
    $consumer->consume('InvoiceCreated', $handler, ['group' => $group]);
}

use JardisAdapter\Messaging\Transport\InMemoryTransport;

// A single in-memory transport instance backs both the publisher and the consumer.
$transport = new InMemoryTransport();

$publisher = new MessagePublisher(
    $pubFactory->inMemory($transport),
);
$consumer = new MessageConsumer(
    $conFactory->inMemory($transport),
);

// Publish one message, then query the transport for the 'test' message count.
$testPayload = ['id' => 1];
$publisher->publish('test', $testPayload);
$transport->getMessageCount('test');  // 1

// Consume with a 'limit' option.
// ($handler is not defined in this snippet; it must be supplied by the caller.)
$consumeOptions = ['limit' => 5];
$consumer->consume('test', $handler, $consumeOptions);

// Wrapping client objects that the application has already created.
// NOTE(review): $existingRedis, $existingPdo, $amqpConnection and $producer are
// not defined in this snippet; presumably pre-built client instances supplied
// by the caller - confirm the expected types against the library docs.

// Existing Redis instance
// NOTE(review): manageLifecycle: false presumably leaves connect/disconnect to
// the caller - confirm.
$redisConn = $connFactory->fromRedis($existingRedis, manageLifecycle: false);
$publisher = new MessagePublisher($pubFactory->redis($redisConn));

// Existing PDO instance
$dbConn = $connFactory->fromPdo($existingPdo, manageLifecycle: false);

// Existing AMQP connection
$rabbitConn = $connFactory->fromAmqp($amqpConnection, exchangeName: 'custom');

// Existing Kafka producer/consumer
// NOTE(review): $consumer here is expected to be an existing Kafka consumer
// object; elsewhere in these examples the same name holds a MessageConsumer -
// verify which instance is in scope before reusing this line.
$kafkaProducerConn = $connFactory->fromKafkaProducer($producer, flushOnDisconnect: true);
$kafkaConsumerConn = $connFactory->fromKafkaConsumer($consumer);

use JardisAdapter\Messaging\MessagingService;

// MessagingService takes two factory callables, one building the publisher and
// one building the consumer. The arrow functions capture $pubFactory,
// $conFactory and $redisConn by value at the point of definition.
$messaging = new MessagingService(
    publisherFactory: fn() => new MessagePublisher($pubFactory->redis($redisConn)),
    consumerFactory:  fn() => new MessageConsumer($conFactory->redis($redisConn)),
);

// Publish and consume through the service.
// The recipient is a placeholder address: the scraped page carried an
// email-obfuscation artifact here instead of a real address.
$messaging->publish('notifications', ['type' => 'email', 'to' => 'user@example.com']);
// $handler is not defined in this snippet; it must be supplied by the caller.
$messaging->consume('notifications', $handler);

// Accessors for the underlying publisher and consumer.
$messaging->getPublisher();  // MessagePublisherInterface
$messaging->getConsumer();   // MessageConsumerInterface