1. Go to the download page for the bschmitt/laravel-amqp library and download it, choosing the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
// Load the Composer autoloader. The path is anchored to this file's directory so
// the lookup does not depend on include_path or the current working directory.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
use Bschmitt\Amqp\Support\DeadLetterTopology;
use Bschmitt\Amqp\Support\RetryPolicy;
// Resolve the 'Amqp' binding through the app() helper.
$amqp = app('Amqp');

// RetryPolicy::exponential($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs)
$policy = RetryPolicy::exponential(5, 1000, 2.0, 60000);

// Describe the work queue, its exchange and its routing key.
$topology = DeadLetterTopology::for('orders.process', $policy)
    ->on('app.events', 'topic')
    ->withRoutingKey('orders.process');

// Idempotently creates: orders.process, orders.process.dlq,
// and orders.process.retry.{1000,2000,4000,8000,16000} (capped at 60000).
$amqp->declareRetryTopology($topology);

// Consume from the topology: decode the JSON body, process it, then acknowledge.
$amqp->consumeWithRetry($topology, function ($message, $resolver) {
    $order = json_decode($message->body, true);
    processOrder($order);
    $resolver->acknowledge($message);
});
use Bschmitt\Amqp\Support\RetryPolicy;
// Examples of the RetryPolicy factory methods. Each call is shown on its own;
// the returned policy is not stored here.
RetryPolicy::fixed(3, 1000); // 3 retries, 1s apart
RetryPolicy::exponential(5, 500, 2.0, 30000); // 500ms doubling, capped at 30s
RetryPolicy::immediate(2); // 2 retries with zero delay
RetryPolicy::none(); // failures go straight to the DLQ
use Bschmitt\Amqp\Support\RetryHandler;
// Logging callback handed to retryHandler(); it forwards its arguments to Log::log().
$logRetry = function ($level, $message, $context) {
    Log::log($level, $message, $context);
};

// Wrap the existing handler, then consume with the topology's work properties.
$wrapped = $amqp->retryHandler($yourHandler, $topology, $logRetry);
$workProperties = $topology->toWorkProperties();
$amqp->consume('orders.process', $wrapped, $workProperties);
use Bschmitt\Amqp\Support\RetryPolicy;
// Publish 'orders.created' inside a publish-backoff wrapper built from
// RetryPolicy::exponential(3, 100, 2.0).
// $payload is captured explicitly: PHP closures do not inherit variables from the
// enclosing scope, so without `use` it would be undefined inside the callback.
$amqp->withPublishBackoff(RetryPolicy::exponential(3, 100, 2.0))->run(function () use ($amqp, $payload) {
    return $amqp->publish('orders.created', $payload);
});
use Bschmitt\Amqp\Support\TypedMessage;
/**
 * Typed message describing a created order.
 *
 * Exposes the order id, total and currency as public properties. routingKey()
 * reports 'orders.created' and exchange() reports 'shop.events'.
 */
class OrderCreated extends TypedMessage
{
    /** @var mixed Order identifier; null when not supplied. */
    public $orderId;

    /** @var mixed Order total; null when not supplied. */
    public $total;

    /** @var mixed Currency of the total; null when not supplied. */
    public $currency;

    /**
     * All arguments are optional and default to null.
     *
     * @param mixed $orderId
     * @param mixed $total
     * @param mixed $currency
     */
    public function __construct($orderId = null, $total = null, $currency = null)
    {
        $this->currency = $currency;
        $this->total = $total;
        $this->orderId = $orderId;
    }

    /**
     * @return string Exchange name for this message type.
     */
    public static function exchange()
    {
        return 'shop.events';
    }

    /**
     * @return string Routing key for this message type.
     */
    public static function routingKey()
    {
        return 'orders.created';
    }
}
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Support\ExchangeTopology;
use Bschmitt\Amqp\Support\QueueProfile;
// Topic exchange 'events' with two bound queues; the second uses the quorum profile.
$topology = ExchangeTopology::exchange('events', 'topic')
    ->bindQueue('orders.created', 'order.created')
    ->bindQueue('orders.shipped', 'order.shipped', QueueProfile::quorum());

Amqp::declareExchangeTopology($topology);

// Publish using properties for a specific queue in the topology
$createdQueueProperties = $topology->propertiesForQueue('orders.created');
Amqp::publish('order.created', $payload, $createdQueueProperties);
use Bschmitt\Amqp\Support\CorrelationContext;
CorrelationContext::set('request-abc-123');

// The same propagation flags are passed to both the publisher and the consumer.
$propagation = [
    'propagate_correlation' => true,
    'propagate_trace' => true,
];

Amqp::publish('orders.created', $payload, $propagation);

Amqp::consumeWithLifecycle(
    'orders.created',
    function ($message, $resolver) {
        // CorrelationContext::get() is populated when propagate_* flags are used
    },
    null,
    $propagation
);
use Bschmitt\Amqp\Support\CallbackTracePropagator;
// First callback: receives the carrier array and a context, returns the carrier.
$injectTrace = function (array $carrier, $context) {
    // inject active span into $carrier
    return $carrier;
};

// Second callback: receives the carrier array; this placeholder always returns null.
$extractTrace = function (array $carrier) {
    // extract TraceContext from $carrier or return null
    return null;
};

Amqp::setTracePropagator(new CallbackTracePropagator($injectTrace, $extractTrace));
use Bschmitt\Amqp\Support\ConsumerLifecycle;
// Build the lifecycle step by step, keeping whatever each fluent call returns.
$lifecycle = new ConsumerLifecycle();
$lifecycle = $lifecycle->registerSignalHandlers();
$lifecycle = $lifecycle->onStopping(function ($lifecycle) {
    // flush buffers, close DB connections, etc.
});

Amqp::consumeWithLifecycle('jobs', $handler, $lifecycle);
use Bschmitt\Amqp\Facades\Amqp;
// Middleware that times the downstream call and logs the elapsed milliseconds.
$timingMiddleware = function ($message, $next) {
    $startedAt = microtime(true);
    $next($message);
    $elapsedMs = (microtime(true) - $startedAt) * 1000;
    Log::info('handled', ['duration_ms' => $elapsedMs]);
};

Amqp::consumeWithMiddleware(
    'orders',
    function ($message, $resolver) {
        // handle...
    },
    [
        $timingMiddleware,
        // ...or a ConsumeMiddlewareInterface instance
    ]
);
use Bschmitt\Amqp\Core\Amqp;
/**
 * Runs CreateOrder::handle() against the fake and asserts that 'orders.created'
 * was published exactly once and 'orders.shipped' was not published.
 */
public function test_publishes_order_created()
{
    $fake = Amqp::fake();

    $createOrder = new CreateOrder();
    $createOrder->handle();

    $fake->assertPublished('orders.created');
    $fake->assertPublishedCount(1, 'orders.created');
    $fake->assertNotPublished('orders.shipped');
}
use Bschmitt\Amqp\Rpc\RpcService;
use Bschmitt\Amqp\Rpc\RpcRequest;
use Bschmitt\Amqp\Rpc\RpcResponse;
/**
 * RPC service definition: names the queue and maps request classes to
 * handler method names.
 */
class UserService extends RpcService
{
    /**
     * @return array Request class name => handler method name.
     */
    public static function methods(): array
    {
        $handlersByRequest = [];
        $handlersByRequest[GetUserRequest::class] = 'getUser';
        $handlersByRequest[CreateUserRequest::class] = 'createUser';

        return $handlersByRequest;
    }

    /**
     * @return string Queue name for this service.
     */
    public static function queue(): string
    {
        return 'rpc.user-service';
    }
}
/**
 * RPC request carrying a user id; its response class is GetUserResponse.
 */
class GetUserRequest extends RpcRequest
{
    /** @var mixed User identifier; null when not supplied. */
    public $id;

    /**
     * @param mixed $id Optional user identifier.
     */
    public function __construct($id = null)
    {
        $this->id = $id;
    }

    /**
     * @return string Class name of the response paired with this request.
     */
    public static function responseClass()
    {
        return GetUserResponse::class;
    }
}
/**
 * RPC response exposing a user's id and name as public properties.
 */
class GetUserResponse extends RpcResponse
{
    /** @var mixed User identifier; null when not supplied. */
    public $id;

    /** @var mixed User name; null when not supplied. */
    public $name;

    /**
     * Both arguments are optional and default to null.
     *
     * @param mixed $id
     * @param mixed $name
     */
    public function __construct($id = null, $name = null)
    {
        $this->name = $name;
        $this->id = $id;
    }
}
use Rpc; // facade alias auto-registered
// Build the request first, then send it to the service.
$request = GetUserRequest::make(['id' => 5]);
$response = Rpc::call(UserService::class, $request);

echo $response->name; // GetUserResponse instance, hydrated for you
use Rpc;
/**
 * Handler whose methods take a typed request and return a GetUserResponse.
 */
class UserServiceHandler
{
    /**
     * Looks up the user by the request's id via User::findOrFail() and returns
     * its id and name.
     */
    public function getUser(GetUserRequest $request): GetUserResponse
    {
        $found = User::findOrFail($request->id);
        $attributes = [
            'id' => $found->id,
            'name' => $found->name,
        ];

        return GetUserResponse::make($attributes);
    }

    /**
     * Creates a user from the request's name via User::create() and returns
     * its id and name.
     */
    public function createUser(CreateUserRequest $request): GetUserResponse
    {
        $created = User::create(['name' => $request->name]);
        $attributes = ['id' => $created->id, 'name' => $created->name];

        return GetUserResponse::make($attributes);
    }
}
// Register the handler class for the service, then serve that service
// on whatever register() returned.
$registration = Rpc::register(UserService::class, UserServiceHandler::class);
$registration->serve(UserService::class);
use Bschmitt\Amqp\Contracts\TracePropagatorInterface;
use Bschmitt\Amqp\Support\OpenTelemetryTracePropagator;
// In a service provider:
// Factory used for the singleton binding below.
$makePropagator = function () {
    // Or pass an explicit \OpenTelemetry\Context\Propagation\TextMapPropagatorInterface
    return new OpenTelemetryTracePropagator();
};

$this->app->singleton(TracePropagatorInterface::class, $makePropagator);
use Bschmitt\Amqp\Support\ConsumerLifecycle;
use Bschmitt\Amqp\Support\HealthState;
// Create the lifecycle, then attach a health state stored at
// framework/amqp-health.json (resolved through storage_path()) and register
// the signal handlers.
$lifecycle = new ConsumerLifecycle();
$health = HealthState::instance(storage_path('framework/amqp-health.json'));
$lifecycle = $lifecycle->withHealth($health)->registerSignalHandlers();

Amqp::consumeWithLifecycle('orders', $handler, $lifecycle);
use Bschmitt\Amqp\Support\MultiRegionConnection;
// Resolve the multi-region connection resolver through the app() helper.
$resolver = app(MultiRegionConnection::class);

// Single attempt with locality preference
$connectionKey = $resolver->pick(); // 'production-us'

// Run a publish across regions until one succeeds.
// $payload is captured explicitly: PHP closures do not inherit variables from the
// enclosing scope, so without `use` it would be undefined inside the callback.
$resolver->withFailover(function ($region) use ($payload) {
    Amqp::publish('orders.created', $payload, ['use' => $region]);
});

// Fan-out to every region (e.g. announcements)
foreach ($resolver->each() as $region) {
    Amqp::publish('events.maintenance', $payload, ['use' => $region]);
}