PHP code example of tsterker / hopper

1. Go to the download page and download the library ("Download tsterker/hopper library"). Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start developing here. Best regards, https://php-download.com/ */

    

tsterker / hopper example snippets


// Build the AMQP connection that is handed to Hopper below.
// The class is referenced by its fully-qualified php-amqplib name because this
// snippet carries no `use` import for it; the bare name would resolve to a
// non-existent global class.
$connection = \PhpAmqpLib\Connection\AMQPLazyConnection::create_connection(
    // Candidate hosts; each entry carries its own credentials and vhost.
    [
        ['host' => 'localhost', 'port' => 5672, 'user' => 'user', 'password' => 'pass', 'vhost' => '/'],
    ],
    // Connection options shared by all hosts listed above.
    [
        'keepalive' => true,
        'heartbeat' => 60,
        'connection_timeout' => 5,
        'read_write_timeout' => null,
    ]
);

use TSterker\Hopper\Hopper;
use TSterker\Hopper\Message;

// Basic Hopper walkthrough: declare topology, subscribe, publish, consume.
// Expects $connection to have been created beforehand.
$hopper = new Hopper($connection);

// Build the exchange and queue objects, both named 'foo'.
$fooExchange = $hopper->createExchange('foo');
$fooQueue = $hopper->createQueue('foo');

// Declare exchange & bind queue to it
$hopper->declareExchange($fooExchange);
$hopper->declareQueue($fooQueue);
$hopper->bind($fooExchange, $fooQueue);

// Subscribe to queue
// (the queue and the handler are two separate arguments; the handler prints
// the message id and its JSON-encoded data, then acknowledges the message)
$hopper->subscribe($fooQueue, function (Message $msg, Hopper $hopper): void {
    echo "Message {$msg->getId()} received: " . json_encode($msg->getData()) . "\n";
    $msg->ack();
});

// Publish single message
$hopper->publish($fooExchange, Message::make(['foo' => 'bar']));

// Publish message batch
$hopper->publishBatch($fooExchange, [
    Message::make(['bar' => 'baz']),
    Message::make(['baz' => 'bazinga']),
]);

// Buffer messages & batch publish
// (nothing is flushed until flushBatchPublishes() is called explicitly)
$hopper->addBatchMessage($fooExchange, Message::make(['batch' => '1']));
$hopper->addBatchMessage($fooExchange, Message::make(['batch' => '1']));
$hopper->flushBatchPublishes();

// Consume messages for 5 seconds
$hopper->consume(5);

use TSterker\Hopper\Subscriber;
use TSterker\Hopper\Hopper;
use TSterker\Hopper\Message;

/** @var Hopper $hopper */

// Subscriber example: two queues fed by one exchange, each with its own
// handler, plus an idle handler. Expects an existing $hopper instance.
$subscriber = new Subscriber($hopper);

// Plumbing
// (both queues are bound to the same 'source' exchange)
$source = $hopper->createExchange('source');
$fooQueue = $hopper->createQueue('foo');
$barQueue = $hopper->createQueue('bar');
$hopper->declareExchange($source);
$hopper->declareQueue($fooQueue);
$hopper->declareQueue($barQueue);
$hopper->bind($source, $fooQueue);
$hopper->bind($source, $barQueue);

// Handlers return nothing, so they are declared `: void` like the other
// message handlers in these examples. Each one prints the message id and
// acknowledges the message.
$subscriber
    ->subscribe($fooQueue, function (Message $msg): void {
        echo "FOO Subscriber: {$msg->getId()}\n";
        $msg->ack();
    })
    ->subscribe($barQueue, function (Message $msg): void {
        echo "BAR Subscriber: {$msg->getId()}\n";
        $msg->ack();
    })
    ->withIdleTimeout(1)  // Call idle handler after 1 second of not receiving messages
    ->useIdleHandler(function ($timeout): void {
        echo "idle for more than $timeout seconds...\n";
    });

// Start consuming with the handlers registered above.
$subscriber->consume();

use TSterker\Hopper\Piper;
use TSterker\Hopper\Hopper;

/** @var Hopper $hopper */
/**
 * Example transformer that ignores the incoming message and emits a new
 * message carrying only the name this instance was constructed with.
 *
 * Message is referenced by its fully-qualified name (as the Transformer
 * contract already is) because this snippet does not import it.
 */
class ExampleTransformer implements \TSterker\Hopper\Contracts\Transformer
{
    /** Name written into every message produced by this transformer. */
    protected string $name;

    /**
     * @param string $name Value stored under the 'name' key of produced messages.
     */
    public function __construct(string $name)
    {
        $this->name = $name;
    }

    /**
     * Build the replacement message for an incoming one.
     *
     * @param \TSterker\Hopper\Message $msg Incoming message; its content is not read here.
     *
     * @return \TSterker\Hopper\Message New message with data ['name' => <configured name>].
     */
    public function transformMessage(\TSterker\Hopper\Message $msg): \TSterker\Hopper\Message
    {
        return \TSterker\Hopper\Message::make(['name' => $this->name]);
    }
}

// Piper example: route messages from two source queues through a transformer
// into a shared 'out' queue. Expects an existing $hopper instance.

// Plumbing
// 'foo' and 'bar' are bound to the 'source' exchange; 'out' is declared but
// not bound here — it is only used as the destination passed to add() below.
$source = $hopper->createExchange('source');
$fooQueue = $hopper->createQueue('foo');
$barQueue = $hopper->createQueue('bar');
$outQueue = $hopper->createQueue('out');
$hopper->declareExchange($source);
$hopper->declareQueue($fooQueue);
$hopper->declareQueue($barQueue);
$hopper->declareQueue($outQueue);
$hopper->bind($source, $fooQueue);
$hopper->bind($source, $barQueue);

$piper = new Piper(
    $hopper,
    10,  // buffer messages and then batch-publish
    2,   // "idle timeout" in seconds to flush buffer, even if not full
);

// One transformer per source queue, so produced messages carry 'foo' or 'bar'.
$fooX = new ExampleTransformer('foo');
$barX = new ExampleTransformer('bar');

// Register both routes (source queue, destination queue, transformer) and a
// flush callback that prints the message count it is given.
$piper
    ->add($fooQueue, $outQueue, $fooX)
    ->add($barQueue, $outQueue, $barX)
    ->onFlush(function (int $messageCount): void {
        echo "Flushed $messageCount messages\n";
    });

// Start processing the registered routes.
$piper->consume();

// Connection setup used to reproduce an error (see the read_write_timeout
// note below). $host and env() are not defined in this snippet and must be
// provided by the surrounding application.
// The class is referenced by its fully-qualified php-amqplib name because this
// snippet carries no `use` import for it.
$connection = \PhpAmqpLib\Connection\AMQPLazyConnection::create_connection(
    [$host],
    [
        'keepalive' => true,
        // Heartbeat and connection timeout come from the environment,
        // falling back to 60 and 5 respectively.
        'heartbeat' => env('RABBITMQ_HEARTBEAT', 60),
        'connection_timeout' => env('RABBITMQ_CONNECTION_TIMEOUT', 5),
        'read_write_timeout' => 0,  // <-- Setting this to 0 consistently reproduces the error
    ]
);