1. Go to this page and download the library: Download amphp/parallel library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
amphp / parallel example snippets
mp\Future;
use Amp\Parallel\Worker;
use function Amp\async;
$urls = [
'https://secure.php.net',
'https://amphp.org',
'https://github.com',
];
$executions = [];
foreach ($urls as $url) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$executions[$url] = Worker\submit(new FetchTask($url));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
fn (Worker\Execution $e) => $e->getFuture(),
$executions,
));
foreach ($responses as $url => $response) {
\printf("Read %d bytes from %s\n", \strlen($response), $url);
}
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class FetchTask implements Task
{
public function __construct(
private readonly string $url,
) {
}
public function run(Channel $channel, Cancellation $cancellation): string
{
return file_get_contents($this->url); // Example blocking function
}
}
// main.php
$worker = Amp\Parallel\Worker\createWorker();
$task = new FetchTask('https://amphp.org');
$execution = $worker->submit($task);
// $data will be the return value from FetchTask::run()
$data = $execution->await();
use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
final class ExampleTask implements Task
{
private static ?LocalCache $cache = null;
public function run(Channel $channel, Cancellation $cancellation): mixed
{
$cache = self::$cache ??= new LocalCache();
$cachedValue = $cache->get('cache-key');
// Use and modify $cachedValue...
$cache->set('cache-key', $updatedValue);
return $updatedValue;
}
}
// child.php
use Amp\Sync\Channel;
return function (Channel $channel): mixed {
$url = $channel->receive();
$data = file_get_contents($url); // Example blocking function
$channel->send($data);
return 'Any serializable data';
};
// parent.php
use Amp\Parallel\Context\ProcessContext;
// Creates and starts a child process or thread.
$context = Amp\Parallel\Context\contextFactory()->start(__DIR__ . '/child.php');
$url = 'https://google.com';
$context->send($url);
$requestData = $context->receive();
printf("Received %d bytes from %s\n", \strlen($requestData), $url);
$returnValue = $context->join();
printf("Child processes exited with '%s'\n", $returnValue);
// Using the global context factory from Amp\Parallel\Context\contextFactory()
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');
// Creating a specific context factory and using it to create a context.
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');
// AppMessage.php
class AppMessage {
public function __construct(
public readonly AppMessageType $type,
public readonly mixed $data,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage;
case ProcessImageFromPath;
// Other enum cases for further message types...
}
// parent.php
use Amp\Parallel\Context\ProcessContextFactory;
$contextFactory = new ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');
$stdin = Amp\ByteStream\getStdin();
while ($path = $stdin->read()) {
$message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
$context->send($message);
$reply = $context->receive(); // Wait for reply from child context with processed image data.
}
$context->send(null); // End loop in child process.
$context->join();
// child.php
use Amp\Sync\Channel;
return function (Channel $channel): void {
/** @var AppMessage|null $message */
while ($message = $channel->receive()) {
$reply = match ($message->type) {
AppMessageType::ProcessImageFromPath => new AppMessage(
AppMessageType::ProcessedImage,
ImageProcessor::process($message->data),
),
// Handle other message types...
}
$channel->send($reply);
}
};
// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
$ipcHub = new LocalIpcHub();
// Sharing the IpcHub instance with the context factory isn't 'uri' => $ipcHub->getUri(), 'key' => $connectionKey]);
// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);
$socketPipe = new ClientSocketSendPipe($socket);
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Sync\Channel;
return function (Channel $channel): void {
['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
// $socket will be a bidirectional socket to the parent.
$socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
$socketPipe = new ClientSocketReceivePipe($socket);
};
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.