PHP code example of aston / distribute-ws

1. Go to this page and download the library: Download aston/distribute-ws library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

aston / distribute-ws example snippets


[
    'user_relate_fd_key' => 'user:relate:fd:%s',//用户ID与分布式FD关联key
    'fd_relate_user_key' => 'fd:relate:user:%s',//分布式FD与用户ID关联key
    'ttl' => 7200,//key的过期时间
    'default_opcode' => WEBSOCKET_OPCODE_BINARY,//默认消息类型 发送时也可传参指定,
    'driver' => QueueDriver::class,// 可选择 Aston\DistributeWs\Driver\QueueDriver::class 异步队列 |  Aston\DistributeWs\Driver\SubscribeDriver::class 发布订阅
    'queue_config' => [
        'process_num' => env('LOCAL_PUSH_PROCESS_NUM', 1),//消费队列进程数量
        'process_concurrent_limit' => env('LOCAL_PUSH_PROCESS_CONCURRENT_LIMIT', 10)//消费队列同时处理消息数
    ],
    'server_id' => env('DISTRIBUTE_SERVER_ID', uniqid()),//服务器ID,分布式部署时保证每台服务器的SERVER_ID不同即可
]


declare(strict_types=1);

namespace App\Controller;

use Aston\DistributeWs\Contract\ISender;
use Aston\DistributeWs\Contract\ISocketClientService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\WebSocketServer\Context;
use Swoole\Http\Request;
use Swoole\Websocket\Frame;

class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
         * @Inject()
         * @var ISocketClientService
         */
    protected ISocketClientService $socketClientService;

    /**
         * @Inject()
         * @var ISender
         */
    protected ISender $sender;

    public function onOpen($server, Request $request): void
    {
        $uid = (int)$request->get['uid'];
        $server->push($request->fd, 'Opened');
        //绑定fd与用户关系
        $this->socketClientService->bindRelation($request->fd, $uid);
    }

    public function onMessage($server, Frame $frame): void
    {
        $data = json_decode($frame->data, true);
        $uid = (int)$data['uid'];
        $text = $data['data'];

        $distribute_fd = $this->socketClientService->findUserFd($uid);
        if (!$distribute_fd) {
            $server->push($frame->fd, 'not exist');
            return;
        }
        //向这个fd单独推送消息
        $distribute_fd->send($text);
        $distribute_fd->send($text, WEBSOCKET_OPCODE_TEXT);
        //向这个uid单独推送消息
        $this->sender->send($uid, $text);
        $this->sender->send($uid, $text, WEBSOCKET_OPCODE_TEXT);
        //向多个用户发送同一条消息
        $this->sender->sendMulti([$uid, (int)Context::get('uid')], $text);
        $this->sender->sendMulti([$uid, (int)Context::get('uid')], $text, WEBSOCKET_OPCODE_TEXT);
        //向所有服务器的所有客户端推送消息
        $this->sender->sendAll($text);
        $this->sender->sendAll($text, WEBSOCKET_OPCODE_TEXT);
    }

    public function onClose($server, int $fd, int $reactorId): void
    {
         // 解除分布式fd与用户绑定关系
         $this->socketClientService->removeRelation($this->socketClientService->genDistributeFd($fd)->toString());
    }
}



php bin/hyperf.php vendor:publish aston/distribute-ws