Download the PHP package thesis/amqp without Composer
On this page you can find all versions of the php package thesis/amqp. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Informations about the package amqp
Thesis Amqp
Pure asynchronous (fiber based) strictly typed full-featured PHP driver for AMQP 0.9.1 protocol.
Contents
- Installation
- Configuration
- vhost
- auth_mechanism
- heartbeat
- connection_timeout
- channel_max
- frame_max
- tcp_nodelay
- Client
- Channel
- exchange declare
- exchange bind
- exchange unbind
- exchange delete
- queue declare
- queue bind
- queue unbind
- queue purge
- queue delete
- publish
- publish batch
- get
- ack
- nack
- reject
- ack, nack, reject safety
- consume
- consume iterator
- consume batch
- consume batch iterator
- tx
- transactional
- confirms
- returns
- explicit returns
- License
Installation
Configuration
Configuration can be created from dsn, that follows the amqp uri spec.
Multiple addresses are supported. The client will connect to the first available amqp server host.
From array (for example, if you keep the configuration of your application as an array).
From primary constructor.
If the original amqp server settings remain unchanged, you can use Config::default()
.
vhost
The vhost
value should be configured with the path parameter.
auth_mechanism
To configure priority and availability of auth mechanisms provide query parameter auth_mechanism
.
By default plain
will be used. Current supported authentication mechanisms are plain
and amqplain
.
heartbeat
The heartbeat value must be in seconds.
By default 60 seconds
will be used as RabbitMQ suggest. To disable heartbeats set 0
.
connection_timeout
To configure tcp connection timeout use connection_timeout
with value in seconds.
The default value is 1000 milliseconds
.
channel_max
The channel_max
value tells to the client and amqp server how many channels will be used. The maximum and default is 65535
.
When the channel limit is exhausted, you will get an Thesis\Amqp\Exception\NoAvailableChannel
exception.
frame_max
frame_max
sets a size of chunks. By default, this setting uses 65535 bytes
(and this is the maximum).
If you doesn't understand the setting, you shouldn't change this value.
tcp_nodelay
You can disable tcp nodelay
by setting the value to false
.
Since we use an internal buffer to work with data, which reduces the number of network accesses, there is no need to turn off tcp nodelay
.
This may seriously reduce client performance.
Client
The client is the connection facade to the amqp
server. It is responsible for connecting and disconnecting (also closing all channels) from the server.
It is not necessary to explicitly connect to work with the client. The connection will be established when the first channel is created.
Channel
The new channel can be obtained only from the client.
- If you are terminating an application, you don't have to call
$channel->close()
, because$client->disconnect()
will close all channels anyway. - However, you cannot leave channels open during the life of the application without using them – otherwise you may exhaust the open channel limit from the
channel_max
setting. - After closing a channel yourself or getting a
Thesis\Amqp\Exception\ChannelWasClosed
exception, you cannot use the channel – open a new one.
exchange declare
exchangeDeclare
follows the standard amqp client api. No notable changes here.
exchange bind
exchangeBind
follows the standard amqp client api. No notable changes here.
exchange unbind
exchangeUnbind
follows the standard amqp client api. No notable changes here.
exchange delete
exchangeDelete
follows the standard amqp client api. No notable changes here.
queue declare
queueDeclare
returns a Queue
object if noWait
is set to false
. Otherwise, null
is returned, and this is checked statically.
queue bind
queueBind
follows the standard amqp client api. No notable changes here.
queue unbind
queueUnbind
follows the standard amqp client api. No notable changes here.
queue purge
queuePurge
returns a purged message count if noWait
is set to false
. Otherwise, null
is returned, and this is checked statically.
queue delete
queueDelete
returns a deleted message count if noWait
is set to false
. Otherwise, null
is returned, and this is checked statically.
publish
There are notable changes here compared to other libraries.
- First, the message is an object.
- Secondly, all system headers like
correlationId
,expiration
,messageId
and so on are placed in the properties of this object, so you don't have to pass them through user headers and remember how keys should be named.
publish batch
You can publish a batch of messages.
You will receive back the PublishBatchConfirmation
, which allows you to deal with confirmations:
PublishBatchConfirmation::awaitAll
- await all confirmations or throw an\LogicException
, if there are any unconfirmed messages.PublishBatchConfirmation::unconfirmed
- await all confirmations and return only unconfirmed ones.
get
get
returns a Delivery
object, which also has all system headers placed in properties.
It is safe to call Channel::get()
concurrently.
ack
ack
can be called on a Delivery
object.
Or through a channel.
It is safe to call ack
many times.
nack
nack
can be called on a Delivery
object.
Or through a channel.
It is safe to call nack
many times.
reject
reject
can be called on a Delivery
object.
Or through a channel.
It is safe to call reject
many times.
ack, nack, reject safety
It is safe to call nack/reject
after ack
or competitively. Operations will be ordered and processed only once.
For example, you want to call nack
on any error and ack
only on successful cases. Then you can write the code as follows:
Here ack
in finally
block will only be sent if neither nack
, reject
, nor ack
in the $handler
is called.
consume
consume
accepts a callback where Delivery
and Channel
will be passed to.
consume iterator
If you don't like the callback api
like I do, you can handle messages through an iterator
.
- The size of the
Iterator
should be equal to theprefetch count
provided toChannel::qos()
. - The
Iterator::complete()
will cancel the consumer and stop the loop.
Also, you can throw an exception using Iterator::cancel
.
consume batch
Although AMQP doesn't have a native way to receive messages in batches, we can achieve this using two operations — basic.qos(count: N)
and basic.ack(multiple: true)
on the last message. basic.qos
limits the number of messages the AMQP server can push to our consumer, and this number should match the batch size.
basic.ack(multiple: true)
allows us to send a single acknowledgment for the entire batch. You don’t need to implement this yourself — it's included with this library.
Simply use Channel::consumeBatch
and pass a callback. As an argument, you’ll receive a ConsumeBatch
instance, on which you can call ack
or nack
.
Note that you don’t need to call these functions on individual DeliveryMessage
— only on the ConsumeBatch
!
However, since it may take a while to fill a batch, you can specify a timeout
. This way, you'll receive a non-empty batch either when the required number of messages is collected or when the timer expires — whichever comes first.
See the example: you'll see two batches there — one will arrive immediately because the queue already contains enough messages and the second will arrive after a 1-second wait, consisting of just 3 messages.
Since basic.qos(count: N)
is a crucial requirement for implementing batching, the consumeBatch
and consumeBatchIterator
methods call it automatically.
You don’t need to call Channel::qos
yourself!
consume batch iterator
Just like with regular consumeIterator
, where you can work with an Iterator
, you can also process batches using an Iterator
.
By using consumeBatchIterator
instead of consumeBatch
, you get an Iterator
where each element is a ConsumeBatch
. See the example to understand how to use it.
tx
transactions
follows the standard amqp client api. No notable changes here.
- you can't call
txSelect
more than once. - after switching to the confirmation mode, transactions will be unavailable.
transactional
If you prefer not to manage the transaction yourself, you can use the Channel::transactional
method, which will put the channel into transactional mode and commit or rollback the transaction in case of an exception.
confirms
There are notable changes here compared to other libraries. Instead of a callback api through which you could handle confirmations,
you get a PublishConfirmation
object that can be waited on in non-blocking mode via await
.
The PublishConfirmation::await
will return PublishResult
enum that can be in one of the Acked, Nacked, Canceled, Waiting
states.
Since confirmations can return in batches, there is no need to wait for each confirmation in turn. Instead, you can publish many messages and wait for a confirmation at the end. If you are lucky, the amqp server will return multiple confirmations, or even one for the entire batches.
returns
Returned messages (with mandatory
flag set on Channel::publish
) can be handled in a separate callback.
explicit returns
In AMQP messaging system it’s possible for a published message to have no destination. This is acceptable in some scenarios such as the Publish-Subscribe pattern, where it’s fine for events to go unhandled, but not in others. For example, in the Command pattern every message is expected to be processed.
To detect and react to such delivery failures, you must publish messages with the mandatory
flag enabled. This tells the AMQP server to return any message that cannot be routed to at least one queue.
However, there’s a challenge: returned messages are delivered asynchronously via a separate thread (not the OS thread) and are not associated with the original publishing request. This means the publisher has no immediate way of knowing whether a message was routed or returned. In some cases, you may want to know this synchronously, so that you can:
- Log the message;
- Store the message in the DB;
- Automatically declare the required topology (e.g., queues or bindings) and republish.
To support this use case, the library provides a mechanism based on publisher confirms
and a custom header:
- Enable
publisher confirm
mode; - Set the
mandatory
flag when publishing.
The library will add a special header X-Thesis-Mandatory-Id
. This allows the library to correlate any returned message with its original publish request.
If the message is unroutable, the library will return PublishResult::Unrouted
.
⚠️ Important: This mechanism only works if
publisher confirms
are enabled. Without them the library cannot track which messages were successfully published to queues, because no frame will receive.
License
The MIT License (MIT). Please see License File for more information.
All versions of amqp with dependencies
ext-filter Version *
amphp/amp Version ^3.0
amphp/pipeline Version ^1.2
amphp/socket Version ^2.3
revolt/event-loop Version ^1.0
thesis/amp-bridge Version ^0.1.0
thesis/byte-buffer Version ^0.1.0
thesis/byte-order Version ^0.2.0
thesis/byte-reader Version ^0.3.1
thesis/byte-reader-writer Version ^0.1.0
thesis/byte-writer Version ^0.2.1
thesis/endian Version ^0.1.0