PHP code example of yurunsoft / swoole-co-pool

1. Go to this page and download the library: Download yurunsoft/swoole-co-pool library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

yurunsoft / swoole-co-pool example snippets


use Yurun\Swoole\CoPool\CoPool;
use Yurun\Swoole\CoPool\Interfaces\ICoTask;
use Yurun\Swoole\CoPool\Interfaces\ITaskParam;

$coCount = 10; // 同时工作协程数
$queueLength = 1024; // 队列长度
$pool = new CoPool($coCount, $queueLength,
    // 定义任务匿名类,当然你也可以定义成普通类,传入完整类名
    new class implements ICoTask
    {
        /**
         * 执行任务
         *
         * @param ITaskParam $param
         * @return mixed
         */
        public function run(ITaskParam $param)
        {
            // 执行任务
            return true; // 返回任务执行结果,非必须
        }

    }
);
$pool->run();

$data = 1; // 可以传递任何参数

// 增加任务,并挂起协程等待返回任务执行结果
$result = $pool->addTask($data);

// 增加任务,异步回调
$result = $pool->addTaskAsync($data, function(ITaskParam $param, $data){
    // 异步回调
});

// 增加分组任务,并挂起协程等待返回任务执行结果
$result = $pool->addTask($data, '分组名称');

// 增加分组任务,异步回调
$result = $pool->addTaskAsync($data, function(ITaskParam $param, $data){
    // 异步回调
}, '分组名称');

$pool->wait(); // 等待协程池停止,不限时,true/false
$pool->wait(60); // 等待协程池停止,限时60秒,如果为-1则不限时,true/false

$batch = new CoBatch([
    function(){
        return 'imi';
    },
    'a' =>  function(){
        return 'niu';
    },
    'b' =>  function(){
        return 'bi';
    },
]);
$results = $batch->exec();
// $timeout = -1; // 支持超时
// $limit = -1; // 限制同时工作协程数量
// $results = $batch->exec($timeout, $limit);
// $results = $batch->exec($timeout, $limit, $throws); // 捕获异常
var_dump($results);
// $results 值为:
// [
//     'imi',
//     'a' =>  'niu',
//     'b' =>  'bi',
// ]
// $throws 值为异常对象数组,成员键名和传入数组中的一致。没有异常则为空数组。
// 超时异常类:Yurun\Swoole\CoPool\Exception\TimeoutException

use function Yurun\Swoole\Coroutine\batch;
batch([
    function(){
        return 'imi';
    },
    'a' =>  function(){
        return 'niu';
    },
    'b' =>  function(){
        return 'bi';
    },
]);
// batch($callables, $timeout, $limit);
// batch($callables, $timeout, $limit, true); // 捕获异常并在当前上下文抛出
// batch($callables, $timeout, $limit, false, $throws); // 捕获异常
// $throws 值为异常对象数组,成员键名和传入数组中的一致。没有异常则为空数组。
// 超时异常类:Yurun\Swoole\CoPool\Exception\TimeoutException

use Yurun\Swoole\CoPool\CoBatchIterator;

$input = function ($size = 100) {
    while ($size--)
    {
        $random = mt_rand(100, 900);
        yield $size => function () use ($random) {
            // 模拟IO任务
            usleep($random * 10);

            return $random;
        };
    }
};

$timeout = -1; // 支持超时
$limit = 8;    // 限制同时工作协程数量
$batch = new CoBatchIterator($input(), $timeout, $limit);
$iter = $batch->exec();

foreach ($iter as $key => $value)
{
    if ($value > 500)
    {
        // 获得符合要求的结果,执行相应的代码逻辑并中断循环
        // 业务代码...
        
        // 可以发送`false`到迭代器中止仍在运行的协程快速回收资源
        $iter->send(false);
        break;
    }
}

// 可以查看批量执行是以什么方式退出的
var_dump($result = $iter->getReturn());
// $result === CoBatchIterator::SUCCESS; // 全部任务完成
// $result === CoBatchIterator::BREAK;   // 执行被主动中断
// $result === CoBatchIterator::TIMEOUT; // 任务超时
// $result === CoBatchIterator::UNKNOWN; // 未知原因(一般情况不会发生)

use function Yurun\Swoole\Coroutine\batchIterator;
batchIterator([
    function(){
        return 'imi';
    },
    'a' =>  function(){
        return 'niu';
    },
    'b' =>  function(){
        return 'bi';
    },
]);
// batchIterator($callables, $timeout, $limit);

use function Yurun\Swoole\Coroutine\goWait;
$result = goWait(function(){
    \Swoole\Coroutine::sleep(1);
    return 'wait result';
});
echo $result; // wait result

// 最大执行时间 0.5 秒,超过时间返回 null,但任务不会中断
$result = goWait(function(){
    \Swoole\Coroutine::sleep(1);
    return 'wait result';
}, 0.5);

// 捕获异常并在当前上下文抛出
try {
    $result = goWait(function(){
        throw new \RuntimeException('gg');
    }, -1, true); // 第 3 个参数传 true
} catch(\Throwable $th) {
    var_dump($th);
}

use Yurun\Swoole\CoPool\ChannelContainer;

go(function(){
    $channelContainer = new ChannelContainer;

    $id = 'abc';
    $data = [
        'time'  =>  time(),
    ];

    go(function() use($id, $data, $channelContainer){
        echo 'Wait 3 seconds...', PHP_EOL;
        \Swoole\Coroutine::sleep(3);
        $channelContainer->push($id, $data);
    });
    var_dump($channelContainer->pop($id));

});