PHP code example of exsyst / worker

1. Go to this page and download the library: Download exsyst/worker library. Choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

exsyst / worker example snippets



// Create one worker that runs MyWorkerImpl, then talk to it in lock-step:
// send an integer, wait for the reply, dump it, and move on to the next one.
$wf = new WorkerFactory();
$w = $wf->createWorker(MyWorkerImpl::class);
$i = 0;
while ($i < 10) {
  $w->sendMessage($i);
  // Blocks on the worker's answer before the next number is sent.
  $reply = $w->receiveMessage();
  var_dump($reply);
  ++$i;
}


/**
 * Raw worker implementation: answers every incoming message with a sentence
 * stating that message's square.
 */
class MyWorkerImpl implements RawWorkerImplementationInterface
{
  /**
   * Worker main loop. Reads messages from the channel until reading raises
   * an UnderflowException, replying to each one as it arrives.
   *
   * @param ChannelInterface $channel Channel used both to receive messages and to send replies.
   */
  public function run(ChannelInterface $channel)
  {
    while (true) {
      try {
        $message = $channel->receiveMessage();
      } catch (\UnderflowException $e) {
        // The master closed the connection (its Worker object went out of scope),
        // so there is nothing left to serve.
        return;
      }

      $square = $message * $message;
      $channel->sendMessage($message . " squared is " . $square);
    }
  }
}


/**
 * Evented worker implementation: only onMessage does any work, replying with
 * a sentence stating the square of the received message. Every other hook
 * is deliberately left empty.
 */
class MyWorkerImpl implements EventedWorkerImplementationInterface
{
  // Hooks this example does not need; they are intentionally no-ops.

  public function setLoop(LoopInterface $loop)
  {
  }

  public function initialize()
  {
  }

  public function onConnect(ChannelInterface $channel, $peerName)
  {
  }

  public function onDisconnect(ChannelInterface $channel)
  {
  }

  public function terminate()
  {
  }

  /**
   * Replies on the same channel the message came from.
   *
   * @param mixed            $message  Received value; it is multiplied by itself.
   * @param ChannelInterface $channel  Channel on which the reply is sent.
   * @param mixed            $peerName Unused here.
   */
  public function onMessage($message, ChannelInterface $channel, $peerName)
  {
    $square = $message * $message;
    $reply = $message . " squared is " . $square;
    $channel->sendMessage($reply);
  }
}


// Distribute the integers 0..9 over a pool of 4 workers running MyWorkerImpl.
$wf = new WorkerFactory();
$wp = $wf->createWorkerPool(MyWorkerImpl::class, 4);
$i = 0;
$busy = 0;

// Phase 1: hand one job to each worker in the pool (or stop early once all
// ten jobs are out), counting how many workers now owe us an answer.
foreach ($wp as $w) {
  if ($i < 10) {
    $w->sendMessage($i);
    ++$busy;
    ++$i;
  } else {
    break;
  }
}

// Phase 2: each time an answer comes back, immediately give the worker that
// produced it the next job, until every job has been sent.
for (; $i < 10; ++$i) {
  // WorkerPool->receiveMessage takes an output parameter, which it fills
  // with the worker which actually received the returned message
  var_dump($wp->receiveMessage($w));
  $w->sendMessage($i);
}

// Phase 3: collect the answers still outstanding from phase 1's workers.
for (; $busy > 0; --$busy) {
  var_dump($wp->receiveMessage($w));
}


// Client for a shared worker reachable through a Unix socket next to this file.
// Run with "--stop" to stop the shared worker; otherwise the first argument
// (or 'world' when none is given) is sent to it and the reply is dumped.
$wbsp = new WorkerBootstrapProfile();
$wbsp->setAdminCookie('This value is not so secret, change it in your app !');
$wf = new WorkerFactory($wbsp);

$address = 'unix://' . __DIR__ . '/Worker.sock';

$stopRequested = $argc > 1 && $argv[1] === '--stop';
if ($stopRequested) {
  $wf->stopSharedWorker($address);
  exit;
}

$w = $wf->connectToSharedWorker($address, MySharedWorkerImpl::class);
if ($argc > 1) {
  $w->sendMessage($argv[1]);
} else {
  $w->sendMessage('world');
}
var_dump($w->receiveMessage());


/**
 * Shared worker implementation that greets each message sender and tells
 * them how many messages it has handled so far.
 */
class MySharedWorkerImpl implements SharedWorkerImplementationInterface
{
  /** Number of messages handled so far. */
  private $i;

  public function __construct()
  {
    $this->i = 0;
  }

  // Hooks this example does not need; they are intentionally no-ops.

  public function setLoop(LoopInterface $loop)
  {
  }

  public function initialize()
  {
  }

  public function onConnect(ChannelInterface $channel, $peerName)
  {
  }

  public function onDisconnect(ChannelInterface $channel)
  {
  }

  public function onStop()
  {
  }

  public function terminate()
  {
  }

  /**
   * Reports the current counter value.
   *
   * @param mixed $privileged Unused here.
   *
   * @return string Human-readable counter report.
   */
  public function onQuery($privileged)
  {
    return 'Current counter value : ' . $this->i;
  }

  /**
   * Increments the counter and greets the sender with its ordinal rank.
   *
   * @param mixed            $message  Text inserted after "Hello ".
   * @param ChannelInterface $channel  Channel on which the greeting is sent.
   * @param mixed            $peerName Unused here.
   */
  public function onMessage($message, ChannelInterface $channel, $peerName)
  {
    $rank = ++$this->i;
    $greeting = "Hello " . $message . " ! You're my " . self::ordinal($rank) . " client.";
    $channel->sendMessage($greeting);
  }

  /**
   * Formats a number as an English ordinal (1st, 2nd, 3rd, 4th, 11th, ...).
   *
   * @param int $n Number to format.
   *
   * @return string The number followed by its ordinal suffix.
   */
  private static function ordinal($n)
  {
    $units = $n % 10;
    $lastTwoDigits = $n % 100;

    // A tens digit of 1 (11, 12, 13, 111, ...) always takes "th".
    $isTeen = ($lastTwoDigits - $units) == 10;
    if ($isTeen) {
      return $n . 'th';
    }

    $suffixes = [1 => 'st', 2 => 'nd', 3 => 'rd'];
    if (isset($suffixes[$units])) {
      return $n . $suffixes[$units];
    }

    return $n . 'th';
  }
}


// Demonstrates the shared-worker kill switch API of WorkerFactory.
// The bootstrap profile is pointed at a JSON file next to this script.
$wbsp = new WorkerBootstrapProfile();
$wbsp->setKillSwitchPath(__DIR__ . '/WorkerKillSwitch.json');
$wf = new WorkerFactory($wbsp);
// Save the current state (global flag and list of disabled addresses) so it can be restored at the end :
$global = $wf->areSharedWorkersDisabledGlobally();
$addresses = $wf->getDisabledSharedWorkers();
// Disable them globally :
$wf->disableSharedWorkersGlobally();
// Disable the one from our previous example :
$wf->disableSharedWorker('unix://' . __DIR__ . '/Worker.sock');
// "Revert" our changes (this does not yet put back the state saved above) :
$wf->reEnableSharedWorker('unix://' . __DIR__ . '/Worker.sock');
$wf->reEnableSharedWorkersGlobally();
// Re-enable every single worker :
$wf->reEnableAllSharedWorkers();
// Really revert our changes, using the state saved at the top :
if ($global) {
  $wf->disableSharedWorkersGlobally();
}
// disable/reEnableSharedWorker also accept anything that can be iterated with foreach ("fits between foreach and as"),
// and will then process all of its elements in a single transaction.
$wf->disableSharedWorker($addresses);
Master.php
MyWorkerImpl.php
MyWorkerImpl.php
MyWorkerImpl.php
MasterWithPool.php
MasterOfShared.php
MySharedWorkerImpl.php