PHP code example of ruudk / absurd-php-sdk

1. Go to the download page for the ruudk/absurd-php-sdk library and download it, choosing the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader relative to this file, so the script works
// regardless of the current working directory or include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

ruudk / absurd-php-sdk example snippets




use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Connection\PdoConnection;
use Ruudk\Absurd\Task\Context as TaskContext;

// Connect to Postgres and make PDO raise exceptions on errors
$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Wrap the PDO handle in the SDK connection and build the Absurd instance
$connection = new PdoConnection($pdo);
$absurd = new Absurd($connection);

// Handler for the 'order-fulfillment' task.
// Each step is checkpointed - if the process crashes, we resume from the last completed step
$fulfillOrder = function (array $params, TaskContext $ctx): array {
    $payment = $ctx->step('process-payment', function () use ($params) {
        return processPayment($params['amount']);
    });

    $inventory = $ctx->step('reserve-inventory', function () use ($params) {
        return reserveItems($params['items']);
    });

    // Wait for an event - the task suspends until the event arrives
    $eventName = "shipment.packed:{$params['orderId']}";
    $shipment = $ctx->awaitEvent($eventName);

    $ctx->step('send-notification', function () use ($params, $shipment) {
        return sendEmail($params['email'], $shipment);
    });

    $summary = [];
    $summary['orderId'] = $payment['id'];
    $summary['trackingNumber'] = $shipment['trackingNumber'];

    return $summary;
};

// Register the task handler
$absurd->registerTask('order-fulfillment', $fulfillOrder);

// Start a worker that pulls tasks from Postgres
$worker = $absurd->startWorker();
$worker->start();

use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Connection\PdoConnection;
use Symfony\Component\EventDispatcher\EventDispatcher;

// Postgres connection with exception-based error reporting
$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Collaborators are built first, then handed to Absurd by name
$connection = new PdoConnection($pdo);
$eventDispatcher = new EventDispatcher(); // Optional: for hooks and error handling

$absurd = new Absurd(
    connection: $connection,
    defaultQueueName: 'default', // Default queue for tasks
    defaultMaxAttempts: 5,       // Default retry attempts
    eventDispatcher: $eventDispatcher,
);

use Ruudk\Absurd\Serialization\SymfonySerializer;

// Build the connection and the Symfony-based serializer, then pass both by name
$connection = new PdoConnection($pdo);
$serializer = new SymfonySerializer();

$absurd = new Absurd(connection: $connection, serializer: $serializer);

use Ruudk\Absurd\Connection\Connection;
use Ruudk\Absurd\Exception\QueryException;

/**
 * Connection implementation that forwards queries to a Doctrine DBAL connection.
 *
 * Only fetchAll() is shown here; the remaining Connection methods are left to the reader.
 */
final readonly class DbalConnection implements Connection
{
    private \Doctrine\DBAL\Connection $dbal;

    public function __construct(\Doctrine\DBAL\Connection $dbal)
    {
        $this->dbal = $dbal;
    }

    /**
     * Runs the query through DBAL and returns the result of fetchAllAssociative().
     */
    public function fetchAll(string $sql, array $params = []): array
    {
        $rows = $this->dbal->fetchAllAssociative($sql, $params);

        return $rows;
    }

    // ... implement fetch(), execute(), scalar()
}

// Hand the custom connection to Absurd
$connection = new DbalConnection($dbal);
$absurd = new Absurd(connection: $connection);

// Create a queue
// NOTE(review): this line was garbled in the source ("(ue('my-queue');"). The call is
// reconstructed as createQueue() to match listQueues()/dropQueue() below - confirm
// the method name against the SDK.
$absurd->createQueue('my-queue');

// List all queues
$queues = $absurd->listQueues();
// Returns: ['default', 'my-queue', ...]

// Drop a queue and all its data
$absurd->dropQueue('my-queue');

use Ruudk\Absurd\Task\RegisterOptions;
use Ruudk\Absurd\Task\CancellationPolicy;

// Simple registration: the handler is prepared first and passed by variable
$simpleHandler = function (array $params, TaskContext $ctx): array {
    // Task logic here
    return ['result' => 'done'];
};
$absurd->registerTask('my-task', $simpleHandler);

// Registration with options
$orderProcessor = function (array $params, TaskContext $ctx): array {
    // Task logic
    return [];
};

$cancellationPolicy = new CancellationPolicy(
    maxDuration: 3600, // Cancel after 1 hour total
    maxDelay: 300,     // Cancel if delayed more than 5 minutes
);

$registerOptions = new RegisterOptions(
    queue: 'orders',                           // Override default queue
    defaultMaxAttempts: 3,                     // Override default max attempts
    defaultCancellation: $cancellationPolicy,
);

$absurd->registerTask('order-processor', $orderProcessor, $registerOptions);

use Ruudk\Absurd\Task\SpawnOptions;
use Ruudk\Absurd\Task\RetryStrategy;
use Ruudk\Absurd\Task\CancellationPolicy;

// Parameters handed to the task handler; defined once so both spawn calls below can use them
$params = [
    'orderId' => '42',
    'amount' => 9999,
    'items' => ['widget-1', 'gadget-2'],
    'email' => 'customer@example.com',
];

// Simple spawn
$result = $absurd->spawn('order-fulfillment', $params);

echo "Task ID: {$result->taskId}\n";
echo "Run ID: {$result->runId}\n";
echo "Attempt: {$result->attempt}\n";
echo "Created: " . ($result->created ? 'yes' : 'no (from cache)') . "\n";

// Spawn with options
$result = $absurd->spawn(
    'order-fulfillment',
    $params,
    new SpawnOptions(
        maxAttempts: 5,
        retryStrategy: RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300),
        cancellation: new CancellationPolicy(maxDuration: 3600),
        headers: ['priority' => 'high', 'trace_id' => 'abc123'],
        idempotencyKey: 'order-42',     // Prevents duplicate task creation
    ),
    queue: 'orders',                    // Override queue for unregistered tasks
);

// Check if task was newly created or returned from idempotency cache
if ($result->created) {
    echo "New task created\n";
} else {
    echo "Existing task returned (idempotency key matched)\n";
}

use Ruudk\Absurd\Task\RetryStrategy;

// Exponential backoff: 10s, 20s, 40s, 80s... up to 300s
$exponentialBackoff = RetryStrategy::exponential(
    baseSeconds: 10,
    factor: 2.0,
    maxSeconds: 300,
);

// Linear backoff: 10s, 20s, 30s, 40s... up to 300s
$linearBackoff = RetryStrategy::linear(
    baseSeconds: 10,
    maxSeconds: 300,
);

// Fixed delay: always 30s between retries
$fixedDelay = RetryStrategy::fixed(seconds: 30);

// No delay: immediate retry
$immediateRetry = RetryStrategy::none();

// `use` imports are only valid at file scope, so this one lives outside the closure
// (placing it inside the function body is a parse error).
use Ruudk\Absurd\Execution\AwaitEventOptions;

// Tour of the TaskContext API available inside a task handler
$absurd->registerTask('workflow', function (array $params, TaskContext $ctx): array {
    // Access task metadata
    $taskId = $ctx->taskId;
    $runId = $ctx->runId;
    $attempt = $ctx->attempt;
    $headers = $ctx->headers;    // Custom headers from spawn options

    // Checkpoint a step - cached on retry
    $result = $ctx->step('step-name', fn() => expensiveOperation());

    // Split step - when wrapping work in a closure isn't practical
    $handle = $ctx->beginStep('split-step');
    if (!$handle->done) {
        $computed = expensiveOperation();
    }
    // $computed is unset when the step was already done, hence the `?? null`
    $result = $ctx->completeStep($handle, $computed ?? null);
    // completeStep returns the cached state on replay, or the provided value on first run

    // Wait for an external event
    $eventPayload = $ctx->awaitEvent('payment-confirmed');

    // Wait for event with timeout (throws TimeoutError if not received)
    $payload = $ctx->awaitEvent('webhook-received', new AwaitEventOptions(
        stepName: 'wait-webhook',   // Custom checkpoint name
        timeout: 300,               // 5 minute timeout
    ));

    // Sleep for a duration
    $ctx->sleepFor('delay', 60);  // Sleep for 60 seconds

    // Sleep until a specific time
    $ctx->sleepUntil('scheduled', new DateTimeImmutable('tomorrow 9am'));

    // Emit an event (can wake other waiting tasks)
    $ctx->emitEvent('order-processed', ['orderId' => '123']);

    // Extend the lease for long-running operations
    $ctx->heartbeat(120);     // Extend by 120 seconds
    $ctx->heartbeat();        // Extend by original claim timeout

    return ['done' => true];
});

// Emit an event that a suspended task might be waiting for
$shipmentPacked = ['trackingNumber' => 'TRACK123'];
$absurd->emitEvent('shipment.packed:42', $shipmentPacked);

// Queue name shared by the queue-specific calls below
$ordersQueue = 'orders';

// Emit to a specific queue
$absurd->emitEvent('payment-confirmed', $payload, $ordersQueue);

// Cancel a task by ID
$absurd->cancelTask($taskId);

// Cancel in a specific queue
$absurd->cancelTask($taskId, $ordersQueue);

use Ruudk\Absurd\Task\RetryOptions;

// Retry with default options
$result = $absurd->retryTask($taskId);

// Retry with a higher attempt limit or as a brand new task
$retryOptions = new RetryOptions(
    maxAttempts: 10,    // Override max attempts for this retry
    spawnNewTask: true, // Spawn as a new task instead of resuming the existing one
);

// The queue argument is optional and overrides the queue
$result = $absurd->retryTask($taskId, $retryOptions, queue: 'orders');

use Ruudk\Absurd\Task\TaskInfo;

// Get task state and result
$taskInfo = $absurd->getTask($taskId);

if ($taskInfo === null) {
    echo "Task not found\n";

    // Nothing to inspect: stop here instead of dereferencing null below.
    return;
}

// Check task state
echo "State: {$taskInfo->state}\n";  // pending, running, sleeping, completed, failed, cancelled
echo "Attempts: {$taskInfo->attempts}\n";

// Check terminal states
if ($taskInfo->isCompleted()) {
    $result = $taskInfo->completedPayload;
    echo "Result: " . json_encode($result) . "\n";
}

if ($taskInfo->isFailed()) {
    echo "Task failed after {$taskInfo->attempts} attempts\n";
}

if ($taskInfo->isCancelled()) {
    echo "Task was cancelled\n";
}

// Check if task reached any terminal state
if ($taskInfo->isTerminal()) {
    echo "Task is done (completed, failed, or cancelled)\n";
}

// Get from a specific queue
$taskInfo = $absurd->getTask($taskId, 'orders');

use Ruudk\Absurd\Worker\WorkerOptions;
use Psr\Log\NullLogger;

// Worker settings, built separately from the startWorker() call
$workerOptions = new WorkerOptions(
    workerId: 'my-worker-1',        // Unique identifier (default: hostname:pid)
    claimTimeout: 120,              // Seconds before task lease expires (default: 120)
    batchSize: 5,                   // Number of tasks to claim at once (default: 1)
    pollInterval: 0.25,             // Seconds between polls (default: 0.25)
    fatalOnLeaseTimeout: true,      // Exit process if lease times out (default: true)
    logger: new NullLogger(),       // PSR-3 logger for worker output
);

$worker = $absurd->startWorker($workerOptions);

// Handle graceful shutdown: SIGTERM and SIGINT both ask the worker to stop
pcntl_async_signals(true);

$stopWorker = function () use ($worker): void {
    $worker->stop();
};

foreach ([SIGTERM, SIGINT] as $signal) {
    pcntl_signal($signal, $stopWorker);
}

$worker->start();

use Ruudk\Absurd\Event\BeforeSpawnEvent;
use Ruudk\Absurd\Event\TaskErrorEvent;
use Ruudk\Absurd\Event\TaskExecutionEvent;
use Ruudk\Absurd\Event\WorkerRunningEvent;
use Symfony\Component\EventDispatcher\EventDispatcher;

$dispatcher = new EventDispatcher();

// Handle task errors
$logTaskError = function (TaskErrorEvent $event) {
    // The task may be null for non-task errors, hence the null-safe access and fallback
    $taskId = $event->task?->taskId ?? 'unknown';
    $message = sprintf('Task error: %s (task: %s)', $event->exception->getMessage(), $taskId);

    error_log($message);
};
$dispatcher->addListener(TaskErrorEvent::class, $logTaskError);

// Stop the worker after 100 tasks (like Symfony Messenger's MaxMessagesListener)
$stopAfterLimit = function (WorkerRunningEvent $event) {
    // Running total kept across invocations of this listener
    static $handled = 0;
    $handled += $event->tasksHandled;

    if ($handled < 100) {
        return;
    }

    $event->worker->stop();
};
$dispatcher->addListener(WorkerRunningEvent::class, $stopAfterLimit);

// React to idle cycles (no tasks found)
$onIdle = function (WorkerRunningEvent $event) {
    if (!$event->isIdle()) {
        return;
    }

    // e.g. flush metrics, release resources between idle cycles
};
$dispatcher->addListener(WorkerRunningEvent::class, $onIdle);

// Modify spawn options before task creation (e.g., inject trace IDs)
$injectTraceId = function (BeforeSpawnEvent $event) {
    $existingHeaders = $event->options->headers ?? [];
    $headers = array_merge($existingHeaders, ['trace_id' => getCurrentTraceId()]);

    $event->options = $event->options->with(headers: $headers);
};
$dispatcher->addListener(BeforeSpawnEvent::class, $injectTraceId);

// Wrap task execution (e.g., restore trace context)
$restoreTraceContext = function (TaskExecutionEvent $event) {
    $wrapper = function (Closure $execute) use ($event) {
        $traceId = $event->context->headers['trace_id'] ?? null;
        $restored = TraceContext::restore($traceId);

        try {
            return $execute();
        } finally {
            // Always detach, whether the task returned or threw
            $restored->detach();
        }
    };

    $event->wrapExecution($wrapper);
};
$dispatcher->addListener(TaskExecutionEvent::class, $restoreTraceContext);

// Hand the configured dispatcher to Absurd
$connection = new PdoConnection($pdo);
$absurd = new Absurd($connection, eventDispatcher: $dispatcher);

/**
 * Immutable payload describing an order: its ID, amount and items.
 */
readonly class OrderPayload
{
    public string $orderId;
    public int $amount;
    public array $items;

    public function __construct(string $orderId, int $amount, array $items)
    {
        $this->orderId = $orderId;
        $this->amount = $amount;
        $this->items = $items;
    }
}

// Handler whose first parameter is typed as OrderPayload
$processOrder = function (OrderPayload $order, TaskContext $ctx): array {
    // $order is automatically deserialized to OrderPayload
    echo "Processing order: {$order->orderId}\n";

    $outcome = ['processed' => true];

    return $outcome;
};
$absurd->registerTask('process-order', $processOrder);

// Spawn with typed payload
$orderPayload = new OrderPayload(
    orderId: 'ord-123',
    amount: 9999,
    items: ['widget', 'gadget'],
);
$absurd->spawn('process-order', $orderPayload);

// var_dump() is used instead of echo because echo prints booleans as "1" / "" (empty string),
// which would not match the true/false shown in the comments.

// First call creates the task
$result1 = $absurd->spawn('daily-report', $params, new SpawnOptions(
    idempotencyKey: 'daily-report-2024-01-15',
));
var_dump($result1->created);  // bool(true)

// Second call with same key returns existing task
$result2 = $absurd->spawn('daily-report', $differentParams, new SpawnOptions(
    idempotencyKey: 'daily-report-2024-01-15',
));
var_dump($result2->created);  // bool(false)
var_dump($result2->taskId === $result1->taskId);  // bool(true)

// NOTE: $stripe must already exist in the enclosing scope (presumably a configured
// \Stripe\StripeClient - confirm). PHP closures do not see outer variables unless they
// are listed in `use`, so it is captured explicitly by both closures.
$absurd->registerTask('payment-task', function (array $params, TaskContext $ctx) use ($stripe): array {
    $payment = $ctx->step('charge-card', function () use ($params, $ctx, $stripe) {
        // Use taskId to create idempotency key for Stripe
        $idempotencyKey = "{$ctx->taskId}:payment";

        // Stripe takes the idempotency key as a per-request option (second argument),
        // not as a charge parameter.
        return $stripe->charges->create(
            ['amount' => $params['amount']],
            ['idempotency_key' => $idempotencyKey],
        );
    });

    return $payment;
});
bash
composer require ruudk/absurd-php-sdk
bash
# Set your OpenAI API key
export OPENAI_KEY=your-api-key

# Terminal 1: Start the worker
php examples/Agent/console consume

# Terminal 2: Ask questions
php examples/Agent/console ask