1. Go to the download page for the cesurapp/swoole-bundle library and choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add the following code to index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
namespace App\Task;
use Cesurapp\SwooleBundle\Task\TaskInterface;
/**
 * Sample task that dumps two entries of the payload it receives.
 */
class ExampleTask implements TaskInterface
{
    /**
     * Decodes the serialized payload, dumps its 'name' and 'invoke' entries
     * and reports completion.
     *
     * NOTE(review): unserialize() is only safe on trusted input — confirm that
     * $data always originates from the application's own task dispatcher.
     *
     * @param string $data Serialized payload.
     *
     * @return mixed Always the string 'Task completed'.
     */
    public function __invoke(string $data): mixed
    {
        $decoded = unserialize($data);

        $name = $decoded['name'];
        $invoke = $decoded['invoke'];
        var_dump($name, $invoke);

        return 'Task completed';
    }
}
namespace App\Controller;
use App\Task\ExampleTask;
use Cesurapp\SwooleBundle\Task\TaskHandler;
use Symfony\Component\HttpFoundation\Response;
/**
 * Example controller that hands work to ExampleTask through the task handler.
 */
class ExampleController
{
    /**
     * @param TaskHandler $taskHandler Handler used to dispatch tasks.
     */
    public function __construct(private readonly TaskHandler $taskHandler)
    {
    }

    /**
     * Dispatches ExampleTask with a fixed demo payload and returns a plain response.
     *
     * @return Response Response whose body is 'Task dispatched'.
     */
    public function hello(): Response
    {
        $payload = ['name' => 'Test', 'invoke' => 'Data'];
        $this->taskHandler->dispatch(ExampleTask::class, $payload);

        return new Response('Task dispatched');
    }
}
namespace App\Process;
use Cesurapp\SwooleBundle\Process\AbstractProcessJob;
/**
 * Process job that subscribes to two Redis channels and logs every message.
 */
class RedisListenerProcess extends AbstractProcessJob
{
    /** Whether this process is active. */
    public bool $ENABLE = true;

    /** Whether the process is restarted once it completes. */
    public bool $RESTART = true;

    /** Delay before a restart, in seconds. */
    public int $RESTART_DELAY = 5;

    /**
     * @param RedisClient     $redis  Client used for the subscription.
     * @param LoggerInterface $logger Logger receiving start-up and message entries.
     */
    public function __construct(
        private readonly RedisClient $redis,
        private readonly LoggerInterface $logger
    ) {
    }

    /**
     * Logs the start-up, then subscribes to 'channel1' and 'channel2'.
     */
    public function __invoke(): void
    {
        $this->logger->info('Redis listener started');

        $channels = ['channel1', 'channel2'];

        // Callback handed to subscribe(); it logs the channel and message it is given.
        $onMessage = function ($redis, $channel, $message) {
            $this->logger->info("Received message from {$channel}: {$message}");
            // Message processing goes here.
        };

        $this->redis->subscribe($channels, $onMessage);
    }
}
namespace App\Process;
use Cesurapp\SwooleBundle\Process\AbstractProcessJob;
use Doctrine\DBAL\Connection;
/**
 * Process job that issues LISTEN on 'my_channel' and polls for notifications.
 */
class PostgresListenerProcess extends AbstractProcessJob
{
    /** Whether this process is active. */
    public bool $ENABLE = true;

    /** Whether the process is restarted once it completes. */
    public bool $RESTART = true;

    /** Delay before a restart (presumably seconds, as in the sibling process examples). */
    public int $RESTART_DELAY = 3;

    /**
     * @param Connection      $connection DBAL connection used for LISTEN and polling.
     * @param LoggerInterface $logger     Logger receiving start-up and notification entries.
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly LoggerInterface $logger
    ) {
    }

    /**
     * Registers the listener and then polls forever, logging each notification.
     *
     * NOTE(review): pg_get_notify() needs the native pgsql connection handle —
     * confirm the configured DBAL driver returns one from getNativeConnection().
     */
    public function __invoke(): void
    {
        $this->logger->info('Postgres listener started');
        $this->connection->executeStatement('LISTEN my_channel');

        for (;;) {
            $notice = pg_get_notify($this->connection->getNativeConnection());

            if ($notice) {
                // The 'message' entry of the notification is logged as the channel.
                $context = [
                    'channel' => $notice['message'],
                    'payload' => $notice['payload'],
                ];
                $this->logger->info('Received notification', $context);
                // Notification processing goes here.
            }

            // Pause 100 ms between polls.
            usleep(100000);
        }
    }
}
namespace App\Process;
use Cesurapp\SwooleBundle\Process\AbstractProcessJob;
/**
 * Process job that performs a single operation and is not restarted afterwards.
 */
class OneTimeProcess extends AbstractProcessJob
{
    /** Whether this process is active. */
    public bool $ENABLE = true;

    /** Restart is disabled for this process. */
    public bool $RESTART = false;

    /**
     * Runs the one-off operation; the method returns as soon as it completes.
     *
     * NOTE(review): doSomething() is not defined in this class as shown —
     * presumably a placeholder or inherited; verify.
     */
    public function __invoke(): void
    {
        $this->doSomething();
    }
}
namespace App\WebSocket;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
/**
 * Minimal WebSocket handler: greets new connections, echoes and broadcasts
 * incoming messages, and prints a line when a connection closes.
 */
class MyWebSocketHandler
{
    /**
     * Registers the 'open', 'message' and 'close' callbacks on the server.
     *
     * @param Server $server WebSocket server to attach the callbacks to.
     */
    public function initServerEvents(Server $server): void
    {
        // Connection opened: print a line and push a JSON welcome message.
        $onOpen = function (Server $server, $request) {
            echo "Connection opened: {$request->fd}\n";

            $welcome = json_encode([
                'type' => 'connected',
                'message' => 'Welcome to WebSocket server'
            ]);
            $server->push($request->fd, $welcome);
        };

        // Message received: echo to the sender, then broadcast to every established connection.
        $onMessage = function (Server $server, Frame $frame) {
            echo "Received message from {$frame->fd}: {$frame->data}\n";

            $server->push($frame->fd, "Server received: {$frame->data}");

            foreach ($server->connections as $fd) {
                if (!$server->isEstablished($fd)) {
                    continue;
                }
                $server->push($fd, "Broadcast: {$frame->data}");
            }
        };

        // Connection closed: print a line.
        $onClose = function (Server $server, int $fd) {
            echo "Connection closed: {$fd}\n";
        };

        $server->on('open', $onOpen);
        $server->on('message', $onMessage);
        $server->on('close', $onClose);
    }
}
namespace App\WebSocket;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Psr\Log\LoggerInterface;
use App\Service\ChatService;
/**
 * WebSocket handler for a chat: authenticates connections on open, routes
 * incoming JSON messages by their 'type' field and notifies the chat service
 * when a connection closes.
 */
class ChatWebSocketHandler
{
    /**
     * @param LoggerInterface $logger      Logger for connection and message events.
     * @param ChatService     $chatService Service used for authentication, persistence and room lookup.
     */
    public function __construct(
        private readonly LoggerInterface $logger,
        private readonly ChatService $chatService
    ) {
    }

    /**
     * Registers the 'open', 'message' and 'close' callbacks on the server.
     *
     * @param Server $server WebSocket server to attach the callbacks to.
     */
    public function initServerEvents(Server $server): void
    {
        $server->on('open', function (Server $server, $request) {
            $this->logger->info('WebSocket connection opened', ['fd' => $request->fd]);

            // Authenticate using the 'authorization' request header (null when absent).
            $token = $request->header['authorization'] ?? null;
            $user = $this->chatService->authenticateToken($token);

            if ($user) {
                $server->push($request->fd, json_encode([
                    'type' => 'auth_success',
                    'user' => $user
                ]));
            } else {
                $server->disconnect($request->fd);
            }
        });

        $server->on('message', function (Server $server, Frame $frame) {
            $this->logger->info('WebSocket message received', [
                'fd' => $frame->fd,
                'data' => $frame->data
            ]);

            $data = json_decode($frame->data, true);

            // The frame comes from the client: only an array payload can carry a
            // 'type'. Anything else (invalid JSON, scalars) falls through to the
            // 'Unknown message type' branch instead of reaching a handler.
            $type = is_array($data) ? ($data['type'] ?? null) : null;

            match ($type) {
                'chat_message' => $this->handleChatMessage($server, $frame->fd, $data),
                'typing' => $this->handleTyping($server, $frame->fd, $data),
                'ping' => $server->push($frame->fd, json_encode(['type' => 'pong'])),
                default => $this->logger->warning('Unknown message type', ['data' => $data])
            };
        });

        $server->on('close', function (Server $server, int $fd) {
            $this->logger->info('WebSocket connection closed', ['fd' => $fd]);
            $this->chatService->handleDisconnect($fd);
        });
    }

    /**
     * Saves a chat message and pushes it to every established room member.
     *
     * Payloads missing 'message' or 'room_id' are logged and ignored rather
     * than being passed on to the chat service with null values.
     *
     * @param Server               $server Server used to push to members.
     * @param int                  $fd     Connection the message came from.
     * @param array<string, mixed> $data   Decoded client payload.
     */
    private function handleChatMessage(Server $server, int $fd, array $data): void
    {
        if (!isset($data['message'], $data['room_id'])) {
            $this->logger->warning('Malformed chat_message payload', ['fd' => $fd, 'data' => $data]);

            return;
        }

        $message = $this->chatService->saveMessage($fd, $data['message']);

        // Broadcast to room members
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'new_message',
                    'message' => $message
                ]));
            }
        }
    }

    /**
     * Notifies every other established room member that a user is typing.
     *
     * Payloads missing 'room_id' or 'user_id' are logged and ignored.
     *
     * NOTE(review): 'user_id' is taken from the client payload as-is — consider
     * deriving it from the authenticated connection instead; verify against ChatService.
     *
     * @param Server               $server Server used to push to members.
     * @param int                  $fd     Connection the typing event came from; it is skipped.
     * @param array<string, mixed> $data   Decoded client payload.
     */
    private function handleTyping(Server $server, int $fd, array $data): void
    {
        if (!isset($data['room_id'], $data['user_id'])) {
            $this->logger->warning('Malformed typing payload', ['fd' => $fd, 'data' => $data]);

            return;
        }

        // Notify room members about typing status
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($memberId !== $fd && $server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'user_typing',
                    'user_id' => $data['user_id']
                ]));
            }
        }
    }
}
// Send message to specific connection
$server->push(int $fd, string $data, int $opcode = WEBSOCKET_OPCODE_TEXT): bool
// Check if connection is valid WebSocket connection
$server->isEstablished(int $fd): bool
// Disconnect connection
$server->disconnect(int $fd, int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL, string $reason = ''): bool
// Check if connection exists
$server->exist(int $fd): bool
// Get all connection IDs
$server->connections: Iterator
// Get connection info
$server->getClientInfo(int $fd): array|false
namespace App\Cron;
use Cesurapp\SwooleBundle\Cron\AbstractCronJob;
/**
 * Cron job that triggers a metrics collection on a fixed timer.
 */
class MetricsCollectorCron extends AbstractCronJob
{
    /**
     * Timer interval in seconds, given as a numeric string in place of a cron
     * expression; '30' means the job runs every 30 seconds.
     */
    public string $TIME = '30';

    /** Whether this job is active. */
    public bool $ENABLE = true;

    /**
     * @param MetricsService $metrics Service whose collect() is called on each run.
     */
    public function __construct(private readonly MetricsService $metrics)
    {
    }

    /**
     * Performs one collection run.
     */
    public function __invoke(): void
    {
        $this->metrics->collect();
    }
}
Loading, please wait ...
Before you can download the PHP files, the dependencies must be resolved. This can take a few minutes. Please be patient.