PHP code example of amphp / parallel

1. Go to this page and download the library: Download amphp/parallel library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

amphp / parallel example snippets




mp\Future;
use Amp\Parallel\Worker;
use function Amp\async;

$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$executions = [];
foreach ($urls as $url) {
    // FetchTask is just an example, you'll have to implement
    // the Task interface for your task.
    $executions[$url] = Worker\submit(new FetchTask($url));
}

// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
    fn (Worker\Execution $e) => $e->getFuture(),
    $executions,
));

foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

class FetchTask implements Task
{
    public function __construct(
        private readonly string $url,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): string
    {
        return file_get_contents($this->url); // Example blocking function
    }
}

// main.php

$worker = Amp\Parallel\Worker\createWorker();
$task = new FetchTask('https://amphp.org');

$execution = $worker->submit($task);

// $data will be the return value from FetchTask::run()
$data = $execution->await();

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

final class ExampleTask implements Task
{
    private static ?LocalCache $cache = null;
    
    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        $cache = self::$cache ??= new LocalCache();
        $cachedValue = $cache->get('cache-key');
        // Use and modify $cachedValue...
        $cache->set('cache-key', $updatedValue);
        return $updatedValue;
    }
}

// child.php

use Amp\Sync\Channel;

return function (Channel $channel): mixed {
    $url = $channel->receive();

    $data = file_get_contents($url); // Example blocking function

    $channel->send($data);

    return 'Any serializable data';
};

// parent.php

use Amp\Parallel\Context\ProcessContext;

// Creates and starts a child process or thread.
$context = Amp\Parallel\Context\contextFactory()->start(__DIR__ . '/child.php');

$url = 'https://google.com';
$context->send($url);

$requestData = $context->receive();
printf("Received %d bytes from %s\n", \strlen($requestData), $url);

$returnValue = $context->join();
printf("Child processes exited with '%s'\n", $returnValue);

// Using the global context factory from Amp\Parallel\Context\contextFactory()
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');

// Creating a specific context factory and using it to create a context.
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

// AppMessage.php

class AppMessage {
    public function __construct(
        public readonly AppMessageType $type,
        public readonly mixed $data,
    ) {
    }
}

// AppMessageType.php

enum AppMessageType {
    case ProcessedImage;
    case ProcessImageFromPath;
    // Other enum cases for further message types...
}

// parent.php

use Amp\Parallel\Context\ProcessContextFactory;

$contextFactory = new ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

$stdin = Amp\ByteStream\getStdin();

while ($path = $stdin->read()) {
    $message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
    $context->send($message);

    $reply = $context->receive(); // Wait for reply from child context with processed image data.
}

$context->send(null); // End loop in child process.
$context->join();

// child.php

use Amp\Sync\Channel;

return function (Channel $channel): void {
    /** @var AppMessage|null $message */
    while ($message = $channel->receive()) {
        $reply = match ($message->type) {
            AppMessageType::ProcessImageFromPath => new AppMessage(
                AppMessageType::ProcessedImage,
                ImageProcessor::process($message->data),
            ),
            // Handle other message types...
        }
        
        $channel->send($reply);
    }
};

// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;

$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't 'uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);

// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Sync\Channel;

return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
    
    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
    
    $socketPipe = new ClientSocketReceivePipe($socket);
};