1. Go to this page and download the library: Download innmind/amqp library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
innmind / amqp example snippets
use Innmind\AMQP\{
Factory,
Command\DeclareExchange,
Command\DeclareQueue,
Command\Bind,
Command\Publish,
Model\Basic\Message,
Model\Exchange\Type,
};
use Innmind\Socket\Internet\Transport;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\OperatingSystem\Factory as OSFactory;
use Innmind\Url\Url;
use Innmind\Immutable\Str;
$os = OSFactory::build();
$client = Factory::of($os)
->make(
Transport::tcp(),
Url::of('amqp://guest:guest@localhost:5672/'),
new ElapsedPeriod(1000), // timeout
)
->with(DeclareExchange::of('crawler', Type::direct))
->with(DeclareQueue::of('parser'))
->with(Bind::of('crawler', 'parser'))
->with(Publish::one(Message::of(Str::of('https://github.com')))->to('crawler'))
->run(null)
->match(
static fn() => null, // success
static fn($failure) => throw new \RuntimeException($failure::class),
);
use Innminq\AMQP\{
Command\Get,
Command\Consume,
Consumer\Continuation,
Model\Basic\Message,
};
$state = $client
->with(Get::of('parser')->handle(static function($state, Message $message, Continuation $continuation) {
$state = $message->body()->toString();
return $continuation->ack($state);
}))
->run(null) // <- this argument will passed as the state to the handler above
->match(
static fn($state) => $state,
static fn($failure) => throw new \RuntimeException($failure::class),
);
echo $state; // will print "http://github.com/"
// or
$client
->with(Consume::of('crawler')->handle(static function($state, Message $message, Continuation $continuation) {
doStuff($message);
return $continuation->reject($state); // to reject the message
return $continuation->requeue($state); // put the message back in the queue so it can be redelivered
return $continuation->cancel($state); // instruct to stop receiving messages (current will be acknowledged first)
}))
->run(null)
->match(
static fn() => null, // in this case only reachable when you cancel the consumer
static fn($failure) => throw new \RuntimeException($failure::class),
);
make benchmark
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.48978996276855
Consuming 4000:
php benchmark/consumer.php
Pid: 701, Count: 4000, Time: 2.3580