1. Go to this page and download the library: Download sbooker/persistent-pointer library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
sbooker / persistent-pointer example snippets
// src/Infrastructure/Persistence/PointerStorage.php
use Sbooker\PersistentPointer\Pointer;
use Sbooker\PersistentPointer\PointerStorage;
use Sbooker\TransactionManager\TransactionManager;
final class DoctrinePointerStorage implements PointerStorage
{
private TransactionManager $transactionManager;
public function __construct(TransactionManager $transactionManager)
{
$this->transactionManager = $transactionManager;
}
public function add(Pointer $pointer): void
{
// Делегируем всю работу TransactionManager
$this->transactionManager->persist($pointer);
}
public function getAndLock(string $name): ?Pointer
{
// Делегируем всю работу TransactionManager
return $this->transactionManager->getLocked(Pointer::class, $name);
}
}
// bootstrap.php или ваш DI-контейнер
/** @var TransactionManager $transactionManager */
$pointerStorage = new PointerStorage($transactionManager);
$pointerRepository = new Sbooker\PersistentPointer\Repository($pointerStorage);
// src/Worker/EventProcessor.php
final class EventProcessor
{
private const POINTER_NAME = 'event_processor';
private Repository $pointerRepository;
private TransactionManager $transactionManager;
private EventRepository $eventRepository; // Репозиторий для ваших событий
// ... constructor ...
public function processBatch(): void
{
$this->transactionManager->transactional(function (): void {
// 1. Получаем указатель с блокировкой.
// Этот вызов будет использовать getLocked() из TransactionManager,
// обеспечивая блокировку внутри общей транзакции.
$pointer = $this->pointerRepository->getLocked(self::POINTER_NAME);
$lastPosition = $pointer->getValue();
// 2. Получаем новую порцию данных
$events = $this->eventRepository->findSince($lastPosition);
if (empty($events)) {
return;
}
// 3. Обрабатываем данные...
foreach ($events as $event) {
// ... do some work ...
$lastPosition = $event->getPosition();
}
// 4. Передвигаем указатель на новую позицию
$pointer->increaseTo($lastPosition);
// TransactionManager атомарно сохранит и новое значение указателя,
// и любые другие изменения, сделанные в этой транзакции.
});
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.