PHP code example of swlib / archer
1. Go to this page and download the library: Download swlib/archer library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
swlib / archer example snippets
$loader = setPsr4('Swlib\\Archer\\', YOUR_PATH . '/src/');
$loader->addClassMap([
'Swlib\\Archer' => YOUR_PATH . '/src/Archer.php'
]);
\Swoole\Runtime::enableCoroutine();
go(function () {
$callback = function(string $method, ...$param) {
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
return $redis->{$method}(...$param);
};
$task1 = \Swlib\Archer::taskDefer($callback, ['get', 'some_key']);
$task2 = \Swlib\Archer::taskDefer($callback, ['hget', 'a', 'b']);
$task3 = \Swlib\Archer::taskDefer($callback, ['lget', 'k1', 10]);
var_dump($task1->recv());
var_dump($task2->recv());
var_dump($task3->recv());
Archer::taskTimerAfter(1.5, function (string $s1, string $s2) {
echo "1.5s later:{$s1} {$s2}\n";
}, ['hello', 'world']);
});
/*定义*/ \Swlib\Archer::taskDefer(callable $task_callback, ?array $params = null): \Swlib\Archer\Task\Defer;
$task = \Swlib\Archer::taskDefer($task_callback, ['foo', 'bar']);
/*定义*/ \Swlib\Archer\Task\Defer->recv(?float $timeout = null);
$task->recv(0.5);
// $max_concurrent表示集内最大并行数量,缺省表示不限制
$container = \Swlib\Archer::getMultiTask(?int $max_concurrent = null);
$container->addTask(callable $task_callback, ?array $params = null): int;
$container->waitForAll(?float $timeout = null): array;
$container->yieldEachOne(?float $timeout = null): \Generator;
$container->getError(int $id): ?\Throwable;
$container->getErrorMap(): array;
\Swlib\Archer::taskTimerAfter(float $after_time, callable $task_callback, ?array $params = null): int;
$taskid = \Swlib\Archer::taskTimerAfter(1.5, function() { echo 'aaa'; });
\Swlib\Archer::clearTimerTask($taskid); // 返回true为成功,若已执行则返回false
\Swlib\Archer::taskTimerTick(float $tick_time, callable $task_callback, ?array $params = null, ?float $first_time_after = null): int;
$taskid = \Swlib\Archer::taskTimerTick(1.5, function() { echo 'aaa'; });
\Swlib\Archer::clearTimerTask($taskid); // 返回true为成功,若已被清理则返回false
\Swlib\Archer\Task::getCurrentTaskId(): ?int;
\Swlib\Archer\Queue::stop(): void;
\Swlib\Archer\TimerHeap::stop(): void;
\Swlib\Archer\Task::registerTaskFinishFunc(callable $func): void;
function (int $task_id, $task_return_value, ?\Throwable $e) {
// $task_id 为\Swlib\Archer::task()或\Swlib\Archer\MultiTask->addTask() 返回的Task id。\Swlib\Archer::taskWait()由于无法获取Taskid,所以可以忽略该项。
// $task_return_value 为Task闭包 $task_callback 的返回值,若没有返回值或抛出了异常,则该项为null
// $e为Task闭包 $task_callback 中抛出的异常,正常情况下为null
}
\Swlib\Archer\Queue::setQueueSize(int $size): void;
\Swlib\Archer\Queue::setConcurrent(int $concurrent): void;
$task_redis = \Swlib\Archer::taskDefer(function() {
$redis = new \Swoole\Coroutine\Redis();
$redis->connect('127.0.0.1', 6379);
return $redis->get('key');
});
$task_mysql = \Swlib\Archer::taskDefer(function() {
$mysql = new \Swoole\Coroutine\MySQL();
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'user',
'password' => 'pass',
'database' => 'test',
]);
return $mysql->query('select sleep(1)');
});
$task_http = \Swlib\Archer::taskDefer(function(string $url): string {
$httpclient = new \Swoole\Coroutine\Http\Client('0.0.0.0', 9599);
$httpclient->setHeaders(['Host' => "api.mp.qq.com"]);
$httpclient->set(['timeout' => 1]);
$httpclient->get('/');
return $httpclient->body;
}, ['api.mp.qq.com']);
var_dump($task_redis->recv());
var_dump($task_mysql->recv());
var_dump($task_http->recv());
$container = \Swlib\Archer::getMultiTask();
$task_callback = function(int $id): int {
$mysql = new Swoole\Coroutine\MySQL();
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'user',
'password' => 'pass',
'database' => 'test',
]);
$result = $mysql->query('SELECT COUNT(*) AS `c` FROM `order` WHERE `user`='.id);
if (empty($result)) return 0;
return current($result)['c'] ?? 0;
};
$map = [];
$map2 = [];
$results = [];
for ($id=1; $id<=20; ++$id) {// 虽然用 GROUP BY 一条SQL实现,这里只是举个例子
$taskid = $container->addTask($task_callback, [$id]);
$map[$taskid] = $id;
$map2[$id] = $taskid;
}
foreach ($container->waitForAll(10) as $taskid=>$count)
$results[$map[$taskid]] = $count;
for ($id=1; $id<=20; ++$id)
if (array_key_exists($id, $results))
echo "id:{$id} count:{$results[$id]}\n";
else
echo "id:{$id} error:". $container->getError($map2[$id])->getMessage() ."\n";
$container = \Swlib\Archer::getMultiTask();
$task_callback = function(int $id): int {
$mysql = new Swoole\Coroutine\MySQL();
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'user',
'password' => 'pass',
'database' => 'test',
]);
$result = $mysql->query('SELECT COUNT(*) AS `c` FROM `order` WHERE `user`='.id);
if (empty($result)) return 0;
return current($result)['c'] ?? 0;
};
$map = [];
for ($id=1; $id<=20; ++$id) {
$taskid = $container->addTask($task_callback, [$id]);
$map[$taskid] = $id;
}
foreach ($container->yieldEachOne(10) as $taskid=>$count) {
$server->send($map[$taskid], $count); // 假设 fd 和 id 取值一样,这只是一个简化的场景例子,正式应用肯定更复杂
unset($map[$taskid]);
}
foreach ($map as $taskid => $id)
$server->send($id, 'Error: ' . $container->getError($taskid)->getMessage());
\Swlib\Archer::taskTimerTick(5, function(int $limit) {
static $count = 0;
++ $count;
echo "$count\n";
if ($count >= $limit)
\Swlib\Archer::clearTimerTask(\Swlib\Archer\Task::getCurrentTaskId());
}, [8], 2.5);