1. Go to the download page and download the qxsch/worker-pool library. Choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add this code to index.php:
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// qxsch/worker-pool example snippets
// Basic example: a pool of four workers that all execute the same closure.
$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->create(
        new \QXS\WorkerPool\ClosureWorker(
            /**
             * Job executed for every value queued through WorkerPool::run().
             *
             * @param mixed                     $input     the input from the WorkerPool::run() method
             * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
             * @param \ArrayObject              $storage   a persistent storage for the current child process
             */
            function ($input, $semaphore, $storage) {
                $pid = getmypid();
                echo "[".$pid."]"." hi $input\n";

                // simulated workload: pause between one and three seconds
                $seconds = rand(1, 3);
                sleep($seconds);

                // hand the input back to the parent; return null instead
                // when no data should be passed to the parent
                return $input;
            }
        )
    );

// queue the inputs 0 through 9
foreach (range(0, 9) as $task) {
    $wp->run($task);
}

// wait for all workers
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    echo $result->dump() . "\n";
    // alternative: var_dump($result);
}
use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\Semaphore;
/**
 * Example worker: greets each input after a random delay and
 * occasionally fails with an exception.
 */
class MyWorker implements WorkerInterface
{
    /**
     * Semaphore received in onProcessCreate(); stored so that run()
     * could use it to synchronize the workers.
     */
    protected $sem;

    /**
     * Called after the worker has been forked into another process.
     *
     * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing error
     */
    public function onProcessCreate(Semaphore $semaphore)
    {
        // keep the semaphore around for later use
        $this->sem = $semaphore;

        // announce the new process on stdout
        echo "\t[".getmypid()."] has been created.\n";

        // seed mt_rand from the current time ("usec sec" as returned by microtime())
        $parts = explode(' ', microtime());
        $seed = (float) $parts[1] + ((float) $parts[0] * 100000);
        mt_srand((int) $seed);
    }

    /**
     * Called before the worker process is getting destroyed.
     *
     * @throws \Exception in case of a processing error
     */
    public function onProcessDestroy()
    {
        // announce the shutdown on stdout
        echo "\t[".getmypid()."] will be destroyed.\n";
    }

    /**
     * Runs the work for one task.
     *
     * @param mixed $input the data that the worker should process (cast to string here)
     * @return string the greeting for the input; return null instead
     *                when no data should be passed to the parent
     * @throws \RuntimeException when the simulated random failure occurs
     */
    public function run($input)
    {
        $input = (string) $input;
        echo "\t[".getmypid()."] Hi $input\n";

        // this is the workload: sleep between 0 and 10 seconds
        $delay = mt_rand(0, 10);
        sleep($delay);

        // the common case: no failure was drawn, return the greeting
        if (mt_rand(0, 10) !== 9) {
            return "Hi $input";
        }

        // and sometimes exceptions might occur
        throw new \RuntimeException('We have a problem for '.$input.'.');
    }
}
// Run fifty tasks on a pool of ten MyWorker processes and report every outcome.
$wp = new WorkerPool();
$wp->setWorkerPoolSize(10)
    ->create(new MyWorker());

// produce the tasks 1 through 50
foreach (range(1, 50) as $task) {
    $wp->run($task);
}

// some statistics
$busy = $wp->getBusyWorkers();
$free = $wp->getFreeWorkers();
echo "Busy Workers:".$busy." Free Workers:".$free."\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results; each one is reported under the first key that is set
foreach ($wp as $result) {
    if (isset($result['data'])) {
        echo "RESULT: ".$result['data']."\n";
        continue;
    }

    if (isset($result['workerException'])) {
        echo "WORKER EXCEPTION: ".$result['workerException']['class'].": ".$result['workerException']['message']."\n".$result['workerException']['trace']."\n";
        continue;
    }

    if (isset($result['poolException'])) {
        echo "POOL EXCEPTION: ".$result['poolException']['class'].": ".$result['poolException']['message']."\n".$result['poolException']['trace']."\n";
    }
}

// write something, before the parent exits
echo "ByeBye\n";
// Semaphore example: two ways of running a section in only one worker at a time.
$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->create(
        new \QXS\WorkerPool\ClosureWorker(
            /**
             * Job executed for every value queued through WorkerPool::run().
             *
             * @param mixed                     $input     the input from the WorkerPool::run() method
             * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
             * @param \ArrayObject              $storage   a persistent storage for the current child process
             */
            function ($input, $semaphore, $storage) {
                // Variant A: explicit begin/end; the finally block makes sure
                // synchronizedEnd() is called even when the section throws.
                $semaphore->synchronizedBegin();
                try {
                    // synchronized across all workers: just one worker at a time
                    echo "[A][".getmypid()."]"." hi $input\n";
                } finally {
                    $semaphore->synchronizedEnd();
                }

                // Variant B: hand the section to synchronize() as a closure.
                $section = function () use ($input, $storage) {
                    // synchronized across all workers: just one worker at a time
                    echo "[B][".getmypid()."]"." hi $input\n";
                };
                $semaphore->synchronize($section);

                // simulated workload: pause between one and three seconds
                $seconds = rand(1, 3);
                sleep($seconds);

                return $input;
            }
        )
    );

// queue the inputs 0 through 9
foreach (range(0, 9) as $task) {
    $wp->run($task);
}

// wait for all workers
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}
// Same pool as the basic example, but with semaphore support switched off.
$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    // disables the semaphore support (you can still use the semaphore
    // in the worker, but it will have no effect)
    ->disableSemaphore()
    ->create(
        new \QXS\WorkerPool\ClosureWorker(
            /**
             * Job executed for every value queued through WorkerPool::run().
             *
             * @param mixed                     $input     the input from the WorkerPool::run() method
             * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
             * @param \ArrayObject              $storage   a persistent storage for the current child process
             */
            function ($input, $semaphore, $storage) {
                $pid = getmypid();
                echo "[".$pid."]"." hi $input\n";

                // simulated workload: pause between one and three seconds
                $seconds = rand(1, 3);
                sleep($seconds);

                // hand the input back to the parent; return null instead
                // when no data should be passed to the parent
                return $input;
            }
        )
    );

// queue the inputs 0 through 9
foreach (range(0, 9) as $task) {
    $wp->run($task);
}

// wait for all workers
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}
// Pool with respawnAutomatically() enabled; the job below randomly kills its
// own process. NOTE(review): presumably the pool then replaces dead workers —
// confirm against the library documentation.
$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->respawnAutomatically()
    ->create(
        new \QXS\WorkerPool\ClosureWorker(
            /**
             * Job executed for every value queued through WorkerPool::run().
             *
             * @param mixed                     $input     the input from the WorkerPool::run() method
             * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
             * @param \ArrayObject              $storage   a persistent storage for the current child process
             */
            function ($input, $semaphore, $storage) {
                $pid = getmypid();
                echo "[".$pid."]"." hi $input\n";

                // simulated workload: pause between one and three seconds
                $seconds = rand(1, 3);
                sleep($seconds);

                // simulate unexpected worker death (draws of 6 to 10 end the process)
                $dies = rand(1, 10) > 5;
                if ($dies) {
                    exit;
                }

                // hand the input back to the parent; return null instead
                // when no data should be passed to the parent
                return $input;
            }
        )
    );

// queue the inputs 0 through 9
foreach (range(0, 9) as $task) {
    $wp->run($task);
}

// wait for all workers
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}