PHP code example of cesurapp / swoole-bundle

1. Go to this page and download the library: Download cesurapp/swoole-bundle library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

cesurapp / swoole-bundle example snippets


...
uire_once dirname(__DIR__).'/vendor/autoload_runtime.php';
...



namespace App\Cron;

use Cesurapp\SwooleBundle\Cron\AbstractCronJob;

/**
 * Predefined Scheduling
 *
 * '@yearly'           => '0 0 1 1 *',
 * '@annually'         => '0 0 1 1 *',
 * '@monthly'          => '0 0 1 * *',
 * '@weekly'           => '0 0 * * 0',
 * '@daily'            => '0 0 * * *',
 * '@hourly'           => '0 * * * *',
 * '@EveryMinute'      => '* * * * *',
 * '@EveryMinute5'     => '*/5 * * * *',
 * '@EveryMinute10'    => '*/10 * * * *',
 * '@EveryMinute15'    => '*/15 * * * *',
 * '@EveryMinute30'    => '*/30 * * * *',
 */
class ExampleCron extends AbstractCronJob
{
    public string $TIME = '@EveryMinute10';
    public bool $ENABLE = true;

    public function __invoke(): void
    {
        // Cron job logic here
    }
}



namespace App\Task;

use Cesurapp\SwooleBundle\Task\TaskInterface;

class ExampleTask implements TaskInterface
{
    public function __invoke(string $data): mixed
    {
        $payload = unserialize($data);

        var_dump(
            $payload['name'],
            $payload['invoke']
        );

        return 'Task completed';
    }
}



namespace App\Controller;

use App\Task\ExampleTask;
use Cesurapp\SwooleBundle\Task\TaskHandler;
use Symfony\Component\HttpFoundation\Response;

class ExampleController
{
    public function __construct(
        private readonly TaskHandler $taskHandler
    ) {}

    public function hello(): Response
    {
        $this->taskHandler->dispatch(ExampleTask::class, [
            'name' => 'Test',
            'invoke' => 'Data'
        ]);

        return new Response('Task dispatched');
    }
}



namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;

class RedisListenerProcess extends AbstractProcessJob
{
    // Is process active?
    public bool $ENABLE = true;
    
    // Restart when process completes
    public bool $RESTART = true;
    
    // Wait time before restart (seconds)
    public int $RESTART_DELAY = 5;

    public function __construct(
        private readonly RedisClient $redis,
        private readonly LoggerInterface $logger
    ) {
    }

    public function __invoke(): void
    {
        $this->logger->info('Redis listener started');
        
        // Redis SUBSCRIBE command
        $this->redis->subscribe(['channel1', 'channel2'], function ($redis, $channel, $message) {
            $this->logger->info("Received message from {$channel}: {$message}");
            // Process here
        });
    }
}



namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;
use Doctrine\DBAL\Connection;

class PostgresListenerProcess extends AbstractProcessJob
{
    public bool $ENABLE = true;
    public bool $RESTART = true;
    public int $RESTART_DELAY = 3;

    public function __construct(
        private readonly Connection $connection,
        private readonly LoggerInterface $logger
    ) {
    }

    public function __invoke(): void
    {
        $this->logger->info('Postgres listener started');
        
        // LISTEN command
        $this->connection->executeStatement('LISTEN my_channel');
        
        while (true) {
            // Wait for notification
            $notification = pg_get_notify($this->connection->getNativeConnection());
            
            if ($notification) {
                $this->logger->info('Received notification', [
                    'channel' => $notification['message'],
                    'payload' => $notification['payload']
                ]);
                
                // Process here
            }
            
            usleep(100000); // Wait 100ms
        }
    }
}



namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;

class OneTimeProcess extends AbstractProcessJob
{
    public bool $ENABLE = true;
    public bool $RESTART = false; // Restart disabled

    public function __invoke(): void
    {
        // One-time operation
        $this->doSomething();
        
        // Process terminates when completed
    }
}



namespace App\WebSocket;

use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class MyWebSocketHandler
{
    public function initServerEvents(Server $server): void
    {
        // WebSocket connection opened
        $server->on('open', function (Server $server, $request) {
            echo "Connection opened: {$request->fd}\n";
            $server->push($request->fd, json_encode([
                'type' => 'connected',
                'message' => 'Welcome to WebSocket server'
            ]));
        });

        // WebSocket message received
        $server->on('message', function (Server $server, Frame $frame) {
            echo "Received message from {$frame->fd}: {$frame->data}\n";
            
            // Echo back to sender
            $server->push($frame->fd, "Server received: {$frame->data}");
            
            // Broadcast to all connections
            foreach ($server->connections as $fd) {
                if ($server->isEstablished($fd)) {
                    $server->push($fd, "Broadcast: {$frame->data}");
                }
            }
        });

        // WebSocket connection closed
        $server->on('close', function (Server $server, int $fd) {
            echo "Connection closed: {$fd}\n";
        });
    }
}



namespace App\WebSocket;

use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Psr\Log\LoggerInterface;
use App\Service\ChatService;

class ChatWebSocketHandler
{
    public function __construct(
        private readonly LoggerInterface $logger,
        private readonly ChatService $chatService
    ) {
    }

    public function initServerEvents(Server $server): void
    {
        $server->on('open', function (Server $server, $request) {
            $this->logger->info('WebSocket connection opened', ['fd' => $request->fd]);
            
            // Authenticate user from request headers/cookies
            $token = $request->header['authorization'] ?? null;
            $user = $this->chatService->authenticateToken($token);
            
            if ($user) {
                $server->push($request->fd, json_encode([
                    'type' => 'auth_success',
                    'user' => $user
                ]));
            } else {
                $server->disconnect($request->fd);
            }
        });

        $server->on('message', function (Server $server, Frame $frame) {
            $this->logger->info('WebSocket message received', [
                'fd' => $frame->fd,
                'data' => $frame->data
            ]);
            
            $data = json_decode($frame->data, true);
            
            match ($data['type'] ?? null) {
                'chat_message' => $this->handleChatMessage($server, $frame->fd, $data),
                'typing' => $this->handleTyping($server, $frame->fd, $data),
                'ping' => $server->push($frame->fd, json_encode(['type' => 'pong'])),
                default => $this->logger->warning('Unknown message type', ['data' => $data])
            };
        });

        $server->on('close', function (Server $server, int $fd) {
            $this->logger->info('WebSocket connection closed', ['fd' => $fd]);
            $this->chatService->handleDisconnect($fd);
        });
    }

    private function handleChatMessage(Server $server, int $fd, array $data): void
    {
        $message = $this->chatService->saveMessage($fd, $data['message']);
        
        // Broadcast to room members
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'new_message',
                    'message' => $message
                ]));
            }
        }
    }

    private function handleTyping(Server $server, int $fd, array $data): void
    {
        // Notify room members about typing status
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($memberId !== $fd && $server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'user_typing',
                    'user_id' => $data['user_id']
                ]));
            }
        }
    }
}

// Send message to specific connection
$server->push(int $fd, string $data, int $opcode = WEBSOCKET_OPCODE_TEXT): bool

// Check if connection is valid WebSocket connection
$server->isEstablished(int $fd): bool

// Disconnect connection
$server->disconnect(int $fd, int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL, string $reason = ''): bool

// Check if connection exists
$server->exist(int $fd): bool

// Get all connection IDs
$server->connections: Iterator

// Get connection info
$server->getClientInfo(int $fd): array|false



namespace App\Cron;

use Cesurapp\SwooleBundle\Cron\AbstractCronJob;

class MetricsCollectorCron extends AbstractCronJob
{
    // Timer interval in SECONDS (numeric value instead of cron expression)
    public string $TIME = '30';  // Runs every 30 seconds
    public bool $ENABLE = true;

    public function __construct(
        private readonly MetricsService $metrics
    ) {}

    public function __invoke(): void
    {
        $this->metrics->collect();
    }
}