PHP code example of phpdot / rabbitmq
1. Go to this page and download the library: Download phpdot/rabbitmq library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
phpdot / rabbitmq example snippets
use PHPdot\RabbitMQ\RabbitMQConnection;
use PHPdot\RabbitMQ\Config\RabbitMQConfig;
use PHPdot\RabbitMQ\Message;
use PHPdot\RabbitMQ\Enum\TaskStatus;
$config = new RabbitMQConfig(
host: 'localhost',
exchanges: [
'tasks' => ['type' => 'direct', 'durable' => true],
],
queues: [
'tasks.process' => [
'bindings' => [['exchange' => 'tasks', 'routing_key' => 'task.new']],
'durable' => true,
],
],
);
$conn = new RabbitMQConnection($config);
// Publish
$conn->message('{"task":"send_email"}')
->publish('tasks', 'task.new');
// Consume
$conn->consume('tasks.process')
->execute(function (Message $msg): TaskStatus {
processTask(json_decode($msg->body(), true));
return TaskStatus::SUCCESS;
});
// Simple
$conn->message('{"order_id": 123}')
->publish('orders', 'order.created');
// Full-featured
$conn->message(json_encode($data))
->retry(5)
->priority(8)
->compress()
->header(['traceparent' => $traceHeader])
->header(['x-source' => 'api-gateway'])
->publish('orders', 'order.created');
$conn->consume('orders.process')
->prefetch(10)
->onRetry(function (Message $msg, int $count): void {
echo "Retry #{$count}: {$msg->messageId()}\n";
})
->onDead(function (Message $msg, string $reason): void {
echo "Dead: {$msg->messageId()} — {$reason}\n";
})
->execute(function (Message $msg): TaskStatus {
$data = json_decode($msg->body(), true);
if ($data === null) {
return TaskStatus::DEAD; // malformed, don't retry
}
try {
processOrder($data);
return TaskStatus::SUCCESS; // done
} catch (TemporaryException $e) {
return TaskStatus::RETRY; // try again
} catch (PermanentException $e) {
return TaskStatus::DEAD; // give up
}
});
'queues' => [
'orders.process' => [
'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
'retry' => ['enable' => true, 'delay_ms' => 500],
'dead' => 'dead-letters',
'durable' => true,
],
],
use PHPdot\RabbitMQ\Enum\ReplayAction;
$result = $conn->replay('orders.dead')
->limit(100)
->execute(function (Message $msg): ReplayAction {
echo "[{$msg->messageId()}] {$msg->failedReason()}\n";
// Bad payload — clean up DB and discard permanently
if ($msg->failedReason() === 'Invalid payload') {
Order::where('message_id', $msg->messageId())->delete();
return ReplayAction::REMOVE;
}
// Timeout errors — bug is fixed, send it back
if (str_contains($msg->failedReason(), 'timeout')) {
return ReplayAction::REPLAY;
}
// Unknown — leave in DLQ for investigation
return ReplayAction::SKIP;
});
echo "Replayed: {$result->replayed}, Removed: {$result->removed}, Skipped: {$result->skipped}\n";
// Publish compressed
$conn->message($largePayload)
->compress()
->publish('data', 'import.batch');
// Consume — transparent decompression
$conn->consume('data.process')
->execute(function (Message $msg): TaskStatus {
$body = $msg->body(); // already decompressed
return TaskStatus::SUCCESS;
});
// Publish with trace
$conn->message($payload)
->header(['traceparent' => $tracelog->getTraceparent()->toHeader()])
->publish('orders', 'order.created');
// Consume with trace
$conn->consume('orders.process')
->execute(function (Message $msg): TaskStatus {
$traceparent = $msg->header('traceparent'); // '' if missing
// Reconnect trace in your framework layer
return TaskStatus::SUCCESS;
});
$config = new RabbitMQConfig(
host: 'rabbitmq.internal',
port: 5672,
username: 'app',
password: 'secret',
vhost: '/',
timeoutMs: 30000,
maxRetries: 3,
retryDelayMs: 1000,
exchanges: [
'orders' => ['type' => 'direct', 'durable' => true],
'notifications' => ['type' => 'fanout', 'durable' => true],
'dead' => ['type' => 'direct', 'durable' => true],
],
queues: [
'orders.process' => [
'bindings' => [
['exchange' => 'orders', 'routing_key' => 'order.created'],
['exchange' => 'orders', 'routing_key' => 'order.updated'],
],
'retry' => ['enable' => true, 'delay_ms' => 500],
'dead' => 'dead',
'durable' => true,
],
],
);
$msg->body(); // message content (decompressed)
$msg->messageId(); // UUIDv7 message ID
$msg->queue(); // queue name
$msg->header('key'); // header value or ''
$msg->headers(); // all headers as array
$msg->maxRetries(); // x-retries-max value
$msg->priority(); // 0-10
$msg->originalExchange(); // x-original-exchange
$msg->originalRoutingKey();// x-original-routing-key
$msg->failedReason(); // x-failed-reason (on dead letters)