1. Go to this page and download the library: Download amphp/cluster library. Choose the download type you require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
amphp / cluster example snippets
// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\Socket;
use Revolt\EventLoop;
use function Amp\Socket\listen;
// Parent process: accepts client connections on 127.0.0.1:1337 and transfers
// each accepted socket to a child process over an IPC socket pipe.
$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
// NOTE(review): the context start-up below was truncated in this snippet and
// has been reconstructed; confirm the child script path for your layout.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);

$context = $contextFactory->start(__DIR__ . '/child.php');

// child.php reads exactly this array shape ('uri' and 'key') from its channel
// and uses both values to connect back to the hub.
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);

$server = listen('127.0.0.1:1337');

// Close server when SIGTERM is received.
EventLoop::onSignal(SIGTERM, $server->close(...));

$clientId = 0;
while ($client = $server->accept()) {
    // $clientId is an example of arbitrary data which may be
    // associated with a transferred socket.
    $socketPipe->send($client, ++$clientId);
}
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Socket\Socket;
use Amp\Sync\Channel;
/**
 * Child process entry point: connects back to the parent over IPC, then
 * receives transferred client sockets and handles each one in its own
 * coroutine.
 *
 * @param Channel $channel Channel to the parent; its first message must be an
 *                         array with the keys 'uri' and 'key'.
 */
return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();

    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);

    $socketPipe = new ClientSocketReceivePipe($socket);

    while ($transferredSocket = $socketPipe->receive()) {
        // Handle client socket in a separate coroutine (fiber).
        // The call is fully qualified because this script has no
        // `use function Amp\async;`, so a bare async() would resolve to a
        // non-existent global function.
        \Amp\async(
            function (Socket $client, int $id) { /* ... */ },
            $transferredSocket->getSocket(), // Transferred socket
            $transferredSocket->getData(), // Associated data
        );
    }
};
// parent.php
use Amp\CancelledException;
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Cluster\ServerSocketPipeProvider;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\SignalCancellation;
use Revolt\EventLoop;
use function Amp\async;
use function Amp\Socket\listen;
// Parent process: starts a child and serves its requests for server sockets
// over an IPC socket until SIGTERM is received.
$ipcHub = new LocalIpcHub();

$serverProvider = new ServerSocketPipeProvider();

// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
// NOTE(review): the context start-up below was truncated in this snippet and
// has been reconstructed; confirm the child script path for your layout.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);

$context = $contextFactory->start(__DIR__ . '/child.php');

// child.php reads exactly this array shape ('uri' and 'key') from its channel
// and uses both values to connect back to the hub.
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

// Listen for requests for server sockets on the given socket until cancelled by signal.
try {
    $serverProvider->provideFor($socket, new SignalCancellation(SIGTERM));
} catch (CancelledException) {
    // Signal cancellation expected.
}
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Cluster\ServerSocketPipeFactory;
use Amp\Sync\Channel;
/**
 * Child process entry point: connects back to the parent over IPC, requests a
 * server socket through that connection and handles each accepted client in
 * its own coroutine.
 *
 * @param Channel $channel Channel to the parent; its first message must be an
 *                         array with the keys 'uri' and 'key'.
 */
return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();

    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);

    $serverFactory = new ServerSocketPipeFactory($socket);

    // Requests the server socket from the parent process.
    $server = $serverFactory->listen('127.0.0.1:1337');

    while ($client = $server->accept()) {
        // Handle client socket in a separate coroutine (fiber).
        // The call is fully qualified because this script has no
        // `use function Amp\async;`, so a bare async() would resolve to a
        // non-existent global function.
        \Amp\async(function () use ($client) { /* ... */ });
    }
};
use Amp\Cluster\ClusterWatcher;
use Revolt\EventLoop;
// Create a watcher for the worker script and launch four workers.
$watcher = new ClusterWatcher('path/to/script.php');
$watcher->start(4);

// This example relies on SIGTERM to shut the cluster down.
EventLoop::onSignal(SIGTERM, function () use ($watcher): void {
    $watcher->stop();
});

// Consume messages sent by the workers.
foreach ($watcher->getMessageIterator() as $message) {
    // React to the worker message here.
}
use Amp\ByteStream;
use Amp\Cluster\Cluster;
use Amp\Http\HttpStatus;
use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\Driver\ConnectionLimitingServerSocketFactory;
use Amp\Http\Server\Driver\SocketClientFactory;
use Amp\Http\Server\RequestHandler\ClosureRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\SocketHttpServer;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Logger;
// Worker script: serves "Hello, World!" over HTTP on 127.0.0.1:1337, either as
// a cluster worker or as a standalone process.

// Outside a cluster there is no context ID, so fall back to the process ID.
$id = Cluster::getContextId() ?? getmypid();

// Creating a log handler in this way allows the script to be run in a cluster or standalone.
if (Cluster::isWorker()) {
    $handler = Cluster::createLogHandler();
} else {
    $handler = new StreamHandler(ByteStream\getStdout());
    $handler->setFormatter(new ConsoleFormatter());
}

$logger = new Logger('worker-' . $id);
$logger->pushHandler($handler);
$logger->useLoggingLoopDetection(false);

// Cluster::getServerSocketFactory() will return a factory which creates the socket
// locally or requests the server socket from the cluster watcher.
$socketFactory = Cluster::getServerSocketFactory();

$clientFactory = new SocketClientFactory($logger);

$httpServer = new SocketHttpServer($logger, $socketFactory, $clientFactory);
$httpServer->expose('127.0.0.1:1337');

// Start the HTTP server
$httpServer->start(
    new ClosureRequestHandler(function (): Response {
        return new Response(HttpStatus::OK, [
            "content-type" => "text/plain; charset=utf-8",
        ], "Hello, World!");
    }),
    new DefaultErrorHandler(),
);

// Stop the server when the cluster watcher is terminated.
Cluster::awaitTermination();

// Stop the server started above; no $server variable exists in this script.
$httpServer->stop();