PHP code example of webpatser / torque

1. Go to this page and download the library: Download webpatser/torque library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

webpatser / torque example snippets


'torque' => [
    'driver' => 'torque',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 2000,
    'prefix' => 'torque:',
    'redis_uri' => env('TORQUE_REDIS_URI', 'redis://127.0.0.1:6379'),
    'consumer_group' => 'torque',
],

ProcessDocument::dispatch($document);
ProcessDocument::dispatch($document)->onQueue('high');
ProcessDocument::dispatch($document)->delay(now()->addMinutes(5));

// Batches work out of the box
Bus::batch([
    new ProcessDocument($doc1),
    new ProcessDocument($doc2),
    new ProcessDocument($doc3),
])->dispatch();

use Webpatser\Torque\Job\TorqueJob;
use Webpatser\Torque\Pool\MysqlPool;
use Webpatser\Torque\Pool\HttpPool;

class IndexDocument extends TorqueJob
{
    public function __construct(
        private int $documentId,
    ) {}

    public function handle(MysqlPool $db, HttpPool $http): void
    {
        $result = $db->execute('SELECT * FROM documents WHERE id = ?', [$this->documentId]);
        $row = $result->fetchRow();

        $http->post('http://elasticsearch:9200/docs/_doc/' . $this->documentId, json_encode($row));
    }
}

// ❌ BAD: blocks the OS thread, every Fiber on this worker waits
public function handle(): void
{
    $user = User::find($this->userId);
    foreach ($user->subscriptions as $sub) {
        Http::post($sub->webhook_url, $payload);
    }
}

// ✅ GOOD (option 1): pre-fetch in dispatch, pass plain data into the handler
ProcessWebhooks::dispatch(
    userId: $user->id,
    webhooks: $user->subscriptions->pluck('webhook_url')->all(),
);

// ✅ GOOD (option 2): extend TorqueJob, use MysqlPool for async queries
class ProcessWebhooks extends TorqueJob
{
    public function handle(MysqlPool $db, HttpPool $http): void
    {
        $rows = $db->query('SELECT webhook_url FROM subscriptions WHERE user_id = ?', [$this->userId]);
        foreach ($rows as $row) {
            $http->post($row['webhook_url'], $this->payload);
        }
    }
}

use Webpatser\Torque\Job\CoroutineContext;

// Inside a job handler
CoroutineContext::set('tenant_id', $this->tenantId);
$tenantId = CoroutineContext::get('tenant_id');

use Webpatser\Torque\Stream\Streamable;

class ImportCsv implements ShouldQueue
{
    use Streamable;

    public function handle(): void
    {
        foreach ($this->rows as $i => $row) {
            // process...
            $this->emit("Imported row {$i}", progress: $i / count($this->rows));
        }
    }
}

use Webpatser\Torque\Stream\JobStream;

$stream = app(JobStream::class);

// All events so far
$events = $stream->events($uuid);

// Tail (blocks, yields events as they arrive)
foreach ($stream->tail($uuid) as $event) {
    echo $event['type'] . ': ' . ($event['data']['message'] ?? '');
}

// Check completion
$stream->isFinished($uuid); // true after completed/failed

'autoscale' => [
    'enabled' => true,
    'min_workers' => 2,
    'max_workers' => 8,
    'scale_up_threshold' => 0.85,   // Scale up when 85% of slots are busy
    'scale_down_threshold' => 0.20, // Scale down when 20% of slots are busy
    'cooldown' => 30,               // Seconds between scaling decisions
],

'pools' => [
    'redis' => ['size' => 30, 'idle_timeout' => 60],
    'mysql' => ['size' => 20, 'idle_timeout' => 60],
    'http'  => ['size' => 15, 'idle_timeout' => 30],
],

Gate::define('viewTorque', fn (User $user) => $user->isAdmin());

'dashboard' => [
    'enabled' => true,
    'path' => 'torque',
    'middleware' => ['web', 'auth', 'can:admin'],
],

use Webpatser\Torque\Events\JobPermanentlyFailed;

Event::listen(JobPermanentlyFailed::class, function ($event) {
    // $event->jobName, $event->queue, $event->exceptionMessage, etc.
    Notification::route('slack', '#alerts')->notify(new YourNotification($event));
});
bash
php artisan vendor:publish --tag=torque-config
bash
php artisan torque:start
bash
php artisan torque:start --workers=8 --concurrency=100 --queues=emails,notifications
css
   @source '../../vendor/webpatser/torque/src/Dashboard/resources/views/**/*.php';
   
bash
php artisan torque:supervisor --workers=4 --user=forge
bash
pecl install igbinary
echo "extension=igbinary" | sudo tee -a /etc/php/8.5/cli/php.ini
echo "extension=igbinary" | sudo tee -a /etc/php/8.5/fpm/php.ini
bash
php artisan torque:bench --workload=payload-large --serializer=igbinary