PHP code example of bschmitt / laravel-amqp

1. Go to this page and download the library: Download bschmitt/laravel-amqp library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

bschmitt / laravel-amqp example snippets


use Bschmitt\Amqp\Facades\Amqp;

// Basic publish
Amqp::publish('routing-key', 'message');

// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);

// Publish with message properties
Amqp::publish('routing-key', 'message', [
    'priority' => 10,
    'correlation_id' => 'unique-id',
    'reply_to' => 'reply-queue',
    'application_headers' => [
        'X-Custom-Header' => 'value'
    ]
]);

use Bschmitt\Amqp\Facades\Amqp;

// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
    $resolver->stopWhenProcessed();
});

// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
}, ['persistent' => true]);

// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});

// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);

// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
    $result = processRequest($message->body);
    $resolver->reply($message, $result);
    $resolver->acknowledge($message);
});

$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});

namespace App\Messaging;

use Bschmitt\Amqp\Contracts\ConsumerInterface;
use Bschmitt\Amqp\Contracts\MessageHandlerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ProcessOrderHandler implements MessageHandlerInterface
{
    public function handle(AMQPMessage $message, ConsumerInterface $resolver, $typed = null): void
    {
        $order = $typed !== null ? $typed->toPayload() : json_decode($message->body, true);
        // ... process $order ...
        $resolver->acknowledge($message);
    }
}

'connections' => [
    // ...
    'amqp' => [
        'driver' => 'amqp',
        'connection' => env('AMQP_ENV', 'production'), // key in config/amqp.php properties
        'queue' => env('AMQP_QUEUE', 'default'),
        'retry_after' => 90,
    ],
],

ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));

$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);

// For Lumen 5.2+, enable facades
$app->withFacades(true, [
    'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);

return [
    'use' => env('AMQP_ENV', 'production'),

    'properties' => [
        'production' => [
            'host'                => env('AMQP_HOST', 'localhost'),
            'port'                => env('AMQP_PORT', 5672),
            'username'            => env('AMQP_USER', 'guest'),
            'password'            => env('AMQP_PASSWORD', 'guest'),
            'vhost'               => env('AMQP_VHOST', '/'),
            'exchange'            => env('AMQP_EXCHANGE', 'amq.topic'),
            'exchange_type'       => env('AMQP_EXCHANGE_TYPE', 'topic'),
            'consumer_tag'        => 'consumer',
            'ssl_options'         => [],
            'connect_options'     => [],
            'queue_properties'    => ['x-ha-policy' => ['S', 'all']],
            'exchange_properties' => [],
            'timeout'             => 0,
            
            // Management API (optional)
            'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
            'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
            'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
        ],
    ],
];

// Publishing
Amqp::publish('', 'message', [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',
]);

// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
}, [
    'routing' => '',
    'exchange' => 'amq.fanout',
    'exchange_type' => 'fanout',
    'queue_force_declare' => true,
    'queue_exclusive' => true,
    'persistent' => true
]);

// Get Amqp instance
$amqp = app('Amqp');

// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);

// Delete queue
$amqp->queueDelete('my-queue', ['queue' => 'my-queue']);

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');

// Get Amqp instance
$amqp = app('Amqp');

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');

// List connections
$connections = $amqp->getConnections();

// Create policy
$amqp->createPolicy('my-policy', [
    'pattern' => '^my-queue$',
    'definition' => ['max-length' => 1000]
], '/');

use Bschmitt\Amqp\Support\DeadLetterTopology;
use Bschmitt\Amqp\Support\RetryPolicy;

$amqp = app('Amqp');

// RetryPolicy::exponential($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs)
$policy   = RetryPolicy::exponential(5, 1000, 2.0, 60000);
$topology = DeadLetterTopology::for('orders.process', $policy)
    ->on('app.events', 'topic')
    ->withRoutingKey('orders.process');

// Idempotently creates: orders.process, orders.process.dlq,
// and orders.process.retry.{1000,2000,4000,8000,16000} (capped at 60000).
$amqp->declareRetryTopology($topology);

$amqp->consumeWithRetry($topology, function ($message, $resolver) {
    processOrder(json_decode($message->body, true));
    $resolver->acknowledge($message);
});

use Bschmitt\Amqp\Support\RetryPolicy;

RetryPolicy::fixed(3, 1000);                       // 3 retries, 1s apart
RetryPolicy::exponential(5, 500, 2.0, 30000);      // 500ms doubling, capped at 30s
RetryPolicy::immediate(2);                         // 2 retries with zero delay
RetryPolicy::none();                               // failures go straight to the DLQ

use Bschmitt\Amqp\Support\RetryHandler;

$wrapped = $amqp->retryHandler($yourHandler, $topology, function ($level, $message, $context) {
    Log::log($level, $message, $context);
});

$amqp->consume('orders.process', $wrapped, $topology->toWorkProperties());

$amqp = app('Amqp');

// TTL + dead-letter exchange (works on stock RabbitMQ)
$amqp->publishLater('orders.reminder', json_encode(['orderId' => 42]), 60000, [
    'exchange' => 'shop.events',
]);

// rabbitmq-delayed-message-exchange plugin (exchange must be x-delayed-message)
$amqp->publishLater('orders.reminder', $body, 60000, [
    'exchange' => 'shop.delayed',
    'delay_strategy' => 'plugin',
]);

use Bschmitt\Amqp\Support\RetryPolicy;

$amqp->withPublishBackoff(RetryPolicy::exponential(3, 100, 2.0))->run(function () use ($amqp) {
    return $amqp->publish('orders.created', $payload);
});

use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;
    public $currency;

    public function __construct($orderId = null, $total = null, $currency = null)
    {
        $this->orderId = $orderId;
        $this->total = $total;
        $this->currency = $currency;
    }

    public static function routingKey()
    {
        return 'orders.created';
    }

    public static function exchange()
    {
        return 'shop.events';
    }
}

$amqp = app('Amqp');

// Publish — picks up routing key + exchange from the contract
$amqp->publishTyped(new OrderCreated('order-1', 19.99, 'USD'));

// Consume — callback receives ($typed, $message, $resolver)
$amqp->consumeTyped('orders.queue', OrderCreated::class, function ($order, $message, $resolver) {
    processOrder($order->orderId);
    $resolver->acknowledge($message);
});

// Delayed typed publish
$amqp->publishTypedLater(new OrderCreated('order-2', 9.99, 'USD'), 30000);

class OrderCreated extends TypedMessage
{
    // ...properties...

    public static function schema()
    {
        return [
            'type' => 'object',
            '  => ['type' => 'number', 'minimum' => 0],
                'currency' => ['type' => 'string', 'enum' => ['USD', 'EUR', 'GBP']],
            ],
        ];
    }
}

use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Support\ExchangeTopology;
use Bschmitt\Amqp\Support\QueueProfile;

$topology = ExchangeTopology::exchange('events', 'topic')
    ->bindQueue('orders.created', 'order.created')
    ->bindQueue('orders.shipped', 'order.shipped', QueueProfile::quorum());

Amqp::declareExchangeTopology($topology);

// Publish using properties for a specific queue in the topology
Amqp::publish('order.created', $payload, $topology->propertiesForQueue('orders.created'));

use Bschmitt\Amqp\Support\QueueProfile;

Amqp::publish('jobs', $payload, QueueProfile::quorumWithPriority(10)->mergeInto([
    'queue' => 'jobs',
    'routing' => 'jobs',
]));

use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Managers\ConnectionPool;

// Per-request resilient manager (reconnect + heartbeat staleness)
$resilient = Amqp::resilientConnection(['host' => 'rabbitmq'], [
    'max_reconnect_attempts' => 5,
    'heartbeat' => 30,
]);
$channel = $resilient->getChannel();

// Long-lived worker pool (persistent keys survive disconnectAll(false))
$pool = Amqp::connectionPool();
$manager = $pool->connection('worker', ['use' => 'production', 'resilient' => true], true);

use Bschmitt\Amqp\Support\CorrelationContext;

CorrelationContext::set('request-abc-123');

Amqp::publish('orders.created', $payload, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);

Amqp::consumeWithLifecycle('orders.created', function ($message, $resolver) {
    // CorrelationContext::get() is populated when propagate_* flags are used
}, null, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);

use Bschmitt\Amqp\Support\CallbackTracePropagator;

Amqp::setTracePropagator(new CallbackTracePropagator(
    function (array $carrier, $context) {
        // inject active span into $carrier
        return $carrier;
    },
    function (array $carrier) {
        // extract TraceContext from $carrier or return null
        return null;
    }
));

use Bschmitt\Amqp\Support\ConsumerLifecycle;

$lifecycle = (new ConsumerLifecycle())
    ->registerSignalHandlers()
    ->onStopping(function ($lifecycle) {
        // flush buffers, close DB connections, etc.
    });

Amqp::consumeWithLifecycle('jobs', $handler, $lifecycle);

use Bschmitt\Amqp\Facades\Amqp;

$saga = Amqp::saga('checkout')
    ->step('reserveStock', $reserveStock, $releaseStock)
    ->step('chargeCard',  $chargeCard,  $refundCard)
    ->step('shipOrder',   $shipOrder);

$result = $saga->execute(['orderId' => 42]);
if ($result->failed()) {
    Log::error('Saga failed', [
        'step' => $result->getFailedStep(),
        'compensated' => $result->getCompensatedSteps(),
        'error' => $result->getException()->getMessage(),
    ]);
}

Event::listen(\Bschmitt\Amqp\Events\MessageFailed::class, function ($event) {
    Log::warning('AMQP handler failed', ['error' => $event->exception->getMessage()]);
});

use Bschmitt\Amqp\Facades\Amqp;

Amqp::consumeWithMiddleware('orders', function ($message, $resolver) {
    // handle...
}, [
    function ($message, $next) {
        $start = microtime(true);
        $next($message);
        Log::info('handled', ['duration_ms' => (microtime(true) - $start) * 1000]);
    },
    // ...or a ConsumeMiddlewareInterface instance
]);

use Bschmitt\Amqp\Core\Amqp;

public function test_publishes_order_created()
{
    $fake = Amqp::fake();

    (new CreateOrder)->handle();

    $fake->assertPublished('orders.created');
    $fake->assertPublishedCount(1, 'orders.created');
    $fake->assertNotPublished('orders.shipped');
}

$async = Amqp::asyncPublisher(['exchange' => 'events'])
    ->onAck(function ($tag)  { /* metric: published */ })
    ->onNack(function ($tag) { /* metric: failed */ });

foreach ($messages as $m) {
    $async->publish('events.created', json_encode($m));
}

if (!$async->flush(30)) {
    Log::warning('Some publisher confirms timed out');
}

$async->close();

// Client
$result = Amqp::rpcClient(['exchange' => 'rpc'])->asJson()->timeout(10)
    ->call('users.lookup', ['id' => 42]);

if ($result->succeeded()) {
    $user = $result->body();
}

// Server
Amqp::rpcServer()->asJson()->serve('rpc.users', function ($request, $consumer) {
    return ['id' => $request['id'], 'name' => 'Ada'];
});

Amqp::publishInterop(
    'orders.created',
    ['orderId' => 99, 'total' => 12.50],
    'orders.created',
    'billing-service',
    ['exchange' => 'events'],
    '2.0'
);

Amqp::consumeInterop('events.orders', function ($interop, $raw, $resolver) {
    $payload = \Bschmitt\Amqp\Support\InteropEnvelope::decodePayload($interop);
    // $interop->messageType, $interop->sourceService, $interop->schemaVersion
});

// In-process counters (per worker / request)
$stats = Amqp::metrics()->snapshot();

// Broker-side queue depth + rates (Management API)
$metrics = Amqp::queueMetrics('orders', '/');
Log::info('queue depth', $metrics->toArray());

Amqp::consumeOptimized('jobs', $handler, ['exchange' => 'work']);

// Or explicitly:
Amqp::highPerformanceWorker(
    \Bschmitt\Amqp\Support\WorkerOptions::throughput(100)
)->run('jobs', $handler);

use Bschmitt\Amqp\Rpc\RpcService;
use Bschmitt\Amqp\Rpc\RpcRequest;
use Bschmitt\Amqp\Rpc\RpcResponse;

class UserService extends RpcService
{
    public static function queue(): string
    {
        return 'rpc.user-service';
    }

    public static function methods(): array
    {
        return [
            GetUserRequest::class    => 'getUser',
            CreateUserRequest::class => 'createUser',
        ];
    }
}

class GetUserRequest extends RpcRequest
{
    public $id;

    public function __construct($id = null) { $this->id = $id; }

    public static function responseClass()
    {
        return GetUserResponse::class;
    }
}

class GetUserResponse extends RpcResponse
{
    public $id;
    public $name;

    public function __construct($id = null, $name = null)
    {
        $this->id = $id;
        $this->name = $name;
    }
}

use Rpc; // facade alias auto-registered

$response = Rpc::call(
    UserService::class,
    GetUserRequest::make(['id' => 5])
);

echo $response->name; // GetUserResponse instance, hydrated for you

use Rpc;

class UserServiceHandler
{
    public function getUser(GetUserRequest $request): GetUserResponse
    {
        $user = User::findOrFail($request->id);
        return GetUserResponse::make([
            'id'   => $user->id,
            'name' => $user->name,
        ]);
    }

    public function createUser(CreateUserRequest $request): GetUserResponse
    {
        $user = User::create(['name' => $request->name]);
        return GetUserResponse::make(['id' => $user->id, 'name' => $user->name]);
    }
}

Rpc::register(UserService::class, UserServiceHandler::class)
   ->serve(UserService::class);

// Global default timeout
Rpc::defaultTimeout(10);

// Per-call timeout + extra publish properties
Rpc::call(UserService::class, GetUserRequest::make(['id' => 1]), 5, [
    'exchange' => 'rpc.svc',
]);

use Bschmitt\Amqp\Events\RpcCallCompleted;
use Bschmitt\Amqp\Events\RpcCallFailed;

Event::listen(RpcCallCompleted::class, fn ($e) => Log::info('rpc.ok', [
    'service' => $e->service,
    'request' => $e->request,
    'ms'      => $e->durationMs,
]));

$stats = Amqp::rpcMetrics()->snapshot();
// ['UserService::GetUserRequest' => ['count' => 42, 'p95_ms' => 12.5, 'error_rate' => 0.02, ...]]

use Bschmitt\Amqp\Facades\Rpc;

// Either: explicit registration
Rpc::services()->register('payments', PaymentsService::class);

// Or: opt-in auto-discovery (service exposes `public static function alias()`)
class PaymentsService extends RpcService {
    public static function queue(): string   { return 'rpc.payments'; }
    public static function methods(): array  { return [GetPayment::class => 'find']; }
    public static function alias(): ?string  { return 'payments'; }
}

Rpc::services()->autodiscover([PaymentsService::class]);

$response = Rpc::service('payments')
    ->timeout(5)
    ->call(GetPayment::make(['id' => 123]));

use Bschmitt\Amqp\Facades\Saga;

$result = Saga::make('checkout')
    ->step('reserve', fn($ctx) => $stock->reserve($ctx['orderId']))
        ->compensate(fn($ctx) => $stock->release($ctx['orderId']))
    ->step('charge',  fn($ctx) => $payments->charge($ctx['amount']))
        ->compensate(fn($ctx, $tx) => $payments->refund($tx))
    ->execute(['orderId' => 1, 'amount' => 49.99]);

if (!$result->succeeded()) {
    Log::error('Saga failed at ' . $result->getFailedStep(), [
        'compensated' => $result->getCompensatedSteps(),
    ]);
}

use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;

    public static function name(): string { return 'orders.created'; }
}

OrderCreated::dispatch(['orderId' => 'o-1', 'total' => 9.99]);

OrderCreated::dispatchLater(['orderId' => 'o-1'], 2_000); // 2s delay

use Bschmitt\Amqp\Facades\Amqp;

Amqp::deadLetters()->for('orders.dlq')->count();           // 17
Amqp::deadLetters()->for('orders.dlq')->peek(20);         // non-destructive sample
Amqp::deadLetters()->for('orders.dlq')->summarize(100);   // group by reason / error
Amqp::deadLetters()->for('orders.dlq')->messages(10);      // drain & inspect (destructive)
Amqp::deadLetters()->for('orders.dlq')->replayTo('orders', 50);
Amqp::deadLetters()->for('orders.dlq')->purge();

use Bschmitt\Amqp\Attributes\Retry;
use Bschmitt\Amqp\Support\RetryStrategy;
use Bschmitt\Amqp\Support\RetryPolicy;

class CreateOrderHandler
{
    #[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL, delayMs: 500)]
    public function handle($message): void { /* ... */ }
}

$policy = RetryPolicy::fromAttribute(CreateOrderHandler::class, 'handle');
$amqp->consumeWithRetry('orders', $handler, $policy);

$snapshot = Amqp::dashboard(['orders', 'orders.dlq'])
    ->deadLetters(['orders.dlq'])
    ->lagThresholds(1000, 60.0, 300)
    ->snapshot();
// process, queues (with lag / lag_seconds / lagging), dead_letters, rpc, lagging[], generated

CorrelationContext::inheritFromMessage($incoming);

Amqp::publish('orders.created', $body, [
    'propagate_correlation' => true,
    'message_id' => uniqid('msg_', true),
]);
// outbound has `correlation_id`, `x-correlation-id`, and `x-causation-id` set

use Bschmitt\Amqp\Support\CorrelationChain;

$chain = new CorrelationChain($amqp->messageStore());

$summary = $chain->summarize('corr_abc123');
// total, published, consumed, routings, first_at, last_at, duration_ms

$tree = $chain->tree('corr_abc123');         // nested ['entry' => ..., 'children' => [...]]
echo $chain->render($tree);                  // ASCII tree, perfect for logs

return [
    // ...
    'pulse_integration' => false,
];

use Bschmitt\Amqp\Contracts\TracePropagatorInterface;
use Bschmitt\Amqp\Support\OpenTelemetryTracePropagator;

// In a service provider:
$this->app->singleton(TracePropagatorInterface::class, function () {
    return new OpenTelemetryTracePropagator();
    // Or pass an explicit \OpenTelemetry\Context\Propagation\TextMapPropagatorInterface
});

use Bschmitt\Amqp\Support\InMemoryMessageStore;

$amqp->setMessageStore(new InMemoryMessageStore());

Amqp::publish('orders.created', '{}');

$entries = $amqp->messageStore()->all(['direction' => 'published']);

use Bschmitt\Amqp\Contracts\ShouldPublishToAmqpInterface;

class OrderCreated implements ShouldPublishToAmqpInterface
{
    public function __construct(public string $orderId) {}
}

// config/amqp.php
return [
    // ...
    'broadcast_laravel_events' => true,
];

event(new OrderCreated('o-1'));
// auto-published to RabbitMQ with routing key `order_created`

'probes' => [
    'enabled' => true,
    'prefix' => 'amqp/health',     // GET /amqp/health/live, /ready, /
    'middleware' => [],             // optional middleware (e.g. ['api'])
    'state_file' => storage_path('framework/amqp-health.json'),
    'heartbeat_age' => 60,          // seconds before liveness flips to 503
    'queues' => ['orders', 'orders.dlq'],
    'max_backlog' => 5000,
],

use Bschmitt\Amqp\Support\ConsumerLifecycle;
use Bschmitt\Amqp\Support\HealthState;

$lifecycle = (new ConsumerLifecycle())
    ->withHealth(HealthState::instance(storage_path('framework/amqp-health.json')))
    ->registerSignalHandlers();

Amqp::consumeWithLifecycle('orders', $handler, $lifecycle);

use Bschmitt\Amqp\Support\AutoscalingAdvisor;

$metrics = Amqp::queueMetrics('orders');

$advice = (new AutoscalingAdvisor())
    ->messagesPerConsumer(100)
    ->maxLagSeconds(15.0)
    ->minReplicas(1)
    ->maxReplicas(20)
    ->advise($metrics);

// $advice['desired_consumers'] => 4
// $advice['action']            => 'scale_up'
// $advice['reasons']           => ['depth 350 / 100 ...', 'lag 20s > 15s -> +1 ...']
// $advice['keda']              => KEDA RabbitMQ trigger spec

use Bschmitt\Amqp\Support\LaravelCloud;

if (LaravelCloud::isHosted()) {
    logger()->info('amqp hosted env', LaravelCloud::summary());
}

$props = LaravelCloud::parseDsn(env('AMQP_URL'));

// config/amqp.php
'regions' => [
    'enabled' => true,
    'connections' => ['production-us', 'production-eu', 'production-apac'],
    'primary' => null,           // null = match LARAVEL_CLOUD_REGION/AWS_REGION
    'cooldown_seconds' => 30,
],

use Bschmitt\Amqp\Support\MultiRegionConnection;

$resolver = app(MultiRegionConnection::class);

// Single attempt with locality preference
$connectionKey = $resolver->pick();              // 'production-us'

// Run a publish across regions until one succeeds
$resolver->withFailover(function ($region) {
    Amqp::publish('orders.created', $payload, ['use' => $region]);
});

// Fan-out to every region (e.g. announcements)
foreach ($resolver->each() as $region) {
    Amqp::publish('events.maintenance', $payload, ['use' => $region]);
}
bash
php artisan amqp:publish order.created --body='{"id":42}' --exchange=orders --priority=5
php artisan amqp:publish order.created --file=./payload.json --headers='{"X-Source":"cli"}'
php artisan amqp:publish order.created --body='{"id":42}' --delay-ms=5000 --exchange=orders
bash
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
bash
php artisan queue:work amqp --queue=default
bash
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
bash
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
bash
php artisan amqp:dlq inspect orders.dlq
php artisan amqp:dlq summary orders.dlq --limit=200 --json
php artisan amqp:dlq replay  orders.dlq --target=orders --limit=50
php artisan amqp:dlq purge   orders.dlq --force
bash
php artisan amqp:trace corr_abc123
php artisan amqp:trace corr_abc123 --summary
php artisan amqp:trace corr_abc123 --json --limit=50
bash
# Readiness (default)
php artisan amqp:health
php artisan amqp:health --queue=orders --backlog=1000

# Liveness
php artisan amqp:health --probe=live --heartbeat-age=30

# Combined snapshot
php artisan amqp:health --all --state-file=/var/run/amqp-health.json