PHP code example of mojiehai / queue_task
1. Go to this page and download the library: Download mojiehai/queue_task library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
mojiehai / queue_task example snippets
############################## 全局配置 ##############################
$config = [
'log' => [
'logRoot' => __DIR__ . '/../runtime/log', // 日志文件根目录
'fileName' => '\q\u\e\u\e_Y-m-d.\l\o\g', // 日志文件分割规则(date()函数第一个参数)
],
'connectList' => [
'Redis' => [
'class' => '\\QueueTask\\Connection\\Redis\\Redis',
'config' => [ // 应用初始化的参数
'popTimeout' => 3, // pop阻塞的超时时长 s
'host' => '127.0.0.1', // 数据库地址
'port' => 6379, // 数据库端口
'db' => 0, // 库
'password' => null, // 密码
'connTimeout' => 1, // 链接超时
],
],
'Mns' => [
'class' => '\\QueueTask\\Connection\\Mns\\Mns',
'config' => [ // 应用初始化的参数
'popTimeout' => 3, // pop阻塞的超时时长 s
'accessKeyID' => '', // Mns key id
'accessKeySecret' => '', // Mns key secret
'endpoint' => '', // Mns end point
],
],
'RabbitMQ' => [
'class' => '\\QueueTask\\Connection\\RabbitMQ\\RabbitMQ',
'config' => [ // 应用初始化的参数
// exchanges需要设置为direct,持久化存储,不自动确认消息
'popTimeout' => 3, // pop阻塞的超时时长 s
'host' => '127.0.0.1',
'port' => 5672,
'username' => '',
'password' => '',
'vhost' => '/', // 虚拟主机
'exChanges' => '', // 直连交换机名称
],
],
],
'currentConnect' => 'RabbitMQ', // 当前使用的应用类型
];
Load::Queue($config);
############################## 全局配置 ##############################
class TestHandler extends JobHandler
{
/**
* 失败回调方法
* @param Job $job 任务
* @param string $func 执行的方法
* @param array $data 参数
* @return mixed
*/
public function failed(Job $job, $func, $data)
{
\QueueTask\Log\WorkLog::info('failed run handler -- func: '.$func.' -- params: '.json_encode($data));
}
/**
* 任务成功回调
* @param Job $job 任务
* @param string $func 执行的方法
* @param array $data 参数
* @return mixed
*/
public function success(Job $job, $func, $data)
{
\QueueTask\Log\WorkLog::info('success run handler -- func: '.$func.' -- params: '.json_encode($data));
}
public function test(Job $job,$data)
{
\QueueTask\Log\WorkLog::info('run handler -- func: test -- params: '.json_encode($data). '; result : '.var_export($res, true));
}
}
// 获取队列对象
$queue = Queue::getInstance();
// 直接压入队列,参数:handler对象,方法,自定义参数,队列名称
$r = $queue->pushOn(new TestHandler(),'test',['test'=>'test'],'testQueue');
// 延迟5s压入队列(部分队列不支持延迟操作,例如rabbitmq),参数:延迟秒数,handler对象,方法,自定义参数,队列名称
$r = $queue->laterOn(5,new TestHandler(),'test',['test'=>'test'],'testQueue');
$config = [
'queueName' => 'testQueue', //队列名称
'attempt' => 3, //队列任务失败尝试次数,0为不限制
'memory' => 128, //允许使用的最大内存 单位:M
'maxRunTime' => 100, // 最大运行时间 100s 0为不限制(单进程模式建议设置为0,否则需要手动定时拉取)
];
try{
(new Worker($config))->listen();
}catch (Exception $e){
echo $e->getCode()." -- ".$e->getFile() . " -- ". $e->getLine() . " : ".$e->getMessage();
}
$config1 = [
'queueName' => 'testQueue', //队列名称
'attempt' => 3, //队列任务失败尝试次数,0为不限制
'memory' => 128, //允许使用的最大内存 单位:M
'maxRunTime' => 100, // 最大运行时间 100s (多进程模式建议设置为进程重启的间隔时间,例如,需要1个小时重启一次,则设置为3600)
];
$config2 = [
'queueName' => 'testQueue1', //队列名称
'attempt' => 3, //队列任务失败尝试次数,0为不限制
'memory' => 128, //允许使用的最大内存 单位:M
'maxRunTime' => 100, // 最大运行时间 100s
];
try {
(new MultiWorker('tag1')) // tag1为唯一标识,不同任务组使用不同标识
->addWorker($config1, 1) // 第二个参数为进程数
->addWorker($config2, 2)
->start();
} catch (Exception $e) {
}