PHP code example of nimbly / syndicate

1. Go to this page and download the library: Download nimbly/syndicate library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

nimbly / syndicate example snippets


$publisher = new Sns(
	client: new SnsClient(["region" => "us-west-2", "version" => "latest"])
);

$message = new Message(
	topic: "arn:aws:sns:us-west-2:123456789012:orders",
	payload: \json_encode($order)
);

$publisher->publish($message);

$consumer = new Sqs(
	new SqsClient([
		"region" => "us-west-2",
		"version" => "latest"
	])
);

$application = new Application(
	consumer: $consumer,
	router: new Router([
		App\Consumer\Handlers\UsersHandler::class,
		App\Consumer\Handlers\OrdersHandler::class
	])
);

$application->listen(
	location: "https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue"
);

$publisher = new Sns(
	new SnsClient($aws_config)
);

$receipt = $publisher->publish($message);

$message = new Message(topic: "users", payload: \json_encode($user));

$message = new Message(
	topic: "users",
	payload: \json_encode($user),
	headers: ["Header1" => "Value1"],
	attributes: ["id" => (string) Uuid::uuid4(), "priority" => "high"]
);

$publisher->publish($message);

$publisher = new ValidateMessage(
	new JsonSchemaValidator([
		"users" => $schema
	]),
	new Sns(
		new SnsClient($aws_config)
	)
);

$publisher = new RedirectMessage(
	new Sqs(
		new SqsClient($aws_config)
	),
	"https://sqs.us-west-2.amazonaws.com/123456789012/deadletter"
);

/**
 * Despite this message being intended for the "fruits" topic, it
 * will actually be published to https://sqs.us-west-2.amazonaws.com/123456789012/deadletter.
 */
$publisher->publish(new Message("fruits", "banana"));

$publisher = new ValidatorFilter(
	new JsonSchemaValidator(["fruits" => $fruits_schema]),
	new Sns(new SnsClient($aws_config))
);

$publisher->publish($message);

$consumer = new Sqs(
	new SqsClient($aws_config)
);

$router = new Router(
	handlers: [
		App\Consumer\Handlers\UsersHandler::class,
		App\Consumer\Handlers\OrdersHandler::class
	]
);

$router = new Router(
	handlers: [
		App\Consumer\Handlers\UsersHandler::class,
		App\Consumer\Handlers\OrdersHandler::class
	],
	default: function(Message $message): Response {
		// do something with message that could not be routed

		if( $foo ){
			return Response::deadletter;
		}

		return Response::ack;
	}
);

public function onUserRegistered(Message $message, EmailService $email): Response
{
	// Get the topic, queue name, or queue URL the message came from
	$topic = $message->getTopic();

	// JSON decode the message payload
	$payload = \json_decode($message->getPayload());

	// Get the pre-parsed payload, provided by the ParseJsonMessage middleware
	$parsed_payload = $message->getParsedPayload();

	// Get all headers of the message
	$headers = $message->getHeaders();

	// Get all attributes of the message
	$attributes = $message->getAttributes();
}

namespace App\Consumer\Handlers;

use App\Services\EmailService;
use Nimbly\Syndicate\Router\Consume;
use Nimbly\Syndicate\Message;

class UsersHandler
{
	public function __construct(
		protected LoggerInterface $logger
	)
	{
	}

	#[Consume(
		topic: "users",
		payload: ["$.event" => "UserCreated"]
	)]
	public function onUserRegistered(Message $message, EmailService $email): Response
	{
		$this->logger->debug("Received UserCreated message.");

		$payload = \json_decode($message->getPayload());

		$receipt_id = $email->send(
			$payload->user_name,
			$payload->user_email,
			"templates/registration.tpl"
		);

		return Response::ack;
	}
}

#[Consume(
	topic: "users",
	payload: ["$.type" => "UserCreated", "$.body.role" => ["user", "admin"]]
)]

#[Consume(
	topic: ["users/*"],
	payload: ["$.type" => ["User*", "Admin*"], "$.body.role" => ["user", "admin"]]
)]

#[Consume(
	headers: ["Origin" => ["*/Syndicate", "Deadletter/*"]]
)]

public function onUserRegistered(Message $message, EmailService $email): Response
{
	$payload = \json_decode($message->getPayload());

	// There is something fundamentally wrong with this message.
	// Let's push to the deadletter and investigate later.
	if( \json_last_error() !== JSON_ERROR_NONE ){
		return Response::deadletter;
	}

	$receipt_id = $email->send(
		$payload->user_name,
		$payload->user_email,
		"templates/registration.tpl"
	);

	// Email send failed, let's try again later...
	if( $receipt_id === null ){
		return Response::nack;
	}

	// All good!
	return Response::ack;
}

$application = new Application(
	consumer: $consumer,
	router: new Router([
		App\Consumer\Handlers\UsersHandler::class,
		App\Consumer\Handlers\OrdersHandler::class
	]),
	deadletter: new RedirectFilter(
		$consumer,
		"https://sqs.us-west-2.amazonaws.com/123456789012/deadletter"
	),
	container: $container,
	logger: $logger,
	middleware: [
		new ValidateMessages(
			new JsonSchemaValidator(["topic" => $schema])
		)
	],
	signals: [SIGINT, SIGTERM, SIGHUP]
);

// Use Redis queue as our main consumer.
$redis = new Nimbly\Syndicate\Adapter\Redis(new Client);

// Redirect all messages to the "deadletter" queue in Redis.
$deadletter = new RedirectFilter($redis, "deadletter");

class UsersHandler
{
	#[Consume(
		payload: ["$.event" => "UserRegistered"]
	)]
	public function onUserRegistered(Message $message, EmailService $email): Response
	{
		$body = \json_decode($message->getPayload());

		$result = $email->send(
			$body->payload->email,
			$body->payload->name,
			"templates/registration.tpl"
		);

		if( $result === false ){
			return Response::nack;
		}

		return Response::ack;
	}
}

public function onUserCreated(Message $message): Response
{
	$payload = $message->getParsedPayload();

	// Do something with message...
}

$middleware = new ValidateMessage(
	new JsonSchemaValidator([
		"fruits" => \file_get_contents(__DIR__ . "/schemas/fruits.json"),
		"veggies" => \file_get_contents(__DIR__ . "/schemas/veggies.json")
	])
);

$middleware = new DeadletterMessage(
	new RedirectFilter($publisher, "deadletter")
);

class MyMiddleware implements MiddlewareInterface
{
	public function handle(Message $message, callable $next): mixed
	{
		Log::debug(
			"Received message",
			["topic" => $message->getTopic(), "payload" => $message->getPayload()]
		);

		$response = $next($message);

		if( $response === Response::deadletter ){
			Log::warning(
				"Deadletter message",
				["topic" => $message->getTopic(), "payload" => $message->getPayload()]
			);
		}

		return $response;
	}
}

$application->listen(
	location: "https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue",
	max_messages: 10,
	nack_timeout: 12,
	polling_timeout: 5,
	deadletter_options: ["option" => "value"]
);

$application->listen(
	location: ["users", "orders", "returns"]
);

$application->listen(
	location: "users, orders, returns"
);

$publisher = new ValidatorFilter(
	new JsonSchemaValidator([
		"fruits" => \file_get_contents(__DIR__ . "/schemas/fruits.json"),
		"veggies" => \file_get_contents(__DIR__ . "/schemas/veggies.json")
	]),
	new Mqtt(new MqttClient("localhost"))
);

$publisher->publish(new Message("veggies", \json_encode($payload)));