PHP code example of josbeir / cakephp-mercure

1. Go to the download page and download the josbeir/cakephp-mercure library, choosing the `require` download type.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

josbeir / cakephp-mercure example snippets


// src/Application.php
public function bootstrap(): void
{
    parent::bootstrap();

    $this->addPlugin('Mercure');
}

// config/plugins.php
return [
    'Mercure' => [],
];

'Mercure' => [
    'url' => env('MERCURE_URL', 'http://mercure/.well-known/mercure'),
    'public_url' => env('MERCURE_PUBLIC_URL', '/mercure-hub'),
    'jwt' => [
        'secret' => env('MERCURE_JWT_SECRET', '!ChangeThisMercureHubJWTSecretKey!'),
        'algorithm' => 'HS256',
        'publish' => ['*'],
    ],
],

use Mercure\Publisher;
use Mercure\Update\Update;

// In a controller or service
$update = new Update(
    topics: 'https://example.com/books/1',
    data: json_encode(['status' => 'OutOfStock'])
);

Publisher::publish($update);

$update = new Update(
    topics: [
        'https://example.com/books/1',
        'https://example.com/notifications',
    ],
    data: json_encode(['message' => 'Book status changed'])
);

Publisher::publish($update);

> // In your controller
> public function initialize(): void
> {
>     parent::initialize();
>     $this->loadComponent('Mercure.Mercure');
> }
>
> public function update($id)
> {
>     $book = $this->Books->get($id);
>     $book = $this->Books->patchEntity($book, $this->request->getData());
>     $this->Books->save($book);
>
>     // Publish JSON directly - no need for Publisher facade
>     $this->Mercure->publishJson(
>         topics: "/books/{$id}",
>         data: ['status' => $book->status, 'title' => $book->title]
>     );
>
>     // Or publish a rendered element
>     $this->Mercure->publishView(
>         topics: "/books/{$id}",
>         element: 'Books/item',
>         data: ['book' => $book]
>     );
> }
> 

use Mercure\Publisher;
use Mercure\Update\JsonUpdate;

// Simple array - no need to call json_encode()
$update = JsonUpdate::create(
    topics: 'https://example.com/books/1',
    data: ['status' => 'OutOfStock', 'quantity' => 0]
);

Publisher::publish($update);

// Or use the fluent builder pattern
$update = (new JsonUpdate('https://example.com/books/1'))
    ->data(['status' => 'OutOfStock', 'quantity' => 0])
    ->build();

Publisher::publish($update);

// With custom JSON encoding options
$update = JsonUpdate::create(
    topics: 'https://example.com/books/1',
    data: ['title' => 'Book & Title', 'price' => 19.99],
    jsonOptions: JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE
);

// Or using the fluent builder
$update = (new JsonUpdate('https://example.com/books/1'))
    ->data(['title' => 'Book & Title', 'price' => 19.99])
    ->jsonOptions(JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE)
    ->build();

Publisher::publish($update);

$update = JsonUpdate::create(
    topics: 'https://example.com/users/123/notifications',
    data: ['message' => 'New notification', 'unread' => 5],
    private: true
);

// Or chain multiple options with the fluent builder
$update = (new JsonUpdate('https://example.com/books/1'))
    ->data(['title' => 'New Book', 'price' => 29.99])
    ->private()
    ->id(Text::uuid())
    ->type('book.created')
    ->retry(5000)
    ->build();

Publisher::publish($update);

use Mercure\Publisher;
use Mercure\Update\ViewUpdate;

// Render an element
$update = ViewUpdate::create(
    topics: 'https://example.com/books/1',
    element: 'Books/item',
    viewVars: ['book' => $book]
);

// Or use the fluent builder pattern
$update = (new ViewUpdate('https://example.com/books/1'))
    ->element('Books/item')
    ->viewVars(['book' => $book])
    ->build();

Publisher::publish($update);

// Render a template
$update = ViewUpdate::create(
    topics: 'https://example.com/notifications',
    template: 'Notifications/item',
    viewVars: ['notification' => $notification]
);

// Or with the fluent builder - add view options too
$update = (new ViewUpdate('https://example.com/notifications'))
    ->template('Notifications/item')
    ->viewVars(['notification' => $notification])
    ->viewOptions(['key' => 'value'])
    ->build();

Publisher::publish($update);

$update = ViewUpdate::create(
    topics: 'https://example.com/users/123/messages',
    element: 'Messages/item',
    viewVars: ['message' => $message],
    private: true
);

// Or chain all options with the fluent builder
$update = (new ViewUpdate('https://example.com/users/123/messages'))
    ->element('Messages/item')
    ->viewVars(['message' => $message])
    ->private()
    ->id('msg-456')
    ->type('message.new')
    ->build();

Publisher::publish($update);

// In src/View/AppView.php
public function initialize(): void
{
    parent::initialize();
    $this->loadHelper('Mercure.Mercure');
}

// In your template
<div id="book-status">Available</div>

<script>
// For public topics (no authorization needed)
const eventSource = new EventSource('<?= $this->Mercure->url(['https://example.com/books/1']) 

<script>
// Subscribe to multiple topics
const url = '<?= $this->Mercure->url([
    'https://example.com/books/1',
    'https://example.com/notifications'
]) 

<script type="application/json" id="mercure-url">
<?= json_encode(
    $this->Mercure->url(['https://example.com/books/1']),
    JSON_UNESCAPED_SLASHES | JSON_HEX_TAG
) 

$update = new Update(
    topics: 'https://example.com/users/123/messages',
    data: json_encode(['text' => 'Private message']),
    private: true
);

Publisher::publish($update);

/**
 * Example controller showing how the Mercure component authorizes a client
 * to subscribe to private topics and how that authorization is cleared again.
 */
class BooksController extends AppController
{
    /**
     * Loads the Mercure component so actions can use `$this->Mercure`.
     */
    public function initialize(): void
    {
        parent::initialize();
        $this->loadComponent('Mercure.Mercure');
    }

    /**
     * Displays a book and authorizes the current user for its update topics.
     *
     * Two equivalent authorization styles are shown back to back for
     * illustration; a real action would normally use only one of them.
     *
     * @param mixed $id Book primary key taken from the route.
     */
    public function view($id)
    {
        $book = $this->Books->get($id);
        // NOTE(review): assumes an authenticated identity is present on the request.
        $userId = $this->request->getAttribute('identity')->id;

        // Using builder pattern
        $this->Mercure
            ->addTopic('https://example.com/books/123') // You can also set this using MercureHelper or using the defaultTopics option.
            ->addSubscribe("https://example.com/books/{$id}")
            ->addSubscribe("https://example.com/notifications/{$id}")
            ->authorize(); // This sets the actual JWT cookie.

        // Or direct authorization
        $this->Mercure->authorize(
            subscribe: ["https://example.com/books/{$id}"],
            additionalClaims: ['sub' => $userId] // Optional
        );

        $this->set('book', $book);
    }

    /**
     * Clears the Mercure authorization and redirects to the login action.
     */
    public function logout()
    {
        // Clear authorization on logout
        $this->Mercure->clearAuthorization(); // Removes the JWT cookie.

        return $this->redirect(['action' => 'login']);
    }
}

// In AppController
$this->loadComponent('Mercure.Mercure', [
    'autoDiscover' => true,  // Automatically add discovery headers
]);

// In your `AppView` using the helper
/**
 * Loads the Mercure helper with topics that every generated hub URL subscribes to.
 */
public function initialize(): void
{
    parent::initialize();

    // Load helper with default topics.
    // The helper lives in the Mercure plugin, so it is referenced with the
    // plugin-qualified name, matching the other loadHelper() examples.
    $this->loadHelper('Mercure.Mercure', [
        'defaultTopics' => [
            'https://example.com/notifications',
            'https://example.com/alerts',
        ],
    ]);
}

// In your controller using the component
public function initialize(): void
{
    parent::initialize();
    $this->loadComponent('Mercure.Mercure', [
        'defaultTopics' => [
            'https://example.com/notifications',
            'https://example.com/alerts'
        ]
    ]);
}

// In your template
<script>
// This will subscribe to: /notifications, /alerts, AND /books/123
const url = '<?= $this->Mercure->url(['/books/123']) 

use Mercure\Authorization;

/**
 * Displays a book and sets the Mercure authorization cookie via the facade.
 *
 * The cookie-bearing response is stored on the controller rather than
 * returned: returning a response object from an action makes CakePHP skip
 * view rendering, which would leave the `book` view variable unused.
 *
 * @param mixed $id Book primary key taken from the route.
 */
public function view($id)
{
    $book = $this->Books->get($id);

    // Allow this user to subscribe to updates for this book
    $this->response = Authorization::setCookie(
        $this->response,
        subscribe: ["https://example.com/books/{$id}"]
    );

    $this->set('book', $book);
}

// In your controller action
$this->Mercure->discover();

// Add discovery header with optional link attributes
$this->Mercure->discover(
    lastEventId: '123',
    contentType: 'application/json',
    keySet: 'https://example.com/.well-known/jwks.json'
);

// Option 1: Enable per-call
$this->Mercure
    ->authorize(['/books/123', '/notifications/*'])
    ->discover( true
]);

// Then in your action:
$this->Mercure
    ->authorize(['/books/123'])
    ->discover(); // Automatically 

// In src/Application.php
use Mercure\Http\Middleware\MercureDiscoveryMiddleware;

public function middleware(MiddlewareQueue $middlewareQueue): MiddlewareQueue
{
    $middlewareQueue
        // ... other middleware
        ->add(new MercureDiscoveryMiddleware());

    return $middlewareQueue;
}

'jwt' => [
    'secret' => env('MERCURE_JWT_SECRET'),
    'algorithm' => 'HS256',
    'publish' => ['*'],
    'subscribe' => ['*'],
]

'jwt' => [
    'value' => env('MERCURE_JWT_TOKEN'),
]

'jwt' => [
    'provider' => \App\Mercure\CustomTokenProvider::class,
]

namespace App\Mercure;

use Mercure\Jwt\TokenProviderInterface;

/**
 * Skeleton of a custom token provider that can be referenced from the
 * `jwt.provider` configuration key.
 */
class CustomTokenProvider implements TokenProviderInterface
{
    /**
     * Returns the JWT as a string.
     *
     * NOTE(review): `generateToken()` is not defined in this snippet; it is a
     * placeholder the implementer has to supply.
     */
    public function getJwt(): string
    {
        // Generate and return JWT token
        return $this->generateToken();
    }
}

'jwt' => [
    'factory' => \App\Mercure\CustomTokenFactory::class,
    'secret' => env('MERCURE_JWT_SECRET'),
    'publish' => ['*'],
]

namespace App\Mercure;

use Mercure\Jwt\TokenFactoryInterface;

/**
 * Skeleton of a custom token factory that can be referenced from the
 * `jwt.factory` configuration key.
 */
class CustomTokenFactory implements TokenFactoryInterface
{
    /**
     * @param string $secret Signing secret. Presumably the configured `jwt.secret` — TODO confirm.
     * @param string $algorithm Signing algorithm name. Presumably the configured `jwt.algorithm` — TODO confirm.
     */
    public function __construct(
        private string $secret,
        private string $algorithm
    ) {}

    /**
     * Builds a JWT for the given subscribe/publish topic selectors and extra claims.
     *
     * NOTE(review): the body is a placeholder. As written it returns nothing,
     * which violates the declared `string` return type; an implementation
     * must return the encoded token.
     */
    public function create(array $subscribe = [], array $publish = [], array $additionalClaims = []): string
    {
        // Create and return JWT token
    }
}

'http_client' => [
    'timeout' => 30,
    'ssl_verify_peer' => false, // For local development only
]

'cookie' => [
    'name' => 'mercureAuthorization',

    // Lifetime in seconds (0 for session cookie)
    'lifetime' => 3600,  // 1 hour

    // Or use explicit expiry datetime
    // 'expires' => '+1 hour',

    // Omit both to use PHP's session.cookie_lifetime setting

    'domain' => '.example.com',
    'path' => '/',
    'secure' => true,      // HTTPS only (recommended)
    'httponly' => true,    // Prevents XSS token theft
    'samesite' => 'strict', // CSRF protection
]

use Mercure\Publisher;
use Mercure\Service\PublisherInterface;
use Mercure\TestSuite\MockPublisher;

// In your test
/**
 * Example test that replaces the publisher with a mock while the code under
 * test runs, then resets the facade.
 */
public function testPublishing(): void
{
    // Set the mock publisher
    Publisher::setInstance(new MockPublisher());

    // Test your code that publishes updates
    $this->MyService->doSomething();

    // Clean up
    Publisher::clear();
}

use Mercure\Authorization;
use Mercure\Service\AuthorizationInterface;

/**
 * Example test that installs a mocked authorization service on the facade
 * and clears it again afterwards.
 */
public function testAuthorization(): void
{
    // Replace the facade instance with a PHPUnit mock of the interface
    $mockAuth = $this->createMock(AuthorizationInterface::class);
    Authorization::setInstance($mockAuth);

    // Your tests here

    // Clean up so the mock is no longer installed on the facade
    Authorization::clear();
}

public function initialize(): void
{
    parent::initialize();
    $this->loadComponent('Mercure.Mercure', [
        'autoDiscover' => true,      // Optional: auto-add discovery headers
        'discoverWithTopics' => false, // Optional: 

// In controller
/**
 * Displays a book, registers the topics the view should subscribe to and
 * authorizes the client for the book topic.
 *
 * @param mixed $id Book primary key taken from the route.
 */
public function view($id)
{
    $book = $this->Books->get($id);
    // NOTE(review): assumes an authenticated identity is present on the request.
    $userId = $this->request->getAttribute('identity')->id;

    // Add topics that will be available in the view
    $this->Mercure
        ->addTopic("/books/{$id}")
        ->addTopic("/user/{$userId}/updates")
        ->authorize(["/books/{$id}"]);

    $this->set('book', $book);
}

// In template - topics are automatically 

// Build up gradually with claims
$this->Mercure
    ->addSubscribe('/books/123', ['sub' => $userId])
    ->addSubscribe('/notifications/*', ['role' => 'admin'])
    ->authorize()
    ->discover();

// Add multiple at once
$this->Mercure->addSubscribes(
    ['/books/123', '/notifications/*'],
    ['sub' => $userId, 'role' => 'admin']
);

// Mix builder and direct parameters
$this->Mercure
    ->addSubscribe('/books/123')
    ->authorize(['/notifications/*'], ['sub' => $userId]);

// Chain with topic management
$this->Mercure
    ->addTopic('/books/123')                          // For EventSource
    ->addSubscribe('/books/123', ['sub' => $userId])  // For authorization
    ->authorize()
    ->discover();

// Publish JSON data
$this->Mercure->publishJson(
    topics: '/books/123',
    data: ['status' => 'updated', 'title' => $book->title]
);

// Publish rendered element
$this->Mercure->publishView(
    topics: '/books/123',
    element: 'Books/item',
    data: ['book' => $book]
);

// Publish rendered template with layout
$this->Mercure->publishView(
    topics: '/notifications',
    template: 'Notifications/item',
    layout: 'ajax',
    data: ['notification' => $notification]
);

// For advanced use cases, publish an Update object directly
$update = new Update('/books/123', json_encode(['data' => 'value']));
$this->Mercure->publish($update);

new Update(
    string|array $topics,
    string $data,
    bool $private = false,
    ?string $id = null,
    ?string $type = null,
    ?int $retry = null
)

use Mercure\Update\JsonUpdate;

// Basic usage
$update = (new JsonUpdate('/books/1'))
    ->data(['status' => 'OutOfStock', 'quantity' => 0])
    ->build();

// With all options
$update = (new JsonUpdate('/books/1'))
    ->data(['title' => 'Book', 'price' => 29.99])
    ->jsonOptions(JSON_UNESCAPED_UNICODE)
    ->private()
    ->id(Text::uuid())
    ->type('book.updated')
    ->retry(5000)
    ->build();

JsonUpdate::create(
    string|array $topics,
    mixed $data,
    bool $private = false,
    ?string $id = null,
    ?string $type = null,
    ?int $retry = null,
    int $jsonOptions = JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR
): Update

use Mercure\Update\ViewUpdate;

// Render element
$update = (new ViewUpdate('/books/1'))
    ->element('Books/item')
    ->viewVars(['book' => $book])
    ->build();

// Render template with all options
$update = (new ViewUpdate('/notifications'))
    ->template('Notifications/item')
    ->viewVars(['notification' => $notification])
    ->layout('ajax')
    ->viewOptions(['key' => 'value'])
    ->private()
    ->id('notif-123')
    ->type('notification.new')
    ->build();

ViewUpdate::create(
    string|array $topics,
    ?string $template = null,
    ?string $element = null,
    array $viewVars = [],
    ?string $layout = null,
    bool $private = false,
    ?string $id = null,
    ?string $type = null,
    ?int $retry = null,
    array $viewOptions = []
): Update

use Mercure\Update\ViewUpdate;

// Render an element.
// View variables are passed as `viewVars`, the parameter name declared by
// ViewUpdate::create(); an undeclared named argument would be rejected by PHP.
$update = ViewUpdate::create(
    topics: '/books/1',
    element: 'Books/item',
    viewVars: ['book' => $book]
);

// Render a template with layout
$update = ViewUpdate::create(
    topics: '/dashboard',
    template: 'Dashboard/stats',
    layout: 'ajax',
    viewVars: ['stats' => $stats]
);

// In src/Application.php
use Mercure\Http\Middleware\MercureDiscoveryMiddleware;

public function middleware(MiddlewareQueue $middlewareQueue): MiddlewareQueue
{
    $middlewareQueue->add(new MercureDiscoveryMiddleware());
    return $middlewareQueue;
}


declare(strict_types=1);

namespace App\Queue\Task;

use Cake\Log\Log;
use Mercure\Publisher;
use Mercure\Update\JsonUpdate;
use Throwable;

/**
 * Trait for publishing real-time updates from queue tasks via Mercure.
 *
 * Every update is published as JSON to the topic
 * `<mercureTopicPrefix>/<jobId>` and always carries a `status` key.
 * Keys supplied through `$data` are merged last and therefore override
 * the keys the helper methods set themselves.
 */
trait MercureProgressTrait
{
    /**
     * Topic prefix; the job ID is appended as the final path segment.
     */
    protected string $mercureTopicPrefix = '/queue/job';

    /**
     * Publish a progress update for a queue job.
     *
     * Publishing is best-effort: any failure is logged as a warning and
     * swallowed so that the job itself keeps running.
     *
     * @param int $jobId Queue job the update belongs to.
     * @param string $status Status label stored under the `status` key.
     * @param array<string, mixed> $data Extra payload entries.
     */
    protected function publishProgress(int $jobId, string $status, array $data = []): void
    {
        try {
            $topic = sprintf('%s/%d', $this->mercureTopicPrefix, $jobId);
            $payload = array_merge(['status' => $status], $data);

            Publisher::publish(JsonUpdate::create(
                topics: $topic,
                data: $payload,
            ));
        } catch (Throwable $e) {
            // Log but don't fail the job if Mercure is unavailable
            Log::write('warning', sprintf(
                'Failed to publish Mercure update for job `%d`: %s',
                $jobId,
                $e->getMessage(),
            ));
        }
    }

    /**
     * Publish a "started" status update.
     *
     * @param int $jobId Queue job the update belongs to.
     * @param string $message Optional message; omitted from the payload when empty.
     * @param array<string, mixed> $data Extra payload entries.
     */
    protected function publishStarted(int $jobId, string $message = '', array $data = []): void
    {
        $payload = $data;
        // Compare against '' explicitly: a truthiness check would also drop the message '0'.
        if ($message !== '') {
            $payload['message'] = $message;
        }
        $this->publishProgress($jobId, 'started', $payload);
    }

    /**
     * Publish a progress update with percentage.
     *
     * The percentage is rounded to one decimal and is always a float;
     * it is 0.0 when `$total` is not positive.
     *
     * @param int $jobId Queue job the update belongs to.
     * @param int $current Number of items processed so far.
     * @param int $total Total number of items.
     * @param array<string, mixed> $data Extra payload entries.
     */
    protected function publishProgressPercent(int $jobId, int $current, int $total, array $data = []): void
    {
        $percent = $total > 0 ? round(($current / $total) * 100, 1) : 0.0;
        $payload = array_merge([
            'current' => $current,
            'total' => $total,
            'percent' => $percent,
        ], $data);

        $this->publishProgress($jobId, 'progress', $payload);
    }

    /**
     * Publish a "completed" status update.
     *
     * @param int $jobId Queue job the update belongs to.
     * @param string $message Optional message; omitted from the payload when empty.
     * @param array<string, mixed> $data Extra payload entries.
     */
    protected function publishCompleted(int $jobId, string $message = '', array $data = []): void
    {
        $payload = $data;
        // Compare against '' explicitly: a truthiness check would also drop the message '0'.
        if ($message !== '') {
            $payload['message'] = $message;
        }
        $this->publishProgress($jobId, 'completed', $payload);
    }

    /**
     * Publish a "failed" status update.
     *
     * @param int $jobId Queue job the update belongs to.
     * @param string $error Error description stored under the `error` key.
     * @param array<string, mixed> $data Extra payload entries.
     */
    protected function publishFailed(int $jobId, string $error, array $data = []): void
    {
        $payload = array_merge(['error' => $error], $data);
        $this->publishProgress($jobId, 'failed', $payload);
    }
}


declare(strict_types=1);

namespace App\Queue\Task;

use Queue\Queue\Task;

/**
 * Example queue task that imports products and reports its progress
 * through MercureProgressTrait.
 */
class ImportProductsTask extends Task
{
    use MercureProgressTrait;

    /**
     * Imports all products from the configured source.
     *
     * @param array<string, mixed> $data Job payload; `source` is passed to fetchProducts().
     * @param int $jobId Queue job ID used for the progress topic.
     */
    public function run(array $data, int $jobId): void
    {
        $this->publishStarted($jobId, 'Starting product import...');

        $products = $this->fetchProducts($data['source']);
        $total = count($products);
        $saved = 0;

        foreach ($products as $product) {
            $this->saveProduct($product);
            $saved++;

            // Publish on the 1st, 11th, 21st, ... item to avoid flooding.
            // The running counter is used instead of the array key so the
            // throttle and the reported position stay correct even when
            // $products is not a zero-based list.
            if ($saved % 10 === 1) {
                $this->publishProgressPercent($jobId, $saved, $total, [
                    'message' => sprintf('Processing %d of %d products...', $saved, $total),
                    'saved' => $saved,
                ]);
            }
        }

        $this->publishCompleted($jobId, sprintf('Imported %d products', $saved), [
            'total' => $total,
            'saved' => $saved,
        ]);
    }
}


// templates/Admin/Jobs/monitor.php
$this->loadHelper('Mercure.Mercure');


declare(strict_types=1);

namespace App\Controller\Admin;

use App\Controller\AppController;
use Queue\Model\Table\QueuedJobsTable;

/**
 * Admin controller for triggering a queued import and watching its progress.
 */
class JobsController extends AppController
{
    /**
     * Exposes the ID and task name of a queued job to the monitor template.
     *
     * @param int $id Primary key of the queued job.
     */
    public function monitor(int $id)
    {
        $queuedJob = $this->fetchTable('Queue.QueuedJobs')->get($id);

        $viewVars = [
            'jobId' => $queuedJob->id,
            'jobTask' => $queuedJob->job_task,
        ];
        $this->set($viewVars);
    }

    /**
     * Queues an `ImportProducts` job and redirects to its monitor page.
     */
    public function trigger()
    {
        /** @var QueuedJobsTable $jobsTable */
        $jobsTable = $this->fetchTable('Queue.QueuedJobs');

        $jobPayload = ['source' => 'api'];
        $createdJob = $jobsTable->createJob('ImportProducts', $jobPayload);

        // The job ID is passed along so the monitor page knows which job to follow
        $monitorUrl = ['action' => 'monitor', $createdJob->id];

        return $this->redirect($monitorUrl);
    }
}
nginx
# Reverse-proxies the public /mercure-hub path to the Mercure hub container.
location /mercure-hub {
    # Map /mercure-hub<rest> onto the hub's /.well-known/mercure<rest> endpoint
    rewrite ^/mercure-hub(.*)$ /.well-known/mercure$1 break;
    proxy_pass http://mercure:80;
    # HTTP/1.1 with an empty Connection header towards the upstream
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control 'no-cache';
    proxy_set_header X-Accel-Buffering 'no';
    # Disable response buffering and chunked encoding
    # (presumably so EventSource clients receive each event immediately — confirm)
    proxy_buffering off;
    chunked_transfer_encoding off;
    # Forward the original host, client address and scheme to the hub
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    # Long read timeout for long-lived connections
    proxy_read_timeout 24h;
    # NOTE(review): a 1h connect timeout looks unusually large — confirm it is intended
    proxy_connect_timeout 1h;
}