PHP code example of jeruier / swozr-taskr

1. Go to this page and download the library: Download jeruier/swozr-taskr library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

jeruier / swozr-taskr example snippets


    'task' => [
        'class' => \Swozr\Taskr\Server\TaskrEngine::class,
        'host' => '0.0.0.0',
        'port' => 9501,
        'MQProcessMum' => 3,
        'setting' => [
            'task_worker_num' => 12,
            ...
        ],
        'listener' => [
            ...
        ],
        'exceptionHandler' => [
            ...
        ],
        'crontabs' => [
            ...
        ],
        'rabbmitMqs' => [
            ...
        ],
    ],

    $config = [
        'host' => '0.0.0.0',
        'port' => 9501,
        'MQProcessMum' => 3,
        'setting' => [
             'task_worker_num' => 12,
            ...
        ],
        'listener' => [
            ...
        ],
        'exceptionHandler' => [
            ...
        ],
        'crontabs' => [
            ...
        ],
        'rabbmitMqs' => [
            ...
        ],
   ];

    'listener' => [
        ServerEvent::BEFORE_ADDED_EVENT => function(Event $event){
            //触发事件后处理
            $target = $event->getTarget(); //事件触发所传递
            $param = $event->getParam('key'); //获取指定参数
            $params = $event->getParams();  //获取所有参数
        },
        ...
    ],

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => [
                'EventHandlerController@action',
                'EventHandlerController@staticAction',
                ...
           ],
           ...
       ], 

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => 'EventHandlerController',
           ...
       ], 

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => 'EventHandlerController',
           ...
       ], 

    'exceptionHandler' => [
        'SwozrException' => 'SwozrExceptionHandler',
        'ServerException' => 'ServerExceptionHandler',
        ...
    ]

*    *    *    *    *    *
-    -    -    -    -    -
|    |    |    |    |    |
|    |    |    |    |    +----- day of week (0 - 6) (Sunday=0)
|    |    |    |    +----- month (1 - 12)
|    |    |    +------- day of month (1 - 31)
|    |    +--------- hour (0 - 23)
|    +----------- min (0 - 59)
+------------- sec (0-59)

    'crontabs' => [
        CrontabTaskHandle::class,
        ...
    ]

    'rabbmitMqs' => [
        'class' => MqTaskHandleOne::class,
        'host' => '192.168.99.100',
        'port' => 5672
        'username' => 'guest',
        'password' => 'guest',
        'exchange_name' => 'exchange1',
        'queue_name' => 'queue1',
        'routing_key' => 'routing1',
    ],

    'rabbmitMqs' => [
        [        
            'class' => MqTaskHandleOne::class,
             'host' => '192.168.99.100',
             'port' => 5672
             'username' => 'guest',
             'password' => 'guest',
             'exchange_name' => 'exchange1',
             'queue_name' => 'queue1',
             'routing_key' => 'routing1',
         ],
         [        
             'class' => MqTaskHandleTwo::class,
              'host' => '192.168.99.100',
              'port' => 5672
              'username' => 'guest',
              'password' => 'guest',
              'exchange_name' => 'exchange1',
              'queue_name' => 'queue2',
              'routing_key' => 'routing2',
          ],
          ...
    ],

use Swozr\Taskr\Server\Base\BaseTask;
use Swozr\Taskr\Server\Contract\TaskConsume;
use Swozr\Taskr\Server\Contract\TaskNotice;

class TaskHandleTest extends BaseTask implements TaskNotice,TaskConsume
{
    /**
     * 任务投递失败
     * @param array $data 投递的数据
     * @param int $delay 延迟执行毫秒数
     * @param string $failMsg 任务发布失败原因
     * @return mixed
     */
    public static function pushFailure(array $data, int $delay, string $failMsg)
    {
        // TODO: Implement pushFailure() method.
    }

    /**
     *任务已投递
     * @return mixed
     */
    public function pushed()
    {
        // TODO: Implement pushed() method.
    }

    /**
     * 消费任务
     * @return mixed
     */
    public function consume(): string
    {
        // TODO: Implement consume() method.
    }

    /**
     * 任务完成
     * @return mixed
     */
    public function finished()
    {
        // TODO: Implement finished() method.
    }
}

    public static $cron = '0/3 * * * * *';

    $data = [
        'param1' => 'val1',
        'param2' => 'val2',
    ];
    //使用默认地址0.0.0.0端口为9501
    TaskTest::publish($data);  //异步任务
    TaskTest::publish($data, 5000);  //延迟任务
    
    //自定义Taskr client投递任务(自定义地址、端口)
    $taskrClientObj = TaskrClient::getInstance([
        'host' => '0.0.0.0',
        'port' => 9501,
        'timeout' => 1
     ]);
     TaskTest::publish($data, $taskrClientObj);  //异步任务
     TaskTest::publish($data, $taskrClientObj, 5000);  //延迟任务

    $taskrClientObj = TaskrClient::getInstance([
        'host' => '0.0.0.0',
        'port' => 9501,
        'timeout' => 1
     ]);

    $taskrClientObj = TaskrClient::getInstance();
    $taskrClientObj->setHost('0.0.0.0');
    $taskrClientObj->setPort(9501);
    $taskrClientObj->setTimeout(1);

return [
    // ...
    'components' => [
        // ...
        'taskr' => [
            [
                'class' => \Swozr\Taskr\Server\TaskrEngine::class,
                'host' => '0.0.0.0',
                'port' => '9501',
                'pidName' => 'swozr-taskr',
                'MQProcessMum' => 1,
                'debug' => true,
                'setting' => [
                    'worker_num' => 1,
                    'task_worker_num' => 2,
                    'daemonize' => false
                ],
                'exceptionHandler' => [
                    \Swozr\Taskr\Server\Exception\TaskException::class => \SwozrTest\Taskr\Server\ExceptionHandler\TaskExceptionHandler::class
                ],
                'crontabs' => [
                    \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class
                ],
                'listener' => [
                    \Swozr\Taskr\Server\Event\ServerEvent::TASK_PUSHED => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class,
                    \Swozr\Taskr\Server\Event\ServerEvent::TASK_CONSUME => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class
                ],
                'rabbmitMqs' => [
                    'class' => \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class,
                    'host' => '192.168.99.100',
                    'exchange_name' => 'a',
                    'queue_name' => 'a',
                    'routing_key' => 'c',
                ],
            ]
        ],
    ],
    // ...
];

namespace app\commands;

use yii\console\Controller;

class TaskrController extends Controller
{
    private $taskr;
    
    public function init()
    {
        parent::init(); // TODO: Change the autogenerated stub
        $this->taskr = Yii::$app->taskr;
    }
    
    /**
     * start taskr server
     */
    public function actionStart()
    {
        $this->taskr->start();
    }
    
    /**
     * stop taskr server
     */
    public function actionStop()
    {
        $this->taskr->stop();
    }
    
    /**
     * taskr server status
     */
    public function actionStatus()
    {
        $this->taskr->status();
    }
  
    /**
     * @param $type 当为-o则只reload task worker进程,默认重启所有
     * reload taskr server
     */    
    public function actionReload($type = '')
    {
        $onlyTaskWorker = '-o' == $type ? true : false;
        $this->taskr->reload();
    }  
    
    /**
     * restart taskr server
     */
    public function actionRestart()
    {
        $this->taskr->restart();
    }  
}

namespace app\commands;

use yii\console\Controller;
use Swozr\Taskr\Server\TaskrEngine;

class TaskrController extends Controller
{
    private $taskr;
    
    public function init()
    {
        parent::init(); // TODO: Change the autogenerated stub
        $this->taskr = new TaskrEngine([
            'host' => '0.0.0.0',
            'port' => '9501',
            'pidName' => 'swozr-taskr',
            'MQProcessMum' => 1,
            'debug' => true,
            'setting' => [
                'worker_num' => 1,
                'task_worker_num' => 2,
                'daemonize' => false
            ],
            'exceptionHandler' => [
                \Swozr\Taskr\Server\Exception\TaskException::class => \SwozrTest\Taskr\Server\ExceptionHandler\TaskExceptionHandler::class
            ],
            'crontabs' => [
                \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class
            ],
            'listener' => [
                \Swozr\Taskr\Server\Event\ServerEvent::TASK_PUSHED => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class,
                \Swozr\Taskr\Server\Event\ServerEvent::TASK_CONSUME => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class
            ],
            'rabbmitMqs' => [
                'class' => \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class,
                'host' => '192.168.99.100',
                'exchange_name' => 'a',
                'queue_name' => 'a',
                'routing_key' => 'c',
            ],
        ]);
    }
    
    /**
     * start taskr server
     */
    public function actionStart()
    {
        $this->taskr->start();
    }
    
    /**
     * stop taskr server
     */
    public function actionStop()
    {
        $this->taskr->stop();
    }
    
    /**
     * taskr server status
     */
    public function actionStatus()
    {
        $this->taskr->status();
    }
  
    /**
     * @param $type 当为-o则只reload task worker进程,默认重启所有
     * reload taskr server
     */    
    public function actionReload($type = '')
    {
        $onlyTaskWorker = '-o' == $type ? true : false;
        $this->taskr->reload();
    }  
    
    /**
     * restart taskr server
     */
    public function actionRestart()
    {
        $this->taskr->restart();
    }  
}
bash
    php yii taskr/start
bash
    php yii taskr/stop
bash
    php yii taskr/status
bash
    php yii taskr/reload
    
    //只reload task worker进程
    php yii taskr/reload -o
bash
    php yii taskr/restart