PHP code example of amphp / cluster

1. Go to the amphp/cluster download page and download the library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

amphp / cluster example snippets


// parent.php

use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\Socket;
use Revolt\EventLoop;
use function Amp\Socket\listen;

// Parent side of the client-socket transfer example: accepts TCP clients
// and forwards each accepted socket to a child process over an IPC pipe.
$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't required;
// it is shared here so the same hub also serves the extra connection accepted below.
$context = (new ProcessContextFactory(ipcHub: $ipcHub))->start(__DIR__ . '/child.php');

// Send the child the hub URI and a key; child.php reads them from its
// channel as ['uri' => ..., 'key' => ...] and connects back with them.
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);

$server = listen('127.0.0.1:1337');

// Close server when SIGTERM is received.
EventLoop::onSignal(SIGTERM, $server->close(...));

$clientId = 0;
while ($client = $server->accept()) {
    // $clientId is an example of arbitrary data which may be
    // associated with a transferred socket.
    $socketPipe->send($client, ++$clientId);
}

// child.php

use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Socket\Socket;
use Amp\Sync\Channel;

// Child entry point: receives the IPC connection details from the parent,
// connects back, and then handles every client socket the parent transfers.
return function (Channel $channel): void {
    // The parent sends the IPC hub URI and the connection key as one array.
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();

    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);

    $socketPipe = new ClientSocketReceivePipe($socket);

    while ($transferredSocket = $socketPipe->receive()) {
        // Handle client socket in a separate coroutine (fiber).
        // The function is referenced by its qualified name because this
        // script does not import Amp\async; a bare async() call would fail
        // with "Call to undefined function".
        Amp\async(
            function (Socket $client, int $id): void { /* ... */ },
            $transferredSocket->getSocket(), // Transferred socket
            $transferredSocket->getData(), // Associated data
        );
    }
};

// parent.php

use Amp\CancelledException;
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Cluster\ServerSocketPipeProvider;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\SignalCancellation;
use Revolt\EventLoop;
use function Amp\async;
use function Amp\Socket\listen;

// Parent side of the server-socket sharing example: starts a child and
// serves its requests for server sockets until SIGTERM arrives.
$ipcHub = new LocalIpcHub();

$serverProvider = new ServerSocketPipeProvider();

// Sharing the IpcHub instance with the context factory isn't required;
// it is shared here so the same hub also serves the extra connection accepted below.
$context = (new ProcessContextFactory(ipcHub: $ipcHub))->start(__DIR__ . '/child.php');

// Send the child the hub URI and a key; child.php reads them from its
// channel as ['uri' => ..., 'key' => ...] and connects back with them.
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

// Listen for requests for server sockets on the given socket until cancelled by signal.
try {
    $serverProvider->provideFor($socket, new SignalCancellation(SIGTERM));
} catch (CancelledException) {
    // Signal cancellation expected.
}

// child.php

use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Cluster\ServerSocketPipeFactory;
use Amp\Sync\Channel;

// Child entry point: connects back to the parent over IPC, asks it for a
// server socket, and then accepts clients on that socket.
return function (Channel $channel): void {
    // The parent sends the IPC hub URI and the connection key as one array.
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();

    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);

    $serverFactory = new ServerSocketPipeFactory($socket);

    // Requests the server socket from the parent process.
    $server = $serverFactory->listen('127.0.0.1:1337');

    while ($client = $server->accept()) {
        // Handle client socket in a separate coroutine (fiber).
        // The function is referenced by its qualified name because this
        // script does not import Amp\async; a bare async() call would fail
        // with "Call to undefined function".
        Amp\async(function () use ($client): void { /* ... */ });
    }
};

use Amp\Cluster\ClusterWatcher;
use Revolt\EventLoop;

// Create a watcher for the worker script and start the cluster with 4 workers.
$workerCount = 4;
$watcher = new ClusterWatcher('path/to/script.php');
$watcher->start($workerCount);

// In this example a SIGTERM is what stops the cluster.
EventLoop::onSignal(SIGTERM, function () use ($watcher): void {
    $watcher->stop();
});

// Iterate over the messages the workers send to the watcher.
$messages = $watcher->getMessageIterator();
foreach ($messages as $message) {
    // Handle received message from worker.
}



use Amp\ByteStream;
use Amp\Cluster\Cluster;
use Amp\Http\HttpStatus;
use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\Driver\ConnectionLimitingServerSocketFactory;
use Amp\Http\Server\Driver\SocketClientFactory;
use Amp\Http\Server\RequestHandler\ClosureRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\SocketHttpServer;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Logger;

// Worker script: serves "Hello, World!" over HTTP on 127.0.0.1:1337 and
// shuts the server down once Cluster::awaitTermination() returns.

// Fall back to the process ID when there is no cluster context ID.
$id = Cluster::getContextId() ?? getmypid();

// Creating a log handler in this way allows the script to be run in a cluster or standalone.
if (Cluster::isWorker()) {
    $handler = Cluster::createLogHandler();
} else {
    $handler = new StreamHandler(ByteStream\getStdout());
    $handler->setFormatter(new ConsoleFormatter());
}

$logger = new Logger('worker-' . $id);
$logger->pushHandler($handler);
$logger->useLoggingLoopDetection(false);

// Cluster::getServerSocketFactory() will return a factory which creates the socket
// locally or requests the server socket from the cluster watcher.
$socketFactory = Cluster::getServerSocketFactory();
$clientFactory = new SocketClientFactory($logger);

$httpServer = new SocketHttpServer($logger, $socketFactory, $clientFactory);
$httpServer->expose('127.0.0.1:1337');

// Start the HTTP server
$httpServer->start(
    new ClosureRequestHandler(function (): Response {
        return new Response(HttpStatus::OK, [
            "content-type" => "text/plain; charset=utf-8",
        ], "Hello, World!");
    }),
    new DefaultErrorHandler(),
);

// Stop the server when the cluster watcher is terminated.
Cluster::awaitTermination();

// The server instance created above is $httpServer; no $server variable exists in this script.
$httpServer->stop();
bash
vendor/bin/cluster --workers=4 path/to/script.php