PHP code example of tsterker / hopper
2. Go to this page and download the library: Download tsterker/hopper library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
// Load Composer's autoloader via a path anchored to this file's directory,
// so the include does not depend on include_path or the working directory.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
tsterker / hopper example snippets
// Host definitions handed to create_connection() as its first argument.
$amqpHosts = [
    [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'user',
        'password' => 'pass',
        'vhost' => '/',
    ],
];

// Connection options handed to create_connection() as its second argument.
$amqpOptions = [
    'keepalive' => true,
    'heartbeat' => 60,
    'connection_timeout' => 5,
    'read_write_timeout' => null,
];

$connection = AMQPLazyConnection::create_connection($amqpHosts, $amqpOptions);
use TSterker\Hopper\Hopper;
use TSterker\Hopper\Message;
// Wrap the AMQP connection created above.
$hopper = new Hopper($connection);

$fooExchange = $hopper->createExchange('foo');
$fooQueue = $hopper->createQueue('foo');

// Declare exchange & bind queue to it
$hopper->declareExchange($fooExchange);
$hopper->declareQueue($fooQueue);
$hopper->bind($fooExchange, $fooQueue);

// Subscribe to queue: the handler prints each message and acknowledges it.
// (The queue and the handler are two separate arguments.)
$hopper->subscribe($fooQueue, function (Message $msg, Hopper $hopper): void {
    echo "Message {$msg->getId()} received: " . json_encode($msg->getData()) . "\n";

    $msg->ack();
});

// Publish single message
$hopper->publish($fooExchange, Message::make(['foo' => 'bar']));

// Publish message batch
$hopper->publishBatch($fooExchange, [
    Message::make(['bar' => 'baz']),
    Message::make(['baz' => 'bazinga']),
]);

// Buffer messages & batch publish
$hopper->addBatchMessage($fooExchange, Message::make(['batch' => '1']));
$hopper->addBatchMessage($fooExchange, Message::make(['batch' => '1']));
$hopper->flushBatchPublishes();

// Consume messages for 5 seconds
$hopper->consume(5);
use TSterker\Hopper\Subscriber;
use TSterker\Hopper\Hopper;
use TSterker\Hopper\Message;
/** @var Hopper $hopper */
$subscriber = new Subscriber($hopper);

// Plumbing: one exchange with the "foo" and "bar" queues bound to it
$source = $hopper->createExchange('source');
$fooQueue = $hopper->createQueue('foo');
$barQueue = $hopper->createQueue('bar');

$hopper->declareExchange($source);
$hopper->declareQueue($fooQueue);
$hopper->declareQueue($barQueue);

$hopper->bind($source, $fooQueue);
$hopper->bind($source, $barQueue);

// Handler for the "foo" queue: print the message id, then acknowledge.
$onFooMessage = function (Message $msg) {
    echo "FOO Subscriber: {$msg->getId()}\n";
    $msg->ack();
};

// Handler for the "bar" queue: print the message id, then acknowledge.
$onBarMessage = function (Message $msg) {
    echo "BAR Subscriber: {$msg->getId()}\n";
    $msg->ack();
};

// Invoked with the timeout value once no message arrived within it.
$onIdle = function ($timeout) {
    echo "idle for more than $timeout seconds...\n";
};

$subscriber
    ->subscribe($fooQueue, $onFooMessage)
    ->subscribe($barQueue, $onBarMessage)
    ->withIdleTimeout(1) // Call idle handler after 1 second of not receiving messages
    ->useIdleHandler($onIdle);

$subscriber->consume();
use TSterker\Hopper\Piper;
use TSterker\Hopper\Hopper;
/** @var Hopper $hopper */
/**
 * Example transformer: every call to transformMessage() yields a new message
 * whose data is ['name' => <configured name>].
 *
 * Message is referenced by its fully-qualified name, like the Transformer
 * contract below, so the class does not rely on a `use` import being present.
 */
class ExampleTransformer implements \TSterker\Hopper\Contracts\Transformer
{
    /** Value stored under the 'name' key of each produced message. */
    protected string $name;

    /**
     * @param string $name Value to place into every produced message.
     */
    public function __construct(string $name)
    {
        $this->name = $name;
    }

    /**
     * Build the outgoing message. The incoming message is not read; it is
     * accepted only to satisfy the method signature.
     *
     * @param \TSterker\Hopper\Message $msg Incoming message (unused).
     *
     * @return \TSterker\Hopper\Message New message made from ['name' => $this->name].
     */
    public function transformMessage(\TSterker\Hopper\Message $msg): \TSterker\Hopper\Message
    {
        return \TSterker\Hopper\Message::make(['name' => $this->name]);
    }
}
// Plumbing: "foo" and "bar" are bound to the source exchange; "out" is only declared
$source = $hopper->createExchange('source');
$fooQueue = $hopper->createQueue('foo');
$barQueue = $hopper->createQueue('bar');
$outQueue = $hopper->createQueue('out');

$hopper->declareExchange($source);
$hopper->declareQueue($fooQueue);
$hopper->declareQueue($barQueue);
$hopper->declareQueue($outQueue);

$hopper->bind($source, $fooQueue);
$hopper->bind($source, $barQueue);

$bufferSize = 10;        // buffer messages and then batch-publish
$idleTimeoutSeconds = 2; // "idle timeout" in seconds to flush buffer, even if not full

$piper = new Piper($hopper, $bufferSize, $idleTimeoutSeconds);

$fooX = new ExampleTransformer('foo');
$barX = new ExampleTransformer('bar');

// Reports the message count passed to the flush callback.
$reportFlush = function (int $messageCount): void {
    echo "Flushed $messageCount messages\n";
};

$piper
    ->add($fooQueue, $outQueue, $fooX)
    ->add($barQueue, $outQueue, $barX)
    ->onFlush($reportFlush);

$piper->consume();
// Host list first, then options, mirroring the argument order of the call.
$reproHosts = [$host];

$reproOptions = [
    'keepalive' => true,
    'heartbeat' => env('RABBITMQ_HEARTBEAT', 60),
    'connection_timeout' => env('RABBITMQ_CONNECTION_TIMEOUT', 5),
    'read_write_timeout' => 0, // <-- Setting this to 0 consistently reproduces the error
];

$connection = AMQPLazyConnection::create_connection($reproHosts, $reproOptions);