PHP code example of devweyes / queue

1. Go to this page and download the library: Download devweyes/queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

devweyes / queue example snippets



        return [
            Queue::MANAGER => [
                'class' => QueueManager::class,
                'driver' => bean(Queue::DRIVER),
                'serverIdPrefix' => 'swoft_ws_server_cluster_'
            ],
            Queue::DRIVER => [
                'class' => RedisQueue::class,
                'redis' => bean('redis.pool'),
                'serializer' => bean(Queue::SERIALIZER),
                'prefix' => 'swoft_queue_',
                'default' => 'default',
                'waite' => 10,
                'retry' => 3
            ],
            Queue::SERIALIZER => [
                'class' => PhpSerializer::class
            ]
        ];




use Jcsp\Queue\Helper\Tool;
...
'wsServer' => [
    'class' => \Swoft\WebSocket\Server\WebSocketServer::class,
    ...
    //可配置多个消息消费,视业务量而定
    'process' => array_merge(
                Tool::moreProcess('recvMessageProcess', bean(\Jcsp\WsCluster\Process\RecvMessageProcess::class), 3),
                [
                  //自定义进程
                ]
            )
]


use Jcsp\Queue\Queue;

Queue::bind('queue')->push('this is message');

 declare(strict_types=1);

namespace App\Process;

use Jcsp\Queue\Annotation\Mapping\Pull;
use Jcsp\Queue\Result;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Bean\BeanFactory;
use Swoft\Log\Helper\CLog;
use Swoft\Process\Process;
use Jcsp\Queue\Contract\UserProcess;

/**
 * Class MonitorProcess
 *
 * @since 2.0
 *
 * @Bean()
 */
class RecvMessageProcess extends UserProcess
{
    /**
     * @param Process $process
     * @Pull("queue")
     */
    public function run(Process $process): void
    {
        //add queue
        $this->queue = 'new_queue';
        //waite
    }
    /**
     * customer
     * @param $message
     * @return string
     */
    public function receive($message): string
    {
        return Result::ACK;
    }

    /**
     * when error callback
     * @param $message
     * @return string
     */
    public function fallback(\Throwable $throwable, $message): void
    {
        //
        vdump('error', $throwable->getMessage(), 'message',$message);
    }
}

$this->queue