PHP code example of amphp / pipeline

1. Go to the download page for the amphp/pipeline library and download it, choosing the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader. The path is anchored to this file's directory
// (__DIR__) so the include does not depend on the current working directory
// or on the configured include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

amphp / pipeline example snippets


use Amp\Pipeline\Pipeline;

// Manual iteration: Pipeline::getIterator() returns a ConcurrentIterator
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
// continue() is evaluated before every read; the loop stops as soon as it
// returns a falsy value.
while ($concurrentIterator->continue()) {
    // Read the position and the value for the current iteration step.
    $position = $concurrentIterator->getPosition();
    $value = $concurrentIterator->getValue();

    // ...
}

// Equivalently, multiple fibers may consume a single ConcurrentIterator
// instance using foreach.
// NOTE(review): the foreach key/value pair presumably corresponds to
// getPosition()/getValue() above — confirm against the ConcurrentIterator docs.
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
foreach ($concurrentIterator as $position => $value) {
    // ...
}


use Amp\Pipeline\Queue;
use function Amp\async;
use function Amp\delay;

$queue = new Queue();

// Reference timestamp; $elapsed() reports the seconds passed since this point.
$start = \microtime(true);
$elapsed = function () use ($start) {
    return \microtime(true) - $start;
};

// Producer: runs in a separate fiber and pushes the integers 1 through 10.
async(function () use ($queue, $elapsed): void {
    printf("Starting production loop at %.3fs\n", $elapsed());

    for ($number = 1; $number <= 10; ++$number) {
        delay(0.1); // Producing each value takes 100 ms
        $queue->push($number);
    }

    printf("Completing production loop at %.3fs\n", $elapsed());

    // The queue has to be completed here; otherwise the consuming foreach
    // loop below would never exit.
    $queue->complete();
});

// Consumer: handles each value as it becomes available from the queue.
foreach ($queue->iterate() as $item) {
    printf("Iterator yielded %d at %.3fs\n", $item, $elapsed());
    delay(0.5); // Consuming each value takes 500 ms
}

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Source of the pipeline: a closure returning a generator that yields the
// integers 0 through 99.
$pipeline = Pipeline::fromIterable(function (): \Generator {
    $i = 0;
    while ($i < 100) {
        yield $i;
        ++$i;
    }
});

// Stages are attached one statement at a time; each call's result is carried
// forward in $pipeline.
$pipeline = $pipeline->concurrent(10); // Process up to 10 items concurrently
$pipeline = $pipeline->unordered(); // Results may be consumed eagerly and out of order

// Observe each value with a delay of 0.1 to 1 seconds, simulating I/O
$pipeline = $pipeline->tap(function (): void {
    delay(random_int(1, 10) / 10);
});

// Apply an operation to each value
$pipeline = $pipeline->map(function (int $number) {
    return $number * 10;
});

// Keep only the values divisible by 3
$pipeline = $pipeline->filter(function (int $number) {
    return 0 === $number % 3;
});

foreach ($pipeline as $result) {
    echo $result, "\n";
}

use Amp\Pipeline\Pipeline;

// Value source: every invocation returns the next integer, starting at 1.
// The counter lives in a static local, so it persists between calls.
$nextValue = function (): int {
    static $current = 0;
    $current += 1;

    return $current;
};

Pipeline::generate($nextValue)
    ->take(10) // Only the first 10 generated values are used
    ->concurrent(3) // Process 3 values concurrently
    ->delay(1) // Delay for 1 second to simulate I/O
    ->forEach(function (int $number): void {
        echo $number, "\n";
    });