Download the PHP package amphp/pipeline without Composer
On this page you can find all versions of the php package amphp/pipeline. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Download amphp/pipeline
More information about amphp/pipeline
Files in amphp/pipeline
Package pipeline
Short Description Asynchronous iterators and operators.
License MIT
Homepage https://amphp.org/pipeline
Informations about the package pipeline
amphp/pipeline
AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind.
amphp/pipeline
provides concurrent iterators and collection operators.
Installation
This package can be installed as a Composer dependency.
Requirements
This package requires PHP 8.1 or later.
Usage
Using fiber-based coroutines, asynchronous sets can now be created and consumed within a single fiber using PHP's built-in Iterator
. Attempting to consume an Iterator
instance from multiple fibers is problematic though, as one fiber may modify the state of the iterator while another is suspended.
This library provides a ConcurrentIterator
interface which provides a fiber-safe iterator that may be consumed by multiple fibers concurrently, as well as tools for creating asynchronous sets.
Concurrent Iterators
A ConcurrentIterator
may be used in place of an Iterator
, meaning it can be used with foreach
, yield from
, iterator_to_array()
, argument unpacking, and more!
Like an Iterator
, a ConcurrentIterator
may also be iterated manually, with separate methods for advancing and retrieving the current value.
continue()
suspends the current fiber until a value becomes available or the iterator completes, returning true
or false
respectively. An exception is thrown from continue()
the source of the iterator throws an exception while generating the next value.
getValue()
returns the last value emitted on the iterator within the current fiber. The return value of this function will not change within the current fiber until continue()
is called again. continue()
must be invoked and return before this method can be called.
getPosition()
returns the current 0-indexed position within the current iterator. If consuming from multiple fibers, this value may not be sequential within a single fiber. Similar to getValue()
, continue()
must be invoked and return before this method can be called.
Note In general, it is not necessary to call these methods directly within application code. Concurrent iterators typically should be used with
foreach
.
Queue
A Queue
is used to create an asynchronous set of values with the ability to await consumption of the values produced, providing back-pressure to the production of more values, so consumption and production can be synchronized.
Values may be added to a Queue
in two ways.
push()
adds the value to the queue, only returning once the value has been consumed from the queue.pushAsync()
adds the value to the queue, returning aFuture
immediately which is completed only once the value has been consumed from the queue.
Once all values have been pushed into a Queue
, the producer must call complete()
to end the concurrent iterator. Failure to do so will leave the consumer suspended indefinitely. Alternatively to indicate an error, the producer may use error()
to throw an exception to the concurrent iterator consumer and end the concurrent iterator.
DisposedException
If the consumer of the concurrent iterator generated by the Queue
is destroyed, push()
will throw a DisposedException
(or the future returned from pushAsync()
will error with a DisposedException
). This indicates that no additional values need to be generated since consumption of those values has ended. If for some reason the producer wishes to continue (e.g., to consume bytes from a buffer), either catch the exception or ignore the future. (The DisposedException
instance is created only once for each queue.)
Pipeline
A Pipeline
represents an asynchronous set and provides operations which can be applied over the set.
Alternatively, Pipeline
also has methods which consume the set, such as forEach()
or reduce()
, which return only once the set is complete or throws an exception.
Versioning
amphp/pipeline
follows the semver semantic versioning specification like all other amphp
packages.
Security
If you discover any security related issues, please email [email protected]
instead of using the issue tracker.
License
The MIT License (MIT). Please see LICENSE
for more information.