1. Go to this page and download the library: Download ruudk/absurd-php-sdk library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
ruudk / absurd-php-sdk example snippets
use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Connection\PdoConnection;
use Ruudk\Absurd\Task\Context as TaskContext;
// Create PDO connection
$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Create Absurd instance
$absurd = new Absurd(new PdoConnection($pdo));
// Register a task handler
$absurd->registerTask('order-fulfillment', function (array $params, TaskContext $ctx): array {
// Each step is checkpointed - if the process crashes, we resume from the last completed step
$payment = $ctx->step('process-payment', fn() => processPayment($params['amount']));
$inventory = $ctx->step('reserve-inventory', fn() => reserveItems($params['items']));
// Wait for an event - the task suspends until the event arrives
$shipment = $ctx->awaitEvent("shipment.packed:{$params['orderId']}");
$ctx->step('send-notification', fn() => sendEmail($params['email'], $shipment));
return [
'orderId' => $payment['id'],
'trackingNumber' => $shipment['trackingNumber'],
];
});
// Start a worker that pulls tasks from Postgres
$worker = $absurd->startWorker();
$worker->start();
use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Connection\PdoConnection;
use Symfony\Component\EventDispatcher\EventDispatcher;
$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$absurd = new Absurd(
connection: new PdoConnection($pdo),
defaultQueueName: 'default', // Default queue for tasks
defaultMaxAttempts: 5, // Default retry attempts
eventDispatcher: new EventDispatcher(), // Optional: for hooks and error handling
);
use Ruudk\Absurd\Serialization\SymfonySerializer;
$absurd = new Absurd(
connection: new PdoConnection($pdo),
serializer: new SymfonySerializer(),
);
use Ruudk\Absurd\Connection\Connection;
use Ruudk\Absurd\Exception\QueryException;
final readonly class DbalConnection implements Connection
{
public function __construct(private \Doctrine\DBAL\Connection $dbal) {}
public function fetchAll(string $sql, array $params = []): array
{
return $this->dbal->fetchAllAssociative($sql, $params);
}
// ... implement fetch(), execute(), scalar()
}
$absurd = new Absurd(connection: new DbalConnection($dbal));
// Create a queue (ue('my-queue');
// List all queues
$queues = $absurd->listQueues();
// Returns: ['default', 'my-queue', ...]
// Drop a queue and all its data
$absurd->dropQueue('my-queue');
use Ruudk\Absurd\Task\RegisterOptions;
use Ruudk\Absurd\Task\CancellationPolicy;
// Simple registration
$absurd->registerTask('my-task', function (array $params, TaskContext $ctx): array {
// Task logic here
return ['result' => 'done'];
});
// Registration with options
$absurd->registerTask(
'order-processor',
function (array $params, TaskContext $ctx): array {
// Task logic
return [];
},
new RegisterOptions(
queue: 'orders', // Override default queue
defaultMaxAttempts: 3, // Override default max attempts
defaultCancellation: new CancellationPolicy(
maxDuration: 3600, // Cancel after 1 hour total
maxDelay: 300, // Cancel if delayed more than 5 minutes
),
),
);
use Ruudk\Absurd\Task\SpawnOptions;
use Ruudk\Absurd\Task\RetryStrategy;
use Ruudk\Absurd\Task\CancellationPolicy;
// Simple spawn
$result = $absurd->spawn('order-fulfillment', [
'orderId' => '42',
'amount' => 9999,
'items' => ['widget-1', 'gadget-2'],
'email' => '[email protected]',
]);
echo "Task ID: {$result->taskId}\n";
echo "Run ID: {$result->runId}\n";
echo "Attempt: {$result->attempt}\n";
echo "Created: " . ($result->created ? 'yes' : 'no (from cache)') . "\n";
// Spawn with options
$result = $absurd->spawn(
'order-fulfillment',
$params,
new SpawnOptions(
maxAttempts: 5,
retryStrategy: RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300),
cancellation: new CancellationPolicy(maxDuration: 3600),
headers: ['priority' => 'high', 'trace_id' => 'abc123'],
idempotencyKey: 'order-42', // Prevents duplicate task creation
),
queue: 'orders', // Override queue for unregistered tasks
);
// Check if task was newly created or returned from idempotency cache
if ($result->created) {
echo "New task created\n";
} else {
echo "Existing task returned (idempotency key matched)\n";
}
use Ruudk\Absurd\Task\RetryStrategy;
// Exponential backoff: 10s, 20s, 40s, 80s... up to 300s
RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300);
// Linear backoff: 10s, 20s, 30s, 40s... up to 300s
RetryStrategy::linear(baseSeconds: 10, maxSeconds: 300);
// Fixed delay: always 30s between retries
RetryStrategy::fixed(seconds: 30);
// No delay: immediate retry
RetryStrategy::none();
$absurd->registerTask('workflow', function (array $params, TaskContext $ctx): array {
// Access task metadata
$taskId = $ctx->taskId;
$runId = $ctx->runId;
$attempt = $ctx->attempt;
$headers = $ctx->headers; // Custom headers from spawn options
// Checkpoint a step - cached on retry
$result = $ctx->step('step-name', fn() => expensiveOperation());
// Split step - when wrapping work in a closure isn't practical
$handle = $ctx->beginStep('split-step');
if (!$handle->done) {
$computed = expensiveOperation();
}
$result = $ctx->completeStep($handle, $computed ?? null);
// completeStep returns the cached state on replay, or the provided value on first run
// Wait for an external event
$eventPayload = $ctx->awaitEvent('payment-confirmed');
// Wait for event with timeout (throws TimeoutError if not received)
use Ruudk\Absurd\Execution\AwaitEventOptions;
$payload = $ctx->awaitEvent('webhook-received', new AwaitEventOptions(
stepName: 'wait-webhook', // Custom checkpoint name
timeout: 300, // 5 minute timeout
));
// Sleep for a duration
$ctx->sleepFor('delay', 60); // Sleep for 60 seconds
// Sleep until a specific time
$ctx->sleepUntil('scheduled', new DateTimeImmutable('tomorrow 9am'));
// Emit an event (can wake other waiting tasks)
$ctx->emitEvent('order-processed', ['orderId' => '123']);
// Extend the lease for long-running operations
$ctx->heartbeat(120); // Extend by 120 seconds
$ctx->heartbeat(); // Extend by original claim timeout
return ['done' => true];
});
// Emit an event that a suspended task might be waiting for
$absurd->emitEvent('shipment.packed:42', [
'trackingNumber' => 'TRACK123',
]);
// Emit to a specific queue
$absurd->emitEvent('payment-confirmed', $payload, 'orders');
// Cancel a task by ID
$absurd->cancelTask($taskId);
// Cancel in a specific queue
$absurd->cancelTask($taskId, 'orders');
use Ruudk\Absurd\Task\RetryOptions;
// Retry with default options
$result = $absurd->retryTask($taskId);
// Retry with a higher attempt limit or as a brand new task
$result = $absurd->retryTask(
$taskId,
new RetryOptions(
maxAttempts: 10, // Override max attempts for this retry
spawnNewTask: true, // Spawn as a new task instead of resuming the existing one
),
queue: 'orders', // Optional: override queue
);
use Ruudk\Absurd\Task\TaskInfo;
// Get task state and result
$taskInfo = $absurd->getTask($taskId);
if ($taskInfo === null) {
echo "Task not found\n";
}
// Check task state
echo "State: {$taskInfo->state}\n"; // pending, running, sleeping, completed, failed, cancelled
echo "Attempts: {$taskInfo->attempts}\n";
// Check terminal states
if ($taskInfo->isCompleted()) {
$result = $taskInfo->completedPayload;
echo "Result: " . json_encode($result) . "\n";
}
if ($taskInfo->isFailed()) {
echo "Task failed after {$taskInfo->attempts} attempts\n";
}
if ($taskInfo->isCancelled()) {
echo "Task was cancelled\n";
}
// Check if task reached any terminal state
if ($taskInfo->isTerminal()) {
echo "Task is done (completed, failed, or cancelled)\n";
}
// Get from a specific queue
$taskInfo = $absurd->getTask($taskId, 'orders');
use Ruudk\Absurd\Worker\WorkerOptions;
use Psr\Log\NullLogger;
$worker = $absurd->startWorker(new WorkerOptions(
workerId: 'my-worker-1', // Unique identifier (default: hostname:pid)
claimTimeout: 120, // Seconds before task lease expires (default: 120)
batchSize: 5, // Number of tasks to claim at once (default: 1)
pollInterval: 0.25, // Seconds between polls (default: 0.25)
fatalOnLeaseTimeout: true, // Exit process if lease times out (default: true)
logger: new NullLogger(), // PSR-3 logger for worker output
));
// Handle graceful shutdown
pcntl_async_signals(true);
pcntl_signal(SIGTERM, fn() => $worker->stop());
pcntl_signal(SIGINT, fn() => $worker->stop());
$worker->start();
use Ruudk\Absurd\Event\BeforeSpawnEvent;
use Ruudk\Absurd\Event\TaskErrorEvent;
use Ruudk\Absurd\Event\TaskExecutionEvent;
use Ruudk\Absurd\Event\WorkerRunningEvent;
use Symfony\Component\EventDispatcher\EventDispatcher;
$dispatcher = new EventDispatcher();
// Handle task errors
$dispatcher->addListener(TaskErrorEvent::class, function (TaskErrorEvent $event) {
$exception = $event->exception;
$task = $event->task; // May be null for non-task errors
error_log(sprintf(
'Task error: %s (task: %s)',
$exception->getMessage(),
$task?->taskId ?? 'unknown',
));
});
// Stop the worker after 100 tasks (like Symfony Messenger's MaxMessagesListener)
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
static $handled = 0;
$handled += $event->tasksHandled;
if ($handled >= 100) {
$event->worker->stop();
}
});
// React to idle cycles (no tasks found)
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
if ($event->isIdle()) {
// e.g. flush metrics, release resources between idle cycles
}
});
// Modify spawn options before task creation (e.g., inject trace IDs)
$dispatcher->addListener(BeforeSpawnEvent::class, function (BeforeSpawnEvent $event) {
$event->options = $event->options->with(
headers: array_merge($event->options->headers ?? [], [
'trace_id' => getCurrentTraceId(),
]),
);
});
// Wrap task execution (e.g., restore trace context)
$dispatcher->addListener(TaskExecutionEvent::class, function (TaskExecutionEvent $event) {
$event->wrapExecution(function (Closure $execute) use ($event) {
$traceId = $event->context->headers['trace_id'] ?? null;
$scope = TraceContext::restore($traceId);
try {
return $execute();
} finally {
$scope->detach();
}
});
});
$absurd = new Absurd(new PdoConnection($pdo), eventDispatcher: $dispatcher);
readonly class OrderPayload
{
public function __construct(
public string $orderId,
public int $amount,
public array $items,
) {}
}
$absurd->registerTask('process-order', function (OrderPayload $order, TaskContext $ctx): array {
// $order is automatically deserialized to OrderPayload
echo "Processing order: {$order->orderId}\n";
return ['processed' => true];
});
// Spawn with typed payload
$absurd->spawn('process-order', new OrderPayload(
orderId: 'ord-123',
amount: 9999,
items: ['widget', 'gadget'],
));
// First call creates the task
$result1 = $absurd->spawn('daily-report', $params, new SpawnOptions(
idempotencyKey: 'daily-report-2024-01-15',
));
echo $result1->created; // true
// Second call with same key returns existing task
$result2 = $absurd->spawn('daily-report', $differentParams, new SpawnOptions(
idempotencyKey: 'daily-report-2024-01-15',
));
echo $result2->created; // false
echo $result2->taskId === $result1->taskId; // true
$absurd->registerTask('payment-task', function (array $params, TaskContext $ctx): array {
$payment = $ctx->step('charge-card', function () use ($params, $ctx) {
// Use taskId to create idempotency key for Stripe
$idempotencyKey = "{$ctx->taskId}:payment";
return $stripe->charges->create([
'amount' => $params['amount'],
'idempotency_key' => $idempotencyKey,
]);
});
return $payment;
});
bash
composer
bash
# Set your OpenAI API key
export OPENAI_KEY=your-api-key
# Terminal 1: Start the worker
php examples/Agent/console consume
# Terminal 2: Ask questions
php examples/Agent/console ask
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.