Download the PHP package flowpack/jobqueue-common without Composer
Package jobqueue-common
Short Description Common functionality for a JobQueue in Flow applications.
License MIT
Information about the package jobqueue-common
Flowpack.JobQueue.Common
Neos Flow package that allows for asynchronous and distributed execution of tasks.
Table of contents
- Quickstart
- Introduction
- Message Queue
- Job Queue
- Command Line Interface
- Signals & Slots
- License
- Contributions
Quickstart (TL;DR)
1. Install this package using composer (`composer require flowpack/jobqueue-common`), or by adding the dependency to the composer manifest of an installed package.
2. Configure a basic queue in your `Settings.yaml`.
3. Initialize the queue (if required)
   With the `queue:setup` command you can set up the queue and/or verify its configuration. In the case of the `FakeQueue` that step is not required.
   Note: The `queue:setup` command won't remove any existing messages, so there is no harm in calling it multiple times.
4. Annotate any public method you want to be executed asynchronously, or use attributes instead of annotations (PHP 8.0 and later).
   Note: The method needs to be public and it must not return anything.
5. Start the worker (if required)
   With the annotation in place, whenever a method such as `SomeClass::sendEmail()` is about to be called, that method call is converted into a job that is executed asynchronously[1].
   Unless you use the `FakeQueue`, a so-called worker has to be started to listen for new jobs and execute them (see the `job:work` command below).
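The annotation step of the Quickstart can be sketched as follows. The `Defer` attribute in the `Flowpack\JobQueue\Common\Annotations` namespace and its `queueName` argument are written from memory of the package and should be checked against the installed version; the queue name `some-queue` is a placeholder. Outside a Flow application the attribute is inert, so the method simply runs synchronously.

```php
<?php
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass
{
    // Assumption: inside Flow, the Defer attribute turns calls to this method
    // into jobs on the queue named "some-queue" (a placeholder name).
    // On PHP < 8.0 the docblock annotation form would be used instead.
    #[Job\Defer(queueName: 'some-queue')]
    public function sendEmail(string $emailAddress): void
    {
        // send some email to $emailAddress
    }
}
```

The method is public and declared `void`, matching the note that a deferred method must not return anything.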
Introduction
To get started let's first define some terms:
- Message: A piece of information passed between programs or systems, sometimes also referred to as "Event". In the JobQueue packages we use messages to transmit `Jobs`.
- Message Queue: According to Wikipedia "message queues [...] are software-engineering components used for inter-process communication (IPC), or for inter-thread communication within the same process". In the context of the JobQueue packages we refer to "Message Queue" as a FIFO buffer that distributes messages to one or more consumers, so that every message is only processed once.
- Job: A unit of work to be executed (asynchronously). In the JobQueue packages we use the Message Queue to store serialized jobs, so it acts as a "Job stream".
- Job Manager: Central authority allowing adding and fetching jobs to/from the Message Queue.
- Worker: The worker watches a queue and triggers the job execution. This package comes with a `job:work` command that does this (see below).
- submit: New messages are *submitted* to a queue to be processed by a worker.
- reserve: Before a message can be processed it has to be *reserved*. The queue guarantees that a single message can never be reserved by two workers (unless it has been released again).
- release: A reserved message can be *released* to the queue to be processed at a later time. The *JobManager* does this if Job execution failed and the `maximumNumberOfReleases` setting for the queue is greater than zero.
- abort: If a message could not be processed successfully it is *aborted*, marking it *failed* in the respective queue so that it can't be reserved again. The *JobManager* aborts a message if Job execution failed and the message can't be released (again).
- finish: If a message was processed successfully it is marked *finished*. The *JobManager* finishes a message if Job execution succeeded.
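The life cycle described by these terms can be illustrated with a toy in-memory model. This is only a sketch: `ToyQueue` and `workOnce()` are hypothetical names, and the package's real queue interface and `JobManager` are not reproduced here. It only shows how submit, reserve, release, abort and finish relate to each other and to `maximumNumberOfReleases`.

```php
<?php
// Toy in-memory model of the message life cycle described above.
// ToyQueue and workOnce() are hypothetical names, not part of the package.
final class ToyQueue
{
    /** @var array<int, array{payload: string, state: string, releases: int}> */
    private array $messages = [];
    private int $nextId = 1;

    public function __construct(public int $maximumNumberOfReleases = 3)
    {
    }

    /** Adds a new message and returns its identifier. */
    public function submit(string $payload): int
    {
        $id = $this->nextId++;
        $this->messages[$id] = ['payload' => $payload, 'state' => 'ready', 'releases' => 0];
        return $id;
    }

    /** Reserves the oldest ready message, or returns null if there is none. */
    public function reserve(): ?int
    {
        foreach ($this->messages as $id => $message) {
            if ($message['state'] === 'ready') {
                $this->messages[$id]['state'] = 'reserved';
                return $id;
            }
        }
        return null;
    }

    /** Puts a reserved message back so it can be reserved again later. */
    public function release(int $id): void
    {
        $this->messages[$id]['state'] = 'ready';
        $this->messages[$id]['releases']++;
    }

    public function abort(int $id): void
    {
        $this->messages[$id]['state'] = 'failed';
    }

    public function finish(int $id): void
    {
        $this->messages[$id]['state'] = 'finished';
    }

    public function payloadOf(int $id): string { return $this->messages[$id]['payload']; }
    public function releasesOf(int $id): int { return $this->messages[$id]['releases']; }
    public function stateOf(int $id): string { return $this->messages[$id]['state']; }
}

/** Reserves one message, runs the job and finishes, releases or aborts accordingly. */
function workOnce(ToyQueue $queue, callable $job): bool
{
    $id = $queue->reserve();
    if ($id === null) {
        return false; // nothing to work on
    }
    if ($job($queue->payloadOf($id))) {
        $queue->finish($id);
    } elseif ($queue->releasesOf($id) < $queue->maximumNumberOfReleases) {
        $queue->release($id);
    } else {
        $queue->abort($id);
    }
    return true;
}

$queue = new ToyQueue(maximumNumberOfReleases: 1);
$id = $queue->submit('hello');
$alwaysFails = fn (string $payload): bool => false;
workOnce($queue, $alwaysFails); // fails: released once
workOnce($queue, $alwaysFails); // fails again: no releases left, aborted
echo $queue->stateOf($id), PHP_EOL; // failed
```

With `maximumNumberOfReleases` set to zero the first failure would abort the message immediately, which matches the condition stated for *release* above.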
Message Queue
The `Flowpack.JobQueue.Common` package comes with a very basic Message Queue implementation, `Flowpack\JobQueue\Common\Queue\FakeQueue`, that allows for execution of Jobs using sub requests.
It doesn't need any 3rd party tools or server loops and works for basic scenarios. But it has a couple of limitations to be aware of:
- It is not actually a queue, but dispatches jobs immediately as they are queued. So it's not possible to distribute the work to multiple workers.
- The `JobManager` is not involved in the processing of jobs, so the jobs need to take care of error handling themselves.
- For the same reason, Signals are not emitted for the `FakeQueue`.
- With Flow 3.3+, the `FakeQueue` supports an `async` flag. Without that flag set, executing jobs blocks the main thread!
For advanced usage it is recommended to use one of the packages that implement a dedicated queue backend.
Configuration
The simplest configuration for a queue only needs a name and the `className` of the implementation to use. If that name is `test`, a queue named `test` will be available.
Note: For reusable packages you should consider adding a vendor-specific prefix to avoid collisions. We recommend using a class name or the package name with the function name (e.g. `Flowpack.ElasticSearch.ContentRepositoryQueueIndexer`).
Queue parameters
The following parameters are supported by all queues:
Parameter | Type | Default | Description |
---|---|---|---|
className | string | - | FQN of the class implementing the queue |
maximumNumberOfReleases | integer | 3 | Max. number of times a message is re-released to the queue if a job failed |
executeIsolated | boolean | FALSE | If TRUE, jobs for this queue are executed in a separate Thread. This makes sense in order to avoid memory leaks and side effects |
outputResults | boolean | FALSE | If TRUE, the full output (stdout + stderr) of the respective job is forwarded to the stdout of its "parent" (only applicable if executeIsolated is TRUE) |
queueNamePrefix | string | - | Optional prefix for the internal queue name, allowing to re-use the same backend over multiple installations |
options | array | - | Options for the queue. Implementation specific (see corresponding package) |
releaseOptions | array | - | Options that will be passed to release() when a job failed. Implementation specific (see corresponding package) |
You can have multiple queues in one installation. That allows you to use different backends/options for queues depending on the requirements.
Presets
If multiple queues share common configuration, presets can be used to improve readability and maintainability. For example, two `DoctrineQueue`s "email" and "log" can share some common options through a preset while using different table names and poll intervals.
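The idea behind presets can be sketched in plain PHP, with the settings written as arrays instead of YAML. All key names used here (`presets`, `preset`, `options`, `tableName`, `pollInterval`), the `DoctrineQueue` class name and the helper `resolveQueueSettings()` are assumptions for illustration; the package's actual merge logic may differ.

```php
<?php
// Hypothetical settings, written as PHP arrays instead of YAML.
// Key names and the DoctrineQueue class name are assumptions.
$settings = [
    'presets' => [
        'default' => [
            'className' => 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue',
            'maximumNumberOfReleases' => 3,
            'options' => ['pollInterval' => 2],
        ],
    ],
    'queues' => [
        'email' => [
            'preset' => 'default',
            'options' => ['tableName' => 'queue_email'],
        ],
        'log' => [
            'preset' => 'default',
            'options' => ['tableName' => 'queue_log', 'pollInterval' => 10],
        ],
    ],
];

/** Merges the referenced preset (if any) with the queue's own settings. */
function resolveQueueSettings(array $settings, string $queueName): array
{
    $queue = $settings['queues'][$queueName];
    $preset = isset($queue['preset']) ? $settings['presets'][$queue['preset']] : [];
    unset($queue['preset']);
    // Values set on the queue itself override values from the preset.
    return array_replace_recursive($preset, $queue);
}

$logSettings = resolveQueueSettings($settings, 'log');
echo $logSettings['options']['pollInterval'], PHP_EOL; // 10
```

Both queues inherit `className` and `maximumNumberOfReleases` from the preset, while each keeps its own table name, and "log" overrides the poll interval.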
Job Queue
The job is an arbitrary class implementing `Flowpack\JobQueue\Common\Job\JobInterface`.
This package comes with one implementation, `StaticMethodCallJob`, that allows for invoking a public method (see Quickstart), but often it makes sense to create a custom Job.
Note: It's crucial that the `execute()` method returns TRUE on success, otherwise the corresponding message will be released again and/or marked failed.
A custom job can then be added to a queue through the `JobManager`.
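A custom job might look like the following sketch. So that it runs outside Flow, it declares minimal stand-ins for the package's types; the `execute()` and `getLabel()` signatures, as well as the fully qualified names of `QueueInterface` and `Message`, are written from memory and should be verified against the installed package. `SendEmailJob` is a hypothetical example class.

```php
<?php
// Stand-ins so the sketch runs outside Flow. In a real project, delete them
// and import the package's types instead (names assumed, please verify):
//   Flowpack\JobQueue\Common\Job\JobInterface
//   Flowpack\JobQueue\Common\Queue\QueueInterface
//   Flowpack\JobQueue\Common\Queue\Message
interface QueueInterface
{
}

final class Message
{
}

interface JobInterface
{
    public function execute(QueueInterface $queue, Message $message): bool;

    public function getLabel(): string;
}

// Hypothetical custom job. Jobs are stored serialized, so it only keeps
// simple, serializable state (here: a string).
final class SendEmailJob implements JobInterface
{
    public function __construct(private string $emailAddress)
    {
    }

    public function execute(QueueInterface $queue, Message $message): bool
    {
        // A real job would send the email here. Returning false signals
        // failure, so the message is released again or marked failed.
        return filter_var($this->emailAddress, FILTER_VALIDATE_EMAIL) !== false;
    }

    public function getLabel(): string
    {
        return sprintf('SendEmailJob (%s)', $this->emailAddress);
    }
}

$job = new SendEmailJob('jane@example.com');
var_dump($job->execute(new class implements QueueInterface {}, new Message())); // bool(true)
```

In a Flow application the job would be handed to the injected `JobManager` rather than executed directly. The call `$jobManager->queue('some-queue', $job)` is given here as an assumption to be checked against the package.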
Command Line Interface
Use the `flowpack.jobqueue.common:queue:*` and `flowpack.jobqueue.common:job:*` commands to interact with the job queues:
Command | Description |
---|---|
queue:list | List configured queues |
queue:describe | Shows details for a given queue (settings, ..) |
queue:setup | Initialize a queue (i.e. create required db tables, check connection, ...) |
queue:flush | Remove all messages from a queue (requires --force flag) |
queue:submit | Submit a message to a queue (mainly for testing) |
job:work | Work on a queue and execute jobs |
job:list | List queued jobs |
Signals & Slots
When working with JobQueues, proper monitoring is crucial as failures might not be visible immediately.
The `JobManager` emits signals for all relevant events, namely:
- messageSubmitted
- messageTimeout
- messageReserved
- messageFinished
- messageReleased
- messageFailed
Those can be used to implement more sophisticated logging, for example by connecting a slot to `messageFailed` that writes every failed message to the system log.
License
This package is licensed under the MIT license.
Contributions
Pull-Requests are more than welcome. Make sure to read the Code Of Conduct.
[1] The `FakeQueue` actually executes Jobs synchronously unless the `async` flag is set (requires Flow 3.3+).
All versions of jobqueue-common with dependencies
neos/flow Version ^6.3 || ^7.0 || ^8.0 || ^9.0
neos/cache Version *