PHP code example of innmind / amqp

1. Go to the download page and download the innmind/amqp library, choosing the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load the autoloader shipped in the vendor directory so the library
// classes used below can be resolved.
require_once 'vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

innmind / amqp example snippets


use Innmind\AMQP\{
    Factory,
    Command\DeclareExchange,
    Command\DeclareQueue,
    Command\Bind,
    Command\Publish,
    Model\Basic\Message,
    Model\Exchange\Type,
};
use Innmind\Socket\Internet\Transport;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\OperatingSystem\Factory as OSFactory;
use Innmind\Url\Url;
use Innmind\Immutable\Str;

$os = OSFactory::build();

// Keep the client itself in $client. Chaining ->run()->match() onto this
// assignment would instead store the value returned by match() (null on
// success), and any later `$client->with(...)` call would fail.
$client = Factory::of($os)->make(
    Transport::tcp(),
    Url::of('amqp://guest:guest@localhost:5672/'),
    new ElapsedPeriod(1000), // timeout (presumably in milliseconds — confirm against innmind/time-continuum)
);

// Declare the "crawler" exchange and the "parser" queue, bind them together,
// then publish one message to the exchange.
$client
    ->with(DeclareExchange::of('crawler', Type::direct))
    ->with(DeclareQueue::of('parser'))
    ->with(Bind::of('crawler', 'parser'))
    ->with(Publish::one(Message::of(Str::of('https://github.com')))->to('crawler'))
    ->run(null)
    ->match(
        static fn() => null, // success
        static fn($failure) => throw new \RuntimeException($failure::class), // surface the failure type as an exception
    );

// NOTE(review): `Model\Basic\Message` is also imported by the publishing
// snippet earlier on this page; if both snippets are pasted into the same
// file, keep only one of the two imports to avoid a "name is already in use"
// compile error.
use Innmind\AMQP\{
    Command\Get,
    Command\Consume,
    Consumer\Continuation,
    Model\Basic\Message,
};

// Fetch from the "parser" queue and carry the message body out of the run
// as the resulting state.
$state = $client
    ->with(Get::of('parser')->handle(static function($state, Message $message, Continuation $continuation) {
        // The incoming state is discarded and replaced by the message body.
        $state = $message->body()->toString();

        return $continuation->ack($state);
    }))
    ->run(null) // <- this argument will be passed as the state to the handler above
    ->match(
        static fn($state) => $state,
        static fn($failure) => throw new \RuntimeException($failure::class),
    );
echo $state; // will print "https://github.com"
// or
// Consume from the "parser" queue: "crawler" is the exchange the messages are
// published to, while "parser" is the queue that was declared and bound to it.
$client
    ->with(Consume::of('parser')->handle(static function($state, Message $message, Continuation $continuation) {
        doStuff($message);

        // Exactly one continuation must be returned per message. Alternatives
        // to the one used below:
        //   return $continuation->requeue($state); // put the message back in the queue so it can be redelivered
        //   return $continuation->cancel($state); // instruct to stop receiving messages (current will be acknowledged first)
        return $continuation->reject($state); // to reject the message
    }))
    ->run(null)
    ->match(
        static fn() => null, // in this case only reachable when you cancel the consumer
        static fn($failure) => throw new \RuntimeException($failure::class),
    );

make benchmark
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.48978996276855
Consuming 4000:
php benchmark/consumer.php
Pid: 701, Count: 4000, Time: 2.3580

Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.15483689308167
Consuming 4000:
php benchmark/consumer.php
Pid: 46862, Count: 4000, Time: 0.2366