Download the PHP package b2pweb/bdf-queue without Composer
On this page you can find all versions of the php package b2pweb/bdf-queue. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Download b2pweb/bdf-queue
More information about b2pweb/bdf-queue
Files in b2pweb/bdf-queue
Package bdf-queue
Short Description Bdf queue component
License MIT
Informations about the package bdf-queue
Queue
This package provides 2 layers for abstraction of message broker.
- A connection layer
- A destination layer
Supports
Message Broker | Library | Driver name |
---|---|---|
Beanstalk | Pheanstalk | pheanstalk |
Db | Doctrine | doctrine+(*) |
Enqueue | php-enqueue | enqueue+(*) |
Gearman | Pecl Gearman | gearman |
Kafka | RdKafka | rdkafka |
Memory | memory | |
Null | null | |
RabbitMQ | Amqp lib | amqp-lib |
Redis (Ext) | PhpRedis | redis+phpredis |
Redis | PRedis | redis+predis |
Usage Instructions
Produce messages
First, create a new destination manager instance.
Push a basic message into the queue. The consume should defined handler to process the message.
Useful for monolithic application that needs to differ a process. Push a message job into the queue. The consumer will evaluate the job string and run the processor. In this use case the producer and the receiver share the same model.
Available type for dsn destination
The class Bdf\Queue\Destination\DsnDestinationFactory
provides default type of destination:
Name | Exemple | Definition |
---|---|---|
queue | queue://connection_name/queue_name | Publish and consume a single queue |
queues | queues://connection_name/queue1,queue2 | Only consume multi queues |
topic | topic://connection_name/topic | Publish and consume a topic. Pattern with wildcard are allowed for consumer use case only (ex: topic.*) |
topics | topics://connection_name/topic1,topic2 | Only consume multi topics |
You can declare your own type:
Consume messages
The consumer layer provides many tools for message handling. The default stack of objects that will receive the message is:
consumer (ConsumerInterface) -> receivers (ReceiverInterface) -> processor (ProcessorInterface) -> handler (callable)
consumer
has the strategy for reading the message from queue / topic. It also manage a graceful shutdown.receivers
is the stack of middlewares interacts with the envelope.processor
resolves the handler arguments. You can plug here your business logic and remove the handler layer. By default processor injects 2 arguments in handlers: the message data and the envelope.handler
manages the business logic. Handler allows an interface less mode.
An example to consume a simple message:
Consume a job message:
Create a handler
Use the synthax "Class@method"
to determine the callable (By default the method is "handle")
or register your handlers on a specific destination with the receiver builder:
Run the consumer in console
Create receiver extensions
The consumer use a stack of receivers to extend the reception of messages.
See the interface Bdf\Queue\Consumer\ReceiverInterface
and the trait Bdf\Queue\Consumer\DelegateHelper
.
You can use the Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add()
to register your receiver in the stack
Customize the string payload
The class Bdf\Queue\Serializer\SerializerInterface
manage the payload content sent to the message broker.
By default metadata are added to the json as:
- PHP Type: to help consumer to deserialize complex entities.
- Message info: The attempt number for retry, The sending date, ...
A basic payload looks like:
You can customize the string with your own implementation of the serializer interface.
Try the hello world example (configure the message broker in example/config/connections.php
):
RPC client
Additionnal options for connection
Option | Type | Supports | Description |
---|---|---|---|
driver |
string | all | The name of the driver to use. See driver name in support section. |
vendor |
string | all | Second part of the protocol. Vendor is used by some driver that use internal drivers. |
queue |
string | all | The default queue of the connection used only if no queue has been set on the message. Destination should provide the queue. |
host |
string | all | The host / ip to connect to message broker. Usually set to localhost . |
port |
int | all | The port of the message broker. Usually set to the default port. |
user |
string | all | |
password |
string | all | |
prefetch |
int | all | Load a number of message in memory. Faster for some broker that supports reservation |
serializer |
string | all | Load a serializer for this connection. Used only by driver that needs serializer. |
vhost |
string | amqp-lib | Default / . |
group |
string | amqp-lib | Group use by topic to allows set of consumers on the same topic. Default bdf . |
sleep_duration |
int | amqp-lib | The internal sleep in milliseconds between two pop. Default 200 . |
queue_flags |
int | amqp-lib | The flag for queue declaration. See AmqpDriver constants. Default 2 (FLAG_QUEUE_DURABLE value). |
topic_flags |
int | amqp-lib | The flag for topic declaration. See AmqpDriver constants. Default 0 (FLAG_NOPARAM value). |
consumer_flags |
int | amqp-lib | The flag for consumer. See AmqpDriver constants. Default 0 (FLAG_NOPARAM value). |
auto_declare |
bool | amqp-lib, redis, enqueue | Auto declare the queue when pushing or poping. Use queue setup command otherwise. Default false . |
qos_prefetch_size |
int | amqp-lib | Prefetch optimisation. Default 0 . |
qos_prefetch_count |
int | amqp-lib | Prefetch optimisation. Default 1 . |
qos_global |
int | amqp-lib | Prefetch optimisation. Default false . |
table |
string | doctrine | The table name to use to store message. Default value doctrine_queue |
ttr |
int | pheanstalk | Time to run in seconds. Can also be defined in message header. Default 60 . |
client-timeout |
int | pheanstalk, gearman | Timeout of client in milliseconds. Disable by default. |
commitAsync |
bool | rdkafka | Enable asynchrone ack. Default false . |
offset |
int | rdkafka | Position to start consumer. Default null . |
partition |
int | rdkafka | Partition to for the consumer, see kafka constant. Default -1 (RD_KAFKA_PARTITION_UA value). |
global |
array | rdkafka | Kafka config for global settings. |
producer |
array | rdkafka | Kafka config for producer. |
consumer |
array | rdkafka | Kafka config for the consume |
poll_timeout |
int | rdkafka | The timeout for the poll method in milliseconds. |
flush_timeout |
int | rdkafka | The timeout for the flush method in milliseconds. |
dr_msg_cb |
callable | rdkafka | Delivery report callback. |
error_cb |
callable | rdkafka | Error callback. |
rebalance_cb |
callable | rdkafka | Called after consumer group has been rebalanced. |
stats_cb |
callable | rdkafka | Statistics callback. |
partitioner |
string | rdkafka | Kafka partitioner for topic settings. |
group |
string | rdkafka | Group use by topic to allows set of consumers on the same topic. Default "2" . |
timeout |
int | redis | The connection timeout in seconds. Default 0 . |
prefix |
string | redis | The key prefix. Default queues: . |
Note:
- Format of a valid DSN: {driver}+{vendor}://{user}:{password}@{host}:{port}/{queue}?{option}=value
- See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for more kafka options.
Additionnal options for message
Option | Type | Supports | Description |
---|---|---|---|
flags |
int | amqp-lib | The flags for message. See driver constants. |
priority |
int | pheanstalk | Priority message. Default 1024 . |
ttr |
int | pheanstalk | Time to run in seconds. Default 60 . |
key |
string | rdkafka | |
partition |
int | rdkafka | The number of the partition. |
Serialization
Benchmarks
simple job / closure job
Serializer | Serializer | +Compress | Bdf JSON | +Compress | Bdf binary |
---|---|---|---|---|---|
Size | 141 / 377 | 105 / 244 | 109 / 407 | 76 / 247 | 98 / 355 |
Serialize time | 0.0014 / 6.8 | 0.016 / 7 | 0.011 / 7 | 0.026 / 7 | 0.011 / 7 |
Unserialize time | 0.007 / 0.0025 | 0.0082 / 0.0068 | 0.024 / 0.015 | 0.024 / 0.019 | 0.019 / 0.011 |
Analysis
- For the best execution time, regardless of size, use the default
Serializer
- For the smaller size, regardless of time, use
BdfSerializer
withCompressedSerializer
- For the best compromise, use
Serializer
withCompressedSerializer
- Always smaller than pure
BdfSerializer
(JSON or Binary) - Faster on unserialize, slightly slower on serialize
- Around twice faster than compressed bdf, but only ~40% larger on simple job
- Always smaller than pure
License
Distributed under the terms of the MIT license.
All versions of bdf-queue with dependencies
ext-pcntl Version *
b2pweb/bdf-dsn Version ~1.0
b2pweb/bdf-instantiator Version ~1.0
symfony/console Version ~5.4|~6.0|~7.0
symfony/messenger Version ~5.4|~6.0|~7.0