PHP code example of cainydev / laragraph

1. Go to this page and download the library: Download cainydev/laragraph library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

cainydev / laragraph example snippets


use Cainy\Laragraph\Builder\Workflow;

class MyPipeline extends Workflow
{
    public function definition(): void
    {
        $this->addNode('fetch',     FetchNode::class)
             ->addNode('transform', TransformNode::class)
             ->addNode('store',     StoreNode::class)
             ->transition(Workflow::START, 'fetch')
             ->transition('fetch',     'transform')
             ->transition('transform', 'store')
             ->transition('store',     Workflow::END);
    }
}

use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Engine\NodeExecutionContext;

class SummarizeNode implements Node
{
    public function handle(NodeExecutionContext $context, array $state): array
    {
        $text = implode("\n", $state['paragraphs'] ?? []);

        return ['summary' => substr($text, 0, 200)];
    }
}

$context->runId            // int    — ID of the WorkflowRun
$context->workflowKey      // string — class name of the workflow
$context->nodeName         // string — name of this node in the graph
$context->attempt          // int    — current queue attempt (1-based)
$context->maxAttempts      // int    — maximum attempts configured
$context->createdAt        // DateTimeImmutable
$context->isolatedPayload  // ?array — payload injected by a Send (see Dynamic Fan-out)
$context->parentRunId      // ?int   — set when this run was dispatched as a child workflow
$context->parentNodeName   // ?string — the sub-graph node name on the parent
$context->routing          // array — read-only engine routing snapshot (counters, interrupt marker, etc.)

// Helpers:
$context->isSendExecution()        // bool   — true when dispatched via a Send
$context->payload('key', $default) // mixed  — read a value from the isolated payload
$context->parentMetadata()         // ?array — lazy-loads the parent run's metadata (null at top-level)

$this->transition(Workflow::START, 'fetch')
     ->transition('fetch', 'transform')
     ->transition('transform', Workflow::END);

->transition('classify', 'approve', fn(array $state) => $state['score'] > 50)
->transition('classify', 'reject',  fn(array $state) => $state['score'] <= 50)

->branch('router', function(array $state): string {
    return $state['approved'] ? 'publish' : 'revise';
}, targets: ['publish', 'revise'])

$this->addNode('split',    SplitNode::class)
     ->addNode('branch-a', BranchANode::class)
     ->addNode('branch-b', BranchBNode::class)
     ->addNode('merge',    MergeNode::class)
     ->transition(Workflow::START, 'split')
     ->transition('split', 'branch-a')
     ->transition('split', 'branch-b')
     ->transition('branch-a', 'merge')
     ->transition('branch-b', 'merge')
     ->transition('merge', Workflow::END);

use Cainy\Laragraph\Routing\Send;

->branch('planner', function(array $state): array {
    return array_map(
        fn(string $query) => new Send('worker', ['query' => $query]),
        $state['queries']
    );
}, targets: ['worker'])

public function handle(NodeExecutionContext $context, array $state): array
{
    $query = $context->payload('query');
    // ...
}

->branch('split', fn($s) => array_map(fn($id) => new Send('worker', ['id' => $id]), $s['ids']), ['worker'])
->transition('worker', 'barrier')
->transition('barrier', 'aggregate')

class LeadPipeline extends Workflow
{
    public function definition(): void
    {
        $this->addNode('enrich',   EnrichNode::class)
             ->addNode('qualify',  QualifyNode::class)
             ->addNode('classify', ClassifyNode::class)
             ->transition(Workflow::START, 'enrich')
             ->transition('enrich',   'qualify')
             ->transition('qualify',  'classify')
             ->transition('classify', Workflow::END);
    }
}

class FanoutPipeline extends Workflow
{
    public function definition(): void
    {
        $this->addNode('per_lead', app(LeadPipeline::class))
             ->addNode('barrier',  new BarrierNode())
             ->branch(Workflow::START, fn($s) => collect($s['lead_ids'])->map(
                 fn($id) => Send::toWorkflow('per_lead', ['lead_id' => $id])
             )->all(), ['per_lead'])
             ->transition('per_lead', 'barrier')
             ->transition('barrier',  Workflow::END);
    }
}

use Cainy\Laragraph\Facades\Laragraph;

$run = Laragraph::run(MyPipeline::class, initialState: [
    'input' => 'Hello, world!',
]);

echo $run->id;     // WorkflowRun ID
echo $run->status; // RunStatus::Running

$run = Laragraph::run(MyPipeline::class,
    initialState: ['input' => 'Hello'],
    metadata: ['trace_id' => $traceId, 'user_id' => $userId],
);

$run->metadata; // ['trace_id' => ..., 'user_id' => ...]

// Pause a running workflow
Laragraph::pause($run->id);

// Resume a paused workflow, optionally merging additional state
Laragraph::resume($run->id, ['approved' => true]);

// Abort a workflow (sets status to Failed, clears all pointers)
Laragraph::abort($run->id);

class MyPipeline extends Workflow
{
    public function definition(): void { /* ... */ }

    public function onStarting(WorkflowRun $run): void
    {
        Log::info("Run {$run->id} starting");
    }

    public function onCompleted(WorkflowRun $run): void
    {
        Cache::forget("pipeline:{$run->metadata['trace_id']}");
    }

    public function onFailed(WorkflowRun $run, Throwable $exception): void
    {
        report($exception);
    }
}

// Globally
$this->app->bind(StateReducerInterface::class, MyReducer::class);

// Per workflow
$this->withReducer(MyReducer::class)

$this->addNode('review', ReviewNode::class)
     ->interruptBefore('review');

$this->addNode('drafter', DrafterNode::class)
     ->addNode('publish',  PublishNode::class)
     ->transition(Workflow::START, 'drafter')
     ->transition('drafter', 'publish')
     ->transition('publish', Workflow::END)
     ->interruptAfter('drafter');

Laragraph::resume($run->id, [
    'meta' => ['approved' => true],
]);

use Cainy\Laragraph\Exceptions\NodePausedException;

class ConfidenceCheckNode implements Node
{
    public function handle(NodeExecutionContext $context, array $state): array
    {
        if ($state['confidence'] < 0.7) {
            throw new NodePausedException($context->nodeName);
        }

        return ['status' => 'confident'];
    }
}

throw new NodePausedException(
    nodeName:      $context->nodeName,
    stateMutation: ['draft_attempt' => ($state['draft_attempt'] ?? 0) + 1],
    gateReason:    'Score too low — human review 

use Cainy\Laragraph\Contracts\HasName;

class ResearchAgentNode implements Node, HasName
{
    public function name(): string
    {
        return 'research-agent';
    }
}

use Cainy\Laragraph\Contracts\HasTags;

class LLMNode implements Node, HasTags
{
    private string $model = '';
    private int $tokens = 0;

    public function handle(NodeExecutionContext $context, array $state): array
    {
        // ... call LLM, populate $this->model and $this->tokens ...
        return ['response' => $result];
    }

    public function tags(): array
    {
        return [
            'model'    => $this->model,
            'tokens'   => $this->tokens,
            'cost_usd' => $this->tokens * 0.000003,
        ];
    }
}

// All executions for a run
$run->nodeExecutions;

// Total cost for a run
$run->nodeExecutions->sum(fn($e) => $e->tags['cost_usd'] ?? 0);

// Per-node cost breakdown
$run->nodeExecutions
    ->groupBy('node_name')
    ->map(fn($execs) => $execs->sum(fn($e) => $e->tags['cost_usd'] ?? 0));

// Failed executions only
$run->nodeExecutions->filter(fn($e) => $e->failed());

use Cainy\Laragraph\Contracts\HasRetryPolicy;
use Cainy\Laragraph\Engine\RetryPolicy;

class FlakyAPINode implements Node, HasRetryPolicy
{
    public function retryPolicy(): RetryPolicy
    {
        return new RetryPolicy(
            initialInterval: 1.0,
            backoffFactor:   2.0,
            maxInterval:     30.0,
            maxAttempts:     5,
            jitter:          true,
        );
    }
}

new RetryPolicy(
    maxAttempts: 3,
    retryOn: [RateLimitException::class, TimeoutException::class],
)

// Or with a Closure for full control:
new RetryPolicy(
    maxAttempts: 3,
    retryOn: fn(Throwable $e) => $e->getCode() === 429,
)

use Cainy\Laragraph\Contracts\HasQueue;

class HeavyLLMNode implements Node, HasQueue
{
    public function queue(): string
    {
        return 'llm';
    }

    public function connection(): ?string
    {
        return null; // use default connection
    }
}

use Cainy\Laragraph\Contracts\HasMiddleware;
use Illuminate\Queue\Middleware\RateLimited;

class AnthropicNode implements Node, HasMiddleware
{
    public function middleware(): array
    {
        return [new RateLimited('anthropic')];
    }
}

use Cainy\Laragraph\Contracts\HasLoop;

class PollingNode implements Node, HasLoop
{
    public function loopNode(string $nodeName): Node
    {
        return new CheckStatusNode();
    }

    public function loopCondition(): \Closure
    {
        return fn(array $state) => $state['status'] !== 'done';
    }
}

->interruptBefore(Workflow::toolNode('agent'))

use Cainy\Laragraph\Contracts\IsFanInBarrier;

class MyBarrierNode implements Node, IsFanInBarrier
{
    public function handle(NodeExecutionContext $context, array $state): array
    {
        // Only called once — after every predecessor has committed.
        return ['merged' => true];
    }
}

use Cainy\Laragraph\Nodes\GateNode;

$this->addNode('approve', new GateNode(reason: 'Manager approval 

use Cainy\Laragraph\Nodes\SendNode;

$this->addNode('fanout', new SendNode(
         sourceKey:  'queries',
         targetNode: 'worker',
         payloadKey: 'query',
     ))
     ->addNode('worker', WorkerNode::class)
     ->transition(Workflow::START, 'fanout')
     ->transition('fanout', 'worker');

use Cainy\Laragraph\Nodes\BarrierNode;

->addNode('barrier', new BarrierNode())
->transition('worker', 'barrier')
->transition('barrier', 'aggregator')

use Cainy\Laragraph\Nodes\HttpNode;

->addNode('fetch', new HttpNode(
    url:         'https://api.example.com/items/{state.item_id}',
    method:      'GET',
    headers:     ['Authorization' => 'Bearer token'],
    responseKey: 'api_response',
))

new HttpNode(url: '...', method: 'POST', bodyKey: 'payload', responseKey: 'result')

use Cainy\Laragraph\Nodes\DelayNode;

->addNode('wait', new DelayNode(seconds: 300))

use Cainy\Laragraph\Nodes\CacheNode;

->addNode('load',  new CacheNode(operation: 'get',    cacheKey: 'report:{state.user_id}', stateKey: 'cached_report'))
->addNode('store', new CacheNode(operation: 'put',    cacheKey: 'report:{state.user_id}', stateKey: 'report', ttl: 3600))
->addNode('bust',  new CacheNode(operation: 'forget', cacheKey: 'report:{state.user_id}', stateKey: 'report'))

use Cainy\Laragraph\Nodes\NotifyNode;

->addNode('notify', new NotifyNode(
    eventClass: ReportReady::class,
    dataKeys:   ['user_id', 'report_url'],
))

use Cainy\Laragraph\Integrations\Prism\PrismNode;
use Prism\Prism\Enums\Provider;

$this->addNode('assistant', new PrismNode(
    provider:     Provider::Anthropic,
    model:        'claude-sonnet-4-6',
    systemPrompt: 'You are a helpful assistant.',
    maxTokens:    1024,
));

use Prism\Prism\Contracts\Schema;
use Prism\Prism\Schema\ObjectSchema;
use Prism\Prism\Schema\StringSchema;

class ClassifierNode extends PrismNode
{
    protected function systemPrompt(array $state): string
    {
        return 'Classify the input into category + confidence.';
    }

    protected function prompt(array $state): string
    {
        return "Input: {$state['text']}";
    }

    protected function schema(): ?Schema
    {
        return new ObjectSchema(
            name: 'classification',
            description: 'Result of the classification',
            properties: [
                new StringSchema('category', 'category'),
                new StringSchema('confidence', 'confidence 0..1'),
            ],
            

use Cainy\Laragraph\Integrations\Prism\PrismToolNode;
use Prism\Prism\Tool;

class WeatherAgent extends PrismToolNode
{
    public function tools(): array
    {
        return [
            (new Tool)
                ->as('get_weather')
                ->for('Get weather for a city')
                ->withStringParameter('city', 'City name')
                ->using(fn(string $city): string => "Sunny, 22°C in {$city}"),
        ];
    }
}

$this->addNode('agent', new WeatherAgent(
    provider: Provider::Anthropic,
    model:    'claude-sonnet-4-6',
));

->interruptBefore(Workflow::toolNode('agent'))

use Cainy\Laragraph\Integrations\Prism\ToolNode;

class WeatherToolNode extends ToolNode
{
    protected function toolMap(): array
    {
        return [
            'get_weather' => fn(array $args): string =>
                "Sunny, 22°C in " . ($args['city'] ?? 'unknown'),
        ];
    }
}

$this->addNode('agent', MyAgentNode::class)
     ->addNode('tools', WeatherToolNode::class)
     ->transition(Workflow::START, 'agent')
     ->transition('agent', 'tools', fn($s) => ! empty(end($s['messages'])['tool_calls'] ?? []))
     ->transition('agent', Workflow::END, fn($s) => empty(end($s['messages'])['tool_calls'] ?? []))
     ->transition('tools', 'agent');

use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Integrations\LaravelAi\AsGraphNode;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Promptable;

class ResearchAgent implements Agent, Node
{
    use AsGraphNode, Promptable;

    public function instructions(): string
    {
        return 'You are a research assistant.';
    }

    protected function getAgentPrompt(): string
    {
        return 'Research: ' . ($this->state['topic'] ?? 'general');
    }
}

use Laravel\Ai\Contracts\HasStructuredOutput;
use Illuminate\Contracts\JsonSchema\JsonSchema;

class ClassifierAgent implements Agent, Node, HasStructuredOutput
{
    use AsGraphNode, Promptable;

    public function instructions(): string
    {
        return 'Classify the input into a category and confidence score.';
    }

    public function schema(JsonSchema $schema): array
    {
        return [
            'category'   => $schema->string()->

use Laravel\Ai\Contracts\HasTools;

class WeatherAgent implements Agent, Node, HasTools
{
    use AsGraphNode, Promptable;

    public function tools(): array { return [new GetWeather]; }
}

class ResearchSubgraph extends Workflow
{
    public function definition(): void
    {
        $this->addNode('search',  SearchNode::class)
             ->addNode('extract', ExtractNode::class)
             ->transition(Workflow::START, 'search')
             ->transition('search',  'extract')
             ->transition('extract', Workflow::END);
    }
}

class ParentPipeline extends Workflow
{
    public function definition(): void
    {
        $this->addNode('research', ResearchSubgraph::class)
             ->addNode('write',    WriteNode::class)
             ->transition(Workflow::START, 'research')
             ->transition('research', 'write')
             ->transition('write', Workflow::END);
    }
}

$run->parent;    // ?WorkflowRun
$run->children;  // Collection<WorkflowRun>

class ToleratingChildWorkflow extends Workflow
{
    public function shouldCascadeFailure(): bool
    {
        return false; // parent stays Paused; caller can decide what to do
    }

    public function definition(): void { /* ... */ }
}

public function handle(NodeExecutionContext $context, array $state): array
{
    $profileId = $context->parentMetadata()['profile_id'] ?? null;
    // ...
}

class MyPipeline extends Workflow
{
    public function definition(): void
    {
        $this->withRecursionLimit(500);
        // ...
    }
}

// config/laragraph.php
return [
    // Queue name for ExecuteNode jobs (overridden per-node via HasQueue)
    'queue' => env('LARAGRAPH_QUEUE', 'default'),

    // Queue connection (null = default connection)
    'connection' => env('LARAGRAPH_QUEUE_CONNECTION'),

    // Hold jobs until the wrapping transaction commits (enable if you call
    // Laragraph::run() inside your own DB transactions)
    'after_commit' => env('LARAGRAPH_AFTER_COMMIT', false),

    // Default max attempts per node (overridden per-node via HasRetryPolicy)
    'max_node_attempts' => 3,

    // Default node timeout in seconds
    'node_timeout' => 60,

    // Maximum node executions per run before RecursionLimitExceeded is thrown
    'recursion_limit' => 100,

    // Prune completed/failed runs older than this many days
    'prunable_after_days' => 30,

    // Default retry backoff settings (overridden per-node via HasRetryPolicy)
    'retry' => [
        'initial_interval' => 0.5,
        'backoff_factor'   => 2.0,
        'max_interval'     => 128.0,
        'jitter'           => true,
    ],

    'broadcasting' => [
        'enabled'        => env('LARAGRAPH_BROADCASTING_ENABLED', false),
        'channel_type'   => env('LARAGRAPH_CHANNEL_TYPE', 'private'),
        'channel_prefix' => env('LARAGRAPH_CHANNEL_PREFIX', 'workflow.'),
    ],
];

use Cainy\Laragraph\Facades\Laragraph;
use Cainy\Laragraph\Enums\RunStatus;

it('completes the pipeline', function () {
    $run = Laragraph::run(MyPipeline::class, ['input' => 'hello']);

    expect($run->fresh())
        ->status->toBe(RunStatus::Completed)
        ->state->toHaveKey('output');
});

use function Cainy\Laragraph\Tests\makeContext;

it('returns a summary mutation', function () {
    $node = new SummarizeNode();

    $mutation = $node->handle(
        makeContext(nodeName: 'summarize'),
        ['text' => 'Long article...'],
    );

    expect($mutation)->toHaveKey('summary');
});
bash
php artisan vendor:publish --tag="laragraph-migrations"
php artisan migrate
bash
php artisan vendor:publish --tag="laragraph-config"
bash
composer