PHP code example of sbooker / persistent-pointer

1. Go to this page and download the library: Download sbooker/persistent-pointer library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

sbooker / persistent-pointer example snippets


// src/Infrastructure/Persistence/PointerStorage.php

use Sbooker\PersistentPointer\Pointer;
use Sbooker\PersistentPointer\PointerStorage;
use Sbooker\TransactionManager\TransactionManager; 

final class DoctrinePointerStorage implements PointerStorage
{
    private TransactionManager $transactionManager;

    public function __construct(TransactionManager $transactionManager)
    {
        $this->transactionManager = $transactionManager;
    }

    public function add(Pointer $pointer): void
    {
        // Делегируем всю работу TransactionManager
        $this->transactionManager->persist($pointer);
    }

    public function getAndLock(string $name): ?Pointer
    {
        // Делегируем всю работу TransactionManager
        return $this->transactionManager->getLocked(Pointer::class, $name);
    }
}

// bootstrap.php или ваш DI-контейнер

/** @var TransactionManager $transactionManager */
$pointerStorage = new PointerStorage($transactionManager);
$pointerRepository = new Sbooker\PersistentPointer\Repository($pointerStorage);

// src/Worker/EventProcessor.php
final class EventProcessor
{
    private const POINTER_NAME = 'event_processor';

    private Repository $pointerRepository;
    private TransactionManager $transactionManager;
    private EventRepository $eventRepository; // Репозиторий для ваших событий

    // ... constructor ...

    public function processBatch(): void
    {
        $this->transactionManager->transactional(function (): void {
            // 1. Получаем указатель с блокировкой.
            // Этот вызов будет использовать getLocked() из TransactionManager,
            // обеспечивая блокировку внутри общей транзакции.
            $pointer = $this->pointerRepository->getLocked(self::POINTER_NAME);
            $lastPosition = $pointer->getValue();

            // 2. Получаем новую порцию данных
            $events = $this->eventRepository->findSince($lastPosition);

            if (empty($events)) {
                return;
            }

            // 3. Обрабатываем данные...
            foreach ($events as $event) {
                // ... do some work ...
                $lastPosition = $event->getPosition();
            }

            // 4. Передвигаем указатель на новую позицию
            $pointer->increaseTo($lastPosition);

            // TransactionManager атомарно сохранит и новое значение указателя,
            // и любые другие изменения, сделанные в этой транзакции.
        });
    }
}