PHP code example of codememory / ws-server-bundle

1. Go to the download page for the codememory/ws-server-bundle library and download it, choosing the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

codememory / ws-server-bundle example snippets


// config/bundles.php



// Map returned by config/bundles.php: bundle class => environments in which it is enabled.
return [
    ... // placeholder for the bundles that are already registered in this file
    // Enables the WebSocket server bundle; the 'all' key presumably means "every environment" (Symfony convention).
    Codememory\WebSocketServerBundle\WebSocketServerBundle::class => ['all' => true]
];


namespace App\WebSocket\EventListeners;

use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;

/**
 * Example message listener that answers the sender of every message it handles.
 */
final class TestHandler implements MessageEventListenerInterface
{
    /**
     * Sends a "RESPONSE_EVENT" message back to the connection the incoming message came from.
     *
     * @param ServerInterface  $server  Server used to push the reply.
     * @param MessageInterface $message Incoming message; only its sender connection ID is read.
     */
    public function handle(ServerInterface $server, MessageInterface $message): void
    {
        $recipient = $message->getSenderConnectionID();
        $reply = ['message' => 'Hello World'];

        $server->sendMessage($recipient, 'RESPONSE_EVENT', $reply);
    }
}

// Don't forget to register this listener in the bundle configuration



use Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent;
use Predis\Client;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

/**
 * Records every newly opened WebSocket connection in the "websocket:connections" Redis hash.
 */
#[AsEventListener(ConnectionOpenEvent::NAME, 'onOpen')]
readonly class SaveConnectionToRedisEventListener
{
    public function __construct(private Client $client)
    {
    }

    /**
     * Stores a JSON record for the connection, using the connection ID as the hash field.
     *
     * The record holds the connection ID and the Sec-WebSocket-Key reported by the event.
     */
    public function onOpen(ConnectionOpenEvent $event): void
    {
        $connectionId = $event->connectionID;

        $record = json_encode([
            'connection_id' => $connectionId,
            'websocket_sec_key' => $event->secWebsocketKey,
        ]);

        $this->client->hset('websocket:connections', $connectionId, $record);
    }
}



use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;
use Predis\Client;

/**
 * Message listener that binds the sending WebSocket connection to a user.
 *
 * NOTE(review): the user ID is read straight from the message payload; nothing visible
 * here verifies that the sender is actually that user — confirm authentication happens elsewhere.
 */
readonly class ConnectEventListener implements MessageEventListenerInterface
{
    public function __construct(private Client $client)
    {
    }

    /**
     * Stores the sender's connection under the per-user Redis hash when the payload
     * carries an integer "user_id"; any other payload is ignored.
     */
    public function handle(ServerInterface $server, MessageInterface $message): void
    {
        $payload = $message->getData();

        // Nothing to bind without an integer user ID.
        if (!array_key_exists('user_id', $payload) || !is_int($payload['user_id'])) {
            return;
        }

        $hashKey = $this->buildKey($payload['user_id']);
        $connectionId = $message->getSenderConnectionID();
        $binding = json_encode(['timestamp' => time()]);

        // Field = connection ID, value = JSON with the time the binding was created.
        $this->client->hset($hashKey, $connectionId, $binding);
    }

    /**
     * Returns the name of the Redis hash that lists the connections of one user.
     */
    private function buildKey(int $userId): string
    {
        return sprintf('websocket:user:%d:connections', $userId);
    }
}

// Don't forget to register this EventListener in codememory_ws_server.yaml



use Predis\Client;

/**
 * Queues outgoing WebSocket messages in a Redis hash so that the WebSocket server
 * process can pick them up and deliver them.
 *
 * Each queued message is stored under a field of the form
 * "{connectionId}_{secWebSocketKey}_{uniqueSuffix}". The unique suffix keeps several
 * pending messages for the same connection from overwriting one another.
 *
 * NOTE(review): a Redis hash does not guarantee iteration order, so delivery order
 * between pending messages is presumably not guaranteed — confirm whether ordering matters.
 */
final readonly class WebSocketMessageQueueManager
{
    public const HASH_TABLE_NAME = 'websocket:queue:messages';

    public function __construct(
        private Client $client
    ) {
    }

    /**
     * Queues one message for every known connection of the given user.
     *
     * Connections that have no usable record in "websocket:connections" are skipped.
     *
     * @param int    $userId User whose connections should receive the message.
     * @param string $event  Event name placed in the queued payload.
     * @param array  $data   Event data placed in the queued payload.
     *
     * @throws \JsonException When the payload cannot be encoded as JSON.
     */
    public function sendMessage(int $userId, string $event, array $data): void
    {
        // Encode once, up front: the payload is identical for every connection, and failing
        // here is better than storing an unusable value that the consumer cannot decode.
        $payload = json_encode(
            [
                'event' => $event,
                'data' => $data,
            ],
            JSON_THROW_ON_ERROR
        );

        // Only the hash fields (connection IDs) are needed; the stored values are not used here.
        $connectionIds = array_keys($this->client->hgetall("websocket:user:$userId:connections"));

        foreach ($connectionIds as $id) {
            // PHP turns canonical numeric keys into ints; anything else is not a valid connection ID.
            if (!is_int($id) && !ctype_digit((string) $id)) {
                continue;
            }

            $secKey = $this->findSecKey((int) $id);

            if (null === $secKey) {
                continue;
            }

            // The field carries the connection ID and its Sec-WebSocket-Key so the consumer
            // can verify the message still belongs to the same live connection.
            $this->client->hset(
                self::HASH_TABLE_NAME,
                $this->buildMessageField((int) $id, $secKey),
                $payload
            );
        }
    }

    /**
     * Reads the Sec-WebSocket-Key stored for a connection, or null when the connection
     * is unknown or its stored record is not in the expected shape.
     */
    private function findSecKey(int $connectionId): ?string
    {
        $connection = $this->client->hget('websocket:connections', $connectionId);

        if (!is_string($connection)) {
            return null;
        }

        $connectionData = json_decode($connection, true);
        $secKey = is_array($connectionData) ? ($connectionData['websocket_sec_key'] ?? null) : null;

        return is_string($secKey) && '' !== $secKey ? $secKey : null;
    }

    /**
     * Builds a unique hash field for one queued message.
     *
     * The first two "_"-separated segments are the connection ID and the Sec-WebSocket-Key;
     * the trailing hex suffix only serves to make the field unique (hex never contains "_").
     */
    private function buildMessageField(int $connectionId, string $webSocketSecKey): string
    {
        return sprintf('%d_%s_%s', $connectionId, $webSocketSecKey, bin2hex(random_bytes(8)));
    }
}



use App\Services\WebSocketMessageQueueManager;
use Codememory\WebSocketServerBundle\Event\StartServerEvent;
use Predis\Client;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Throwable;

/**
 * Registers a server process that delivers the messages queued in the
 * WebSocketMessageQueueManager::HASH_TABLE_NAME Redis hash.
 *
 * Every queue entry is removed once it has been handled: after a successful send, when
 * its target connection no longer matches (it can never be delivered), or when the
 * entry itself is malformed.
 */
#[AsEventListener(StartServerEvent::NAME, 'onStart')]
final readonly class ProcessForSendingMessagesFromQueueEventListener
{
    public function __construct(
        private Client $client,
        private LoggerInterface $logger
    ) {
    }

    /**
     * Adds the queue-draining callback to the server when it starts.
     *
     * The try/catch only covers the registration itself. How and how often the server
     * invokes the callback is decided by addProcess() and is not visible here.
     */
    public function onStart(StartServerEvent $event): void
    {
        try {
            $event->server->addProcess(function () use ($event) {
                // Receive all messages from the queue
                $messages = $this->client->hgetall(WebSocketMessageQueueManager::HASH_TABLE_NAME);

                foreach ($messages as $for => $message) {
                    // Array keys that look like integers are ints in PHP; normalise to string.
                    $field = (string) $for;
                    $target = $this->parseField($field);
                    $payload = null !== $target ? $this->decodePayload($message) : null;

                    if (null === $target || null === $payload) {
                        // A malformed entry can never be delivered: report it and drop it
                        // instead of failing on it every time the queue is read.
                        $this->logger->warning('Dropping malformed WebSocket queue entry.', [
                            'origin' => self::class,
                            'field' => $field
                        ]);
                        $this->client->hdel(WebSocketMessageQueueManager::HASH_TABLE_NAME, [$field]);

                        continue;
                    }

                    [$connectionID, $webSocketSecKey] = $target;

                    // We check that the message that was added to the queue belongs to the same connection that is connected
                    if ($this->connectionCheck($connectionID, $webSocketSecKey)) {
                        $event->server->sendMessage($connectionID, $payload['event'], $payload['data']);
                    }

                    // Remove the entry whether or not it was sent: a sent message must not be
                    // sent again, and a message for a connection that is gone (or whose key
                    // changed) would otherwise stay in the queue forever.
                    $this->client->hdel(WebSocketMessageQueueManager::HASH_TABLE_NAME, [$field]);
                }
            });
        } catch (Throwable $e) {
            $this->logger->critical($e, [
                'origin' => self::class,
                'detail' => 'An error occurred while adding a process to send messages from a queue.'
            ]);
        }
    }

    /**
     * Splits a queue field into its connection ID and Sec-WebSocket-Key.
     *
     * Only the first two "_"-separated segments are used; any further segments are ignored.
     *
     * @return array{0: int, 1: string}|null Null when the field does not have the expected shape.
     */
    private function parseField(string $field): ?array
    {
        $segments = explode('_', $field);

        if (count($segments) < 2 || !ctype_digit($segments[0]) || '' === $segments[1]) {
            return null;
        }

        return [(int) $segments[0], $segments[1]];
    }

    /**
     * Decodes a queued payload and verifies it has a string "event" and an array "data".
     *
     * @return array{event: string, data: array}|null Null when the payload is not usable.
     */
    private function decodePayload(mixed $raw): ?array
    {
        if (!is_string($raw)) {
            return null;
        }

        $payload = json_decode($raw, true);

        if (!is_array($payload)
            || !is_string($payload['event'] ?? null)
            || !is_array($payload['data'] ?? null)
        ) {
            return null;
        }

        return $payload;
    }

    /**
     * Tells whether the stored record of the given connection carries the given Sec-WebSocket-Key.
     *
     * Returns false when the connection is unknown or its record is not in the expected shape.
     */
    private function connectionCheck(int $connectionID, string $webSocketSecKey): bool
    {
        $connection = $this->client->hget('websocket:connections', $connectionID);

        if (!is_string($connection)) {
            return false;
        }

        $connectionData = json_decode($connection, true);
        $storedKey = is_array($connectionData) ? ($connectionData['websocket_sec_key'] ?? null) : null;

        // hash_equals compares the two keys in constant time.
        return is_string($storedKey) && hash_equals($storedKey, $webSocketSecKey);
    }
}