1. Go to this page and download the library: Download codememory/ws-server-bundle library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
namespace App\WebSocket\EventListeners;
use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;
final class TestHandler implements MessageEventListenerInterface
{
public function handle(ServerInterface $server, MessageInterface $message) : void
{
// Reply to a message with event "RESPONSE_EVENT"
$server->sendMessage($message->getSenderConnectionID(), 'RESPONSE_EVENT', [
'message' => 'Hello World'
]);
}
}
// Don't forget to register this listeners in the bundle configuration
use Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent;
use Predis\Client;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener(ConnectionOpenEvent::NAME, 'onOpen')]
readonly class SaveConnectionToRedisEventListener
{
public function __construct(
private Client $client
) {
}
public function onOpen(ConnectionOpenEvent $event): void
{
// We save the new connection in the hash table
$this->client->hset('websocket:connections', $event->connectionID, json_encode([
'connection_id' => $event->connectionID,
'websocket_sec_key' => $event->secWebsocketKey
]));
}
}
use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;
use Predis\Client;
readonly class ConnectEventListener implements MessageEventListenerInterface
{
public function __construct(
private Client $client
) {
}
public function handle(ServerInterface $server, MessageInterface $message): void
{
$data = $message->getData();
if (array_key_exists('user_id', $data) && is_int($data['user_id'])) {
// Here we bind the user to the ws connection and save it to a new hash table
$this->client->hset($this->buildKey($data['user_id']), $message->getSenderConnectionID(), json_encode([
'timestamp' => time()
]));
}
}
private function buildKey(int $userId): string
{
return "websocket:user:$userId:connections";
}
}
// Don't worry about registering this EventListener in codememory_ws_server.yaml
use Predis\Client;
final readonly class WebSocketMessageQueueManager
{
public const HASH_TABLE_NAME = 'websocket:queue:messages';
public function __construct(
private Client $client
) {
}
public function sendMessage(int $userId, string $event, array $data): void
{
// We get all ws connections by user ID
$connections = $this->client->hgetall("websocket:user:$userId:connections");
foreach ($connections as $id => $userConnectionData) {
// Receiving information about the connection by connection identifier
$connection = $this->client->hget('websocket:connections', $id);
if (null !== $connection) {
$connectionData = json_decode($connection, true);
// We save the message in a hash table, as the key we indicate the connection ID to which we need to send and its websocket-sec-key (to ensure security)
$this->client->hset(
self::HASH_TABLE_NAME,
$this->buildMessageField($id, $connectionData['websocket_sec_key']),
json_encode([
'event' => $event,
'data' => $data
])
);
}
}
}
private function buildMessageField(int $connectionId, string $webSocketSecKey): string
{
return "{$connectionId}_{$webSocketSecKey}";
}
}
use App\Services\WebSocketMessageQueueManager;
use Codememory\WebSocketServerBundle\Event\StartServerEvent;
use Predis\Client;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Throwable;
#[AsEventListener(StartServerEvent::NAME, 'onStart')]
final readonly class ProcessForSendingMessagesFromQueueEventListener
{
public function __construct(
private Client $client,
private LoggerInterface $logger
) {
}
public function onStart(StartServerEvent $event): void
{
try {
$event->server->addProcess(function () use ($event) {
// Receive all messages from the queue
$messages = $this->client->hgetall(WebSocketMessageQueueManager::HASH_TABLE_NAME);
foreach ($messages as $for => $message) {
[$connectionID, $webSocketSecKey] = explode('_', $for);
// We check that the message that was added to the queue belongs to the same connection that is connected
if ($this->connectionCheck($connectionID, $webSocketSecKey)) {
$message = json_decode($message, true);
$event->server->sendMessage($connectionID, $message['event'], $message['data']);
// We remove the message from the queue so that it is not sent again
$this->client->hdel(WebSocketMessageQueueManager::HASH_TABLE_NAME, [$for]);
}
}
});
} catch (Throwable $e) {
$this->logger->critical($e, [
'origin' => self::class,
'detail' => 'An error occurred while adding a process to send messages from a queue.'
]);
}
}
private function connectionCheck(int $connectionID, string $webSocketSecKey): bool
{
$connection = $this->client->hget('websocket:connections', $connectionID);
if (null !== $connection) {
$connectionData = json_decode($connection, true);
if ($connectionData['websocket_sec_key'] === $webSocketSecKey) {
return true;
}
}
return false;
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.