1. Go to the download page for the webpatser/torque library and download it, choosing the "require" download type.
2. Extract the ZIP file and open index.php.
3. Add the following code to index.php.
<?php
// Load Composer's autoloader relative to this file, so the path resolves the
// same way no matter which working directory the script is started from.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
// Dispatch the job with no extra options.
ProcessDocument::dispatch($document);
// Dispatch the job onto the queue named 'high'.
ProcessDocument::dispatch($document)->onQueue('high');
// Dispatch the job with a delay of five minutes from now.
ProcessDocument::dispatch($document)->delay(now()->addMinutes(5));
// Batches work out of the box
// Three ProcessDocument jobs are grouped into one batch and dispatched together.
Bus::batch([
new ProcessDocument($doc1),
new ProcessDocument($doc2),
new ProcessDocument($doc3),
])->dispatch();
use Webpatser\Torque\Job\TorqueJob;
use Webpatser\Torque\Pool\MysqlPool;
use Webpatser\Torque\Pool\HttpPool;
/**
 * Copies one row of the `documents` table into the Elasticsearch `docs` index.
 *
 * Both the database read and the HTTP write go through the pools that are
 * passed to handle().
 */
class IndexDocument extends TorqueJob
{
    /**
     * @param int $documentId Id of the `documents` row to index.
     */
    public function __construct(
        private int $documentId,
    ) {}

    /**
     * Fetches the row for $documentId and posts it as JSON to Elasticsearch.
     *
     * Does nothing when the row cannot be found.
     *
     * @param MysqlPool $db   Pool used to run the SELECT.
     * @param HttpPool  $http Pool used to send the indexing request.
     *
     * @throws \JsonException When the row cannot be encoded as JSON.
     */
    public function handle(MysqlPool $db, HttpPool $http): void
    {
        $result = $db->execute('SELECT * FROM documents WHERE id = ?', [$this->documentId]);
        $row = $result->fetchRow();

        // The row may be gone by the time the job runs. Without this guard the
        // literal JSON value `null` would be posted as the document body.
        // NOTE(review): assumes fetchRow() yields a non-array (e.g. null) when
        // there is no row — confirm against the pool's result API.
        if (!is_array($row) || $row === []) {
            return;
        }

        // JSON_THROW_ON_ERROR: fail the job loudly instead of posting `false`
        // as the body when the row contains data that cannot be encoded.
        $body = json_encode($row, JSON_THROW_ON_ERROR);

        $http->post('http://elasticsearch:9200/docs/_doc/' . $this->documentId, $body);
    }
}
// ❌ BAD: blocks the OS thread, every Fiber on this worker waits
/**
 * Anti-pattern, kept on purpose as the "what not to do" example — do not copy.
 *
 * The handler looks up the user with User::find() and then calls Http::post()
 * once per subscription, all inline inside handle().
 *
 * NOTE(review): $payload is not defined anywhere in this snippet; presumably
 * it is built elsewhere in the real job — confirm.
 */
public function handle(): void
{
// The lookup happens inside the handler rather than before dispatch.
$user = User::find($this->userId);
// One HTTP call per subscription, issued one after another.
foreach ($user->subscriptions as $sub) {
Http::post($sub->webhook_url, $payload);
}
}
// ✅ GOOD (option 1): pre-fetch in dispatch, pass plain data into the handler
// Resolve everything the job needs up front, so that only plain values
// (an id and a list of URLs) travel with the job.
$userId = $user->id;
$webhooks = $user->subscriptions->pluck('webhook_url')->all();

ProcessWebhooks::dispatch(userId: $userId, webhooks: $webhooks);
// ✅ GOOD (option 2): extend TorqueJob, use MysqlPool for async queries
/**
 * Sends a payload to every webhook URL stored for a user, reading the URLs
 * and sending the requests through the pools passed to handle().
 */
class ProcessWebhooks extends TorqueJob
{
    /**
     * Declares the two values handle() reads from $this.
     *
     * @param int   $userId  Id matched against `subscriptions.user_id`.
     * @param mixed $payload Body sent to each webhook URL.
     *                       NOTE(review): narrow this type once the body type
     *                       accepted by HttpPool::post() is confirmed.
     */
    public function __construct(
        private int $userId,
        private mixed $payload,
    ) {}

    /**
     * Looks up the user's webhook URLs and posts the payload to each of them.
     *
     * @param MysqlPool $db   Pool used to read the `subscriptions` table.
     * @param HttpPool  $http Pool used to send the webhook requests.
     */
    public function handle(MysqlPool $db, HttpPool $http): void
    {
        $rows = $db->query('SELECT webhook_url FROM subscriptions WHERE user_id = ?', [$this->userId]);

        foreach ($rows as $row) {
            $http->post($row['webhook_url'], $this->payload);
        }
    }
}
use Webpatser\Torque\Job\CoroutineContext;
// Inside a job handler
// Store the tenant id under the 'tenant_id' key. The snippet reads $this, so
// it is meant to run inside a method of the job class.
CoroutineContext::set('tenant_id', $this->tenantId);
// Read the value back using the same key.
// NOTE(review): presumably the stored value is scoped to the current
// coroutine/Fiber rather than shared globally — confirm against the library docs.
$tenantId = CoroutineContext::get('tenant_id');
use Webpatser\Torque\Stream\Streamable;
/**
 * Queued job that walks a set of rows and emits one progress event per row
 * through the Streamable trait.
 */
class ImportCsv implements ShouldQueue
{
    use Streamable;

    /**
     * Declares the row set that handle() iterates over.
     *
     * @param array<int|string, mixed> $rows Rows to import.
     */
    public function __construct(
        private array $rows,
    ) {}

    /**
     * Processes every row and emits "Imported row {key}" with the fraction of
     * rows completed so far. The final event reports a progress of exactly 1.
     */
    public function handle(): void
    {
        // Count once; the row set does not change while iterating.
        $total = count($this->rows);
        $done = 0;

        foreach ($this->rows as $i => $row) {
            // process...

            // Count completed rows separately from the array key so progress
            // reaches 1.0 on the last row and stays correct for non-list keys.
            // $total is never 0 here: the loop body does not run for an empty set.
            $done++;
            $this->emit("Imported row {$i}", progress: $done / $total);
        }
    }
}
use Webpatser\Torque\Stream\JobStream;
// Obtain the JobStream instance via the app() helper.
$stream = app(JobStream::class);

// All events so far
$events = $stream->events($uuid);

// Tail (blocks, yields events as they arrive)
foreach ($stream->tail($uuid) as $event) {
    // Not every event carries a message; fall back to an empty string.
    $message = $event['data']['message'] ?? '';

    // Terminate each event with a newline so consecutive events do not run
    // together on a single line of output.
    echo $event['type'] . ': ' . $message . PHP_EOL;
}

// Check completion
// Keep the result so the caller can branch on it.
$finished = $stream->isFinished($uuid); // true after completed/failed
// Autoscaling settings: worker count bounds, the slot-usage thresholds that
// trigger scaling, and the pause between scaling decisions.
'autoscale' => [
'enabled' => true,
'min_workers' => 2,
'max_workers' => 8,
'scale_up_threshold' => 0.85, // Scale up once slot usage rises to the 85% threshold
'scale_down_threshold' => 0.20, // Scale down once slot usage falls to the 20% threshold
'cooldown' => 30, // Seconds between scaling decisions
],
use Webpatser\Torque\Events\JobPermanentlyFailed;
// Forward every permanently failed job to the #alerts Slack channel.
Event::listen(
    JobPermanentlyFailed::class,
    static function (JobPermanentlyFailed $event): void {
        // Available on the event: $event->jobName, $event->queue, $event->exceptionMessage, etc.
        $slack = Notification::route('slack', '#alerts');
        $slack->notify(new YourNotification($event));
    },
);
Shell commands (bash):
# Build and install the igbinary extension through PECL.
pecl install igbinary

# Enable the extension for both the CLI and FPM SAPIs.
# The grep guard skips any php.ini that already loads igbinary, so re-running
# these commands does not append duplicate "extension=" lines.
for ini in /etc/php/8.5/cli/php.ini /etc/php/8.5/fpm/php.ini; do
    grep -qsE '^[[:space:]]*extension=igbinary(\.so)?[[:space:]]*$' "$ini" \
        || echo "extension=igbinary" | sudo tee -a "$ini"
done