1. Go to the download page, download the nimbly/syndicate library, and choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add the following code to index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// nimbly/syndicate example snippets
// Build an SNS-backed publisher from an AWS SNS client.
$sns_client = new SnsClient(["region" => "us-west-2", "version" => "latest"]);
$publisher = new Sns(client: $sns_client);

// The message targets the SNS topic ARN and carries the JSON-encoded order.
$order_json = \json_encode($order);
$message = new Message(
    topic: "arn:aws:sns:us-west-2:123456789012:orders",
    payload: $order_json,
);

$publisher->publish($message);
// Build an SQS-backed consumer from an AWS SQS client.
$sqs_client = new SqsClient([
    "region" => "us-west-2",
    "version" => "latest",
]);
$consumer = new Sqs($sqs_client);

// Handler classes handed to the router.
$handler_classes = [
    App\Consumer\Handlers\UsersHandler::class,
    App\Consumer\Handlers\OrdersHandler::class,
];

// Wire the consumer and the router into the application.
$application = new Application(
    consumer: $consumer,
    router: new Router($handler_classes),
);
// Schema validator keyed by topic name ("users" => its schema).
$schema_validator = new JsonSchemaValidator(["users" => $schema]);

// The underlying SNS publisher.
$sns_publisher = new Sns(new SnsClient($aws_config));

// Wrap the SNS publisher with the validator.
// NOTE(review): presumably messages are validated before being handed to
// the wrapped publisher -- confirm against the library documentation.
$publisher = new ValidateMessage($schema_validator, $sns_publisher);
// Wrap an SQS publisher so that everything is redirected to the deadletter queue URL.
$sqs_publisher = new Sqs(new SqsClient($aws_config));
$publisher = new RedirectMessage(
    $sqs_publisher,
    "https://sqs.us-west-2.amazonaws.com/123456789012/deadletter"
);

/**
 * Although this message names the "fruits" topic, it is actually
 * published to https://sqs.us-west-2.amazonaws.com/123456789012/deadletter.
 */
$fruit_message = new Message("fruits", "banana");
$publisher->publish($fruit_message);
// Schema validator keyed by topic name ("fruits" => its schema).
$fruits_validator = new JsonSchemaValidator(["fruits" => $fruits_schema]);

// Wrap the SNS publisher with the validator filter, then publish through it.
$publisher = new ValidatorFilter(
    $fruits_validator,
    new Sns(new SnsClient($aws_config)),
);

$publisher->publish($message);
// SQS-backed consumer built from the shared AWS configuration.
$consumer = new Sqs(new SqsClient($aws_config));

// Router configured with the handler classes, passed by name.
$router = new Router(
    handlers: [
        App\Consumer\Handlers\UsersHandler::class,
        App\Consumer\Handlers\OrdersHandler::class,
    ],
);
// Router with a default handler for messages that could not be routed.
$router = new Router(
    handlers: [
        App\Consumer\Handlers\UsersHandler::class,
        App\Consumer\Handlers\OrdersHandler::class,
    ],
    default: function (Message $message): Response {
        // do something with message that could not be routed

        // $foo is a placeholder condition: it is neither defined inside this
        // closure nor captured with `use`, so substitute a real check here.
        return $foo ? Response::deadletter : Response::ack;
    },
);
/**
 * Example handler demonstrating the accessors available on a message.
 *
 * @param Message $message The message being handled.
 * @param EmailService $email Email service dependency (not used in this example).
 * @return Response Always Response::ack.
 */
public function onUserRegistered(Message $message, EmailService $email): Response
{
    // Get the topic, queue name, or queue URL the message came from
    $topic = $message->getTopic();

    // JSON decode the message payload
    $payload = \json_decode($message->getPayload());

    // Get the pre-parsed payload, provided by the ParseJsonMessage middleware
    $parsed_payload = $message->getParsedPayload();

    // Get all headers of the message
    $headers = $message->getHeaders();

    // Get all attributes of the message
    $attributes = $message->getAttributes();

    // The declared return type is Response, so falling off the end of the
    // method would raise a TypeError; acknowledge the message explicitly.
    return Response::ack;
}
namespace App\Consumer\Handlers;
use App\Services\EmailService;
use Nimbly\Syndicate\Router\Consume;
use Nimbly\Syndicate\Message;
/**
 * Handler class for user-related messages.
 *
 * NOTE(review): `Response` and `LoggerInterface` are referenced below, but the
 * `use` statements accompanying this class only import EmailService, Consume
 * and Message -- confirm both are imported where this class actually lives.
 */
class UsersHandler
{
    /**
     * @param LoggerInterface $logger Logger used to record received messages.
     */
    public function __construct(protected LoggerInterface $logger)
    {
    }

    /**
     * Sends the registration email for a received message and acknowledges it.
     *
     * The Consume attribute declares the match criteria: topic "users" and a
     * payload whose `$.event` is "UserCreated".
     *
     * @param Message $message The message being handled.
     * @param EmailService $email Service used to send the registration email.
     * @return Response Always Response::ack.
     */
    #[Consume(
        topic: "users",
        payload: ["$.event" => "UserCreated"],
    )]
    public function onUserRegistered(Message $message, EmailService $email): Response
    {
        $this->logger->debug("Received UserCreated message.");

        // The raw payload is JSON; decode it into an object.
        $user = \json_decode($message->getPayload());

        $receipt_id = $email->send(
            $user->user_name,
            $user->user_email,
            "templates/registration.tpl",
        );

        return Response::ack;
    }
}
/**
 * Sends the registration email and reports the outcome for the message.
 *
 * @param Message $message The message being handled.
 * @param EmailService $email Service used to send the registration email.
 * @return Response Response::deadletter when the payload is not a valid JSON
 *                  object, Response::nack when the email send returned null,
 *                  Response::ack otherwise.
 */
public function onUserRegistered(Message $message, EmailService $email): Response
{
    $payload = \json_decode($message->getPayload());

    // There is something fundamentally wrong with this message.
    // Let's push to the deadletter and investigate later.
    // Valid JSON that is not an object (e.g. `null`, a scalar, or a list)
    // produces no JSON error but cannot carry the fields read below, so it
    // is treated the same way as a decode failure.
    if (\json_last_error() !== JSON_ERROR_NONE || !\is_object($payload)) {
        return Response::deadletter;
    }

    $receipt_id = $email->send(
        $payload->user_name,
        $payload->user_email,
        "templates/registration.tpl"
    );

    // Email send failed, let's try again later...
    if ($receipt_id === null) {
        return Response::nack;
    }

    // All good!
    return Response::ack;
}
// Router configured with the handler classes.
$handler_router = new Router([
    App\Consumer\Handlers\UsersHandler::class,
    App\Consumer\Handlers\OrdersHandler::class,
]);

// Deadletter publisher: the same consumer wrapped so messages are redirected
// to the deadletter queue URL.
$deadletter_publisher = new RedirectFilter(
    $consumer,
    "https://sqs.us-west-2.amazonaws.com/123456789012/deadletter",
);

// Middleware applied by the application.
$message_middleware = [
    new ValidateMessages(
        new JsonSchemaValidator(["topic" => $schema]),
    ),
];

// Application with every option supplied by name.
$application = new Application(
    consumer: $consumer,
    router: $handler_router,
    deadletter: $deadletter_publisher,
    container: $container,
    logger: $logger,
    middleware: $message_middleware,
    // NOTE(review): presumably the process signals the application reacts to -- confirm.
    signals: [SIGINT, SIGTERM, SIGHUP],
);
// Redis queue adapter, used as the main consumer.
$redis_client = new Client();
$redis = new Nimbly\Syndicate\Adapter\Redis($redis_client);

// Every message is redirected to the "deadletter" queue in Redis.
$deadletter = new RedirectFilter($redis, "deadletter");