PHP code example of aston / distribute-ws
2. Go to this page and download the library: Download the aston/distribute-ws library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
aston / distribute-ws example snippets
// Example configuration array for aston/distribute-ws.
[
'user_relate_fd_key' => 'user:relate:fd:%s',// key pattern associating a user ID with its distributed FD
'fd_relate_user_key' => 'fd:relate:user:%s',// key pattern associating a distributed FD with its user ID
'ttl' => 7200,// expiration time of the keys
'default_opcode' => WEBSOCKET_OPCODE_BINARY,// default message type; it can also be passed as an argument when sending
'driver' => QueueDriver::class,// choose Aston\DistributeWs\Driver\QueueDriver::class (async queue) | Aston\DistributeWs\Driver\SubscribeDriver::class (publish/subscribe)
'queue_config' => [
'process_num' => env('LOCAL_PUSH_PROCESS_NUM', 1),// number of queue-consumer processes
'process_concurrent_limit' => env('LOCAL_PUSH_PROCESS_CONCURRENT_LIMIT', 10)// number of messages the consumer queue handles at the same time
],
// NOTE(review): the uniqid() fallback produces a different value each time it is evaluated — presumably DISTRIBUTE_SERVER_ID should be set explicitly in production; confirm.
'server_id' => env('DISTRIBUTE_SERVER_ID', uniqid()),// server ID; in a distributed deployment just make sure every server has a different SERVER_ID
]
declare(strict_types=1);
namespace App\Controller;
use Aston\DistributeWs\Contract\ISender;
use Aston\DistributeWs\Contract\ISocketClientService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\WebSocketServer\Context;
use Swoole\Http\Request;
use Swoole\Websocket\Frame;
/**
 * Example WebSocket controller demonstrating the aston/distribute-ws API:
 * binding a connection to a user on open, pushing messages to a single
 * connection / user / several users / everyone on message, and removing the
 * binding on close.
 */
class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * Service that binds, looks up and removes fd <-> user relations.
     *
     * @Inject()
     * @var ISocketClientService
     */
    protected ISocketClientService $socketClientService;

    /**
     * Sender used to push messages by user ID.
     *
     * @Inject()
     * @var ISender
     */
    protected ISender $sender;

    /**
     * Handles a newly opened connection and binds its fd to the `uid` query parameter.
     *
     * @param mixed   $server  Server object the connection belongs to; only `push()` is used here.
     * @param Request $request Handshake request; `uid` is read from its query string.
     */
    public function onOpen($server, Request $request): void
    {
        // `$request->get` is null when the client sent no query string, so guard the lookup.
        $uid = (int)($request->get['uid'] ?? 0);
        if ($uid <= 0) {
            // Do not bind a connection that has no usable user ID.
            $server->push($request->fd, 'uid required');
            return;
        }

        $server->push($request->fd, 'Opened');

        // onMessage() reads the sender's uid from the connection context, so store it here.
        Context::set('uid', $uid);

        // Bind the fd to the user.
        $this->socketClientService->bindRelation($request->fd, $uid);
    }

    /**
     * Handles an incoming frame whose payload is JSON of the form
     * {"uid": <target user id>, "data": <message>} and demonstrates every
     * available way of pushing that message.
     *
     * @param mixed $server Server object the connection belongs to; only `push()` is used here.
     * @param Frame $frame  Received frame; `data` carries the JSON payload, `fd` the sender's connection.
     */
    public function onMessage($server, Frame $frame): void
    {
        $data = json_decode($frame->data, true);

        // The payload comes from the client: reject anything that is not a JSON
        // object carrying both expected keys instead of indexing into null.
        if (!is_array($data) || !isset($data['uid'], $data['data'])) {
            $server->push($frame->fd, 'invalid message');
            return;
        }

        $uid = (int)$data['uid'];
        $text = $data['data'];

        $distribute_fd = $this->socketClientService->findUserFd($uid);
        if (!$distribute_fd) {
            $server->push($frame->fd, 'not exist');
            return;
        }

        // The calls below each deliver the message again; they are listed
        // together to show every variant of the API.

        // Push a message to this fd only.
        $distribute_fd->send($text);
        $distribute_fd->send($text, WEBSOCKET_OPCODE_TEXT);

        // Push a message to this uid only.
        $this->sender->send($uid, $text);
        $this->sender->send($uid, $text, WEBSOCKET_OPCODE_TEXT);

        // Send the same message to several users (the target and the sender).
        $senderUid = (int)Context::get('uid');
        $this->sender->sendMulti([$uid, $senderUid], $text);
        $this->sender->sendMulti([$uid, $senderUid], $text, WEBSOCKET_OPCODE_TEXT);

        // Push a message to all clients on all servers.
        $this->sender->sendAll($text);
        $this->sender->sendAll($text, WEBSOCKET_OPCODE_TEXT);
    }

    /**
     * Handles a closed connection by removing the relation stored for its distributed fd.
     *
     * @param mixed $server    Server object the connection belonged to (unused).
     * @param int   $fd        Local fd of the closed connection.
     * @param int   $reactorId Reactor ID reported by the server (unused).
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        // Remove the binding between the distributed fd and the user.
        $distributeFd = $this->socketClientService->genDistributeFd($fd);
        $this->socketClientService->removeRelation($distributeFd->toString());
    }
}
php bin/hyperf.php vendor:publish aston/distribute-ws