JobQueue
A small, production-ready, database-backed job queue for PHP.
Correctness-first: atomic claim, visibility timeout for dead workers, exponential backoff on failures, no long-lived transactions. Small surface area, no framework lock-in.
Features
- Atomic claim via FOR UPDATE SKIP LOCKED (MySQL 8+, MariaDB 10.6+, PostgreSQL 9.5+); on SQLite the single-writer model provides the same guarantee.
- Visibility timeout: jobs whose worker died (crashed, OOM-killed, timed out) are automatically returned to the queue.
- Exponential backoff: retries are rescheduled with baseDelay * 2^retry_count, capped at 1 hour.
- No long transactions: handle() runs outside any transaction, so external side effects are never rolled back.
- Typed jobs via JobInterface + PSR-11 dependency injection through createFromContainer.
- UTC timestamps: all DB times are written in UTC regardless of the PHP default timezone.
- PSR-3 logging with full-fidelity error context: Throwables are forwarded under the PSR-3 exception key (trace + previous chain preserved for structured log sinks).
- Observability hooks via JobQueueListener: onClaimed / onCompleted / onFailed / onReclaimed events for metrics, tracing, or context enrichment.
- Long-running worker with signal-driven graceful shutdown and max-jobs / max-runtime / max-memory limits.
- Operational API: reclaimStuck() (callable from a separate cron), getFailedJobs(), retry().
- Optional deleteOnSuccess, plus LockGuard for preventing overlapping workers.
Requirements
- PHP >= 8.3
- ext-json
- doctrine/dbal ^4
- One of: MySQL 8+, MariaDB 10.6+, PostgreSQL 9.5+, SQLite 3.35+
Installation
Install with Composer: composer require solophp/job-queue
Setup
Defining a job
Every job implements Solo\Contracts\JobQueue\JobInterface and provides a static createFromContainer(ContainerInterface $container, array $data): self factory. This lets the queue persist only job data (not services) and wire up dependencies at run time.
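A minimal sketch of such a job follows. So that it runs without the package installed, it declares local stand-ins for ContainerInterface and JobInterface; the real contracts come from psr/container and solophp/contracts and may declare more (for example, how a job exposes its data for persistence, which this page does not show). That handle() belongs to the interface is an assumption, and ArrayMailer and SendWelcomeEmail are hypothetical names.

```php
<?php
// Local stand-ins (assumptions) so the sketch is self-contained.
interface ContainerInterface
{
    public function get(string $id): mixed;
    public function has(string $id): bool;
}

interface JobInterface
{
    public function handle(): void;
    public static function createFromContainer(ContainerInterface $container, array $data): self;
}

// Hypothetical service that a real container would provide.
final class ArrayMailer
{
    public array $sent = [];

    public function send(string $to, string $subject): void
    {
        $this->sent[] = [$to, $subject];
    }
}

final class SendWelcomeEmail implements JobInterface
{
    public function __construct(
        private ArrayMailer $mailer, // service: resolved at run time
        private string $email,       // data: the only part that is persisted
    ) {}

    public static function createFromContainer(ContainerInterface $container, array $data): self
    {
        return new self($container->get(ArrayMailer::class), (string) $data['email']);
    }

    public function handle(): void
    {
        $this->mailer->send($this->email, 'Welcome');
    }
}

// Tiny container holding one shared ArrayMailer instance.
$container = new class implements ContainerInterface {
    private array $services = [];

    public function __construct()
    {
        $this->services[ArrayMailer::class] = new ArrayMailer();
    }

    public function get(string $id): mixed
    {
        return $this->services[$id];
    }

    public function has(string $id): bool
    {
        return isset($this->services[$id]);
    }
};

// What a worker would do after loading the stored data for this job.
$job = SendWelcomeEmail::createFromContainer($container, ['email' => 'ada@example.com']);
$job->handle();
```

Keeping services out of the constructor's persisted arguments is what makes the stored row small and safe to replay: only the email address travels through the database.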
Pushing jobs
Monitoring progress
getStats() is the recommended way to track batch progress: push N jobs with a shared type, then poll getStats($type). No batch table, no extra schema — the type column is the grouping key.
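As an illustration, the array returned by getStats($type) can be reduced to a progress figure. The helper below is a sketch, not part of the package: batchProgress is a hypothetical name, and it assumes the four-key shape documented in the API table, with failed counted as finished because those jobs will not run again.

```php
<?php
/**
 * Summarise a getStats()-shaped array.
 *
 * @param array{pending: int, in_progress: int, completed: int, failed: int} $stats
 * @return array{total: int, done: int, percent: float}
 */
function batchProgress(array $stats): array
{
    // Finished means the job will not run again: completed or permanently failed.
    $done  = $stats['completed'] + $stats['failed'];
    $total = $done + $stats['pending'] + $stats['in_progress'];

    return [
        'total'   => $total,
        'done'    => $done,
        'percent' => $total === 0 ? 0.0 : round($done * 100 / $total, 1),
    ];
}

// Example input in the documented shape; a real caller would pass getStats($type).
$progress = batchProgress(['pending' => 2, 'in_progress' => 1, 'completed' => 6, 'failed' => 1]);
```

If deleteOnSuccess is enabled, completed rows are removed rather than kept, so the completed count cannot be used this way; track the batch size yourself and compare it against pending + in_progress.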
Processing jobs
What processJobs() does, in order:
- Reclaim stuck jobs: any job stuck in in_progress longer than lockTimeout is returned to pending (or marked failed if retries are exhausted).
- Atomic claim: a short transaction selects up to $limit pending jobs with FOR UPDATE SKIP LOCKED and marks them in_progress. Other workers skip these rows immediately.
- Run each job outside any transaction: handle() executes, then a single UPDATE marks the job completed or failed. Side effects are never rolled back.
On a thrown exception the job is rescheduled with exponential backoff. After maxRetries attempts it becomes failed permanently.
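The retry arithmetic and the reclaim decision can be sketched as two pure functions. This is an illustration of the rules stated above, not the package's code: the function names are hypothetical, baseDelay is assumed to be in seconds, and "retries exhausted" is assumed to mean retry_count >= maxRetries.

```php
<?php
const MAX_BACKOFF_SECONDS = 3600; // the one-hour cap

/** Delay before the next attempt: baseDelay * 2^retry_count, capped at one hour. */
function backoffDelay(int $baseDelay, int $retryCount): int
{
    // Clamp the exponent so the power stays a well-behaved integer.
    $exponent = min(30, max(0, $retryCount));

    return (int) min(MAX_BACKOFF_SECONDS, $baseDelay * (2 ** $exponent));
}

/**
 * Decide what to do with an in_progress job (timestamps are Unix seconds).
 * Returns null while the lock is still valid, otherwise the new status.
 */
function reclaimAction(int $lockedAt, int $now, int $lockTimeout, int $retryCount, int $maxRetries): ?string
{
    if ($now - $lockedAt <= $lockTimeout) {
        return null; // the worker may simply be slow; leave the job alone
    }

    return $retryCount >= $maxRetries ? 'failed' : 'pending';
}

// With a 10-second base delay the schedule grows 10, 20, 40, 80, ... up to 3600.
$schedule = array_map(fn (int $n): int => backoffDelay(10, $n), range(0, 4));
```

Because the cap applies to the delay rather than to the retry count, a job with a large maxRetries settles into one attempt per hour until it is marked failed.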
Delivery semantics: at-least-once
Jobs can run more than once. handle() runs outside any transaction, and markCompleted() is a separate statement. If a worker crashes (or the DB connection drops) after handle() succeeds but before markCompleted() finishes, the job stays in_progress. Once lockTimeout elapses, reclaimStuck returns it to the queue and another worker runs handle() again.
This is the standard at-least-once guarantee. Make every handler idempotent — external side effects (emails sent, API calls, row inserts) must tolerate being repeated:
- Use deduplication keys on outbound APIs (idempotency tokens).
- Prefer INSERT ... ON CONFLICT / upserts over raw inserts.
- For emails, check a "sent" flag before sending.
- For financial operations, use transaction references.
If you cannot make a handler idempotent, keep side effects minimal and accept the trade-off.
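One way to apply the "sent" flag advice is to key every side effect and skip it when the key has already been recorded. The sketch below uses an in-memory ledger purely for illustration; SentLedger and sendReceipt are hypothetical names, and a production version would keep the keys in a table with a unique constraint.

```php
<?php
// Hypothetical record of side effects that have already happened.
final class SentLedger
{
    private array $keys = [];

    public function has(string $key): bool
    {
        return isset($this->keys[$key]);
    }

    public function mark(string $key): void
    {
        $this->keys[$key] = true;
    }
}

/** Send a receipt at most once per order; returns false when it was already sent. */
function sendReceipt(SentLedger $ledger, int $orderId, callable $send): bool
{
    $key = "receipt:{$orderId}";

    if ($ledger->has($key)) {
        return false; // an earlier attempt already did the work
    }

    $send($orderId);
    $ledger->mark($key);

    return true;
}

// Simulate the same job being delivered twice.
$ledger = new SentLedger();
$emails = 0;
$send   = function (int $orderId) use (&$emails): void {
    $emails++;
};

$first  = sendReceipt($ledger, 42, $send);
$second = sendReceipt($ledger, 42, $send);
```

A crash between the send and the mark still produces one duplicate on the next attempt. Passing the same key to the mail or payment provider as an idempotency token, where the provider supports one, closes that remaining gap.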
Timezones
All internal timestamps (scheduled_at, locked_at, expires_at, created_at defaults) are written in UTC regardless of the PHP default timezone. Compare timestamps in UTC on the database side too. DateTimeImmutable arguments you pass to push()/addJob() are converted to UTC internally.
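The conversion itself is standard PHP and can be sketched as follows. toUtcString is a hypothetical helper, and the Y-m-d H:i:s storage format is an assumption; the package may format its columns differently.

```php
<?php
/** Normalise any date to a UTC string suitable for a DATETIME-style column. */
function toUtcString(DateTimeInterface $when): string
{
    return DateTimeImmutable::createFromInterface($when)
        ->setTimezone(new DateTimeZone('UTC'))
        ->format('Y-m-d H:i:s');
}

// Noon in Berlin during winter (UTC+1) is 11:00 UTC.
$berlin = new DateTimeImmutable('2024-01-15 12:00:00', new DateTimeZone('Europe/Berlin'));
$stored = toUtcString($berlin); // "2024-01-15 11:00:00"
```

When querying the table directly, compare against a UTC "now" (for example UTC_TIMESTAMP() on MySQL) rather than a session-local NOW(), otherwise scheduled jobs appear early or late by the server's offset.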
Long-running worker
For production, prefer the Worker daemon over a cron tick. It blocks on the queue, processes batches, sleeps when empty, and shuts down cleanly on SIGTERM / SIGINT so jobs in flight finish before the process exits. Restart limits (max-jobs / max-runtime / max-memory) let a supervisor (systemd, Supervisord, K8s) recycle the process to release memory and pick up fresh code.
If the pcntl extension is unavailable, signal handlers are silently skipped — exit relies on limits or stop().
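The loop such a daemon runs can be sketched in a few lines. This is not the package's Worker class: runWorker and its parameters are hypothetical, and $processBatch stands in for a call such as processJobs() that returns how many jobs it executed.

```php
<?php
/**
 * Run batches until a signal arrives or a restart limit is reached.
 * Returns the number of jobs processed.
 */
function runWorker(
    callable $processBatch,
    int $maxJobs = 1000,
    int $maxRuntimeSeconds = 3600,
    int $maxMemoryBytes = 134217728, // 128 MiB
    int $idleSleepMicros = 500000,
): int {
    $stop = false;

    // Without pcntl the handlers are skipped and only the limits end the loop.
    if (extension_loaded('pcntl')) {
        pcntl_async_signals(true);
        $handler = function () use (&$stop): void {
            $stop = true; // checked between batches, so the current batch finishes
        };
        pcntl_signal(SIGTERM, $handler);
        pcntl_signal(SIGINT, $handler);
    }

    $startedAt = time();
    $processed = 0;

    while (
        !$stop
        && $processed < $maxJobs
        && time() - $startedAt < $maxRuntimeSeconds
        && memory_get_usage(true) < $maxMemoryBytes
    ) {
        $count = $processBatch();
        $processed += $count;

        if ($count === 0) {
            usleep($idleSleepMicros); // queue is empty: back off before polling again
        }
    }

    return $processed;
}

// Each simulated batch handles 3 jobs; the job limit is checked between batches.
$total = runWorker(fn (): int => 3, maxJobs: 7);
```

The limits are only evaluated between batches, so a worker can overshoot max-jobs by up to one batch. The supervisor is expected to start a fresh process once the function returns.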
Observability hooks
Implement Solo\Contracts\JobQueue\JobQueueListener to ship metrics, open tracing spans, or enrich your log context per job.
Listener exceptions propagate to the caller, so keep handlers cheap and non-throwing.
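One way to keep a listener non-throwing is to guard the metrics call inside it. In the sketch below the four method names come from the feature list, but the parameter lists are assumptions, so CountingListener is a hypothetical class that deliberately does not implement the real interface.

```php
<?php
// Hypothetical listener shape; adapt the signatures to the real interface.
final class CountingListener
{
    public array $counts = ['claimed' => 0, 'completed' => 0, 'failed' => 0, 'reclaimed' => 0];

    /** @param Closure(string, int): void $emit forwards an event to a metrics backend */
    public function __construct(private Closure $emit) {}

    public function onClaimed(int $jobId): void
    {
        $this->record('claimed', $jobId);
    }

    public function onCompleted(int $jobId): void
    {
        $this->record('completed', $jobId);
    }

    public function onFailed(int $jobId): void
    {
        $this->record('failed', $jobId);
    }

    public function onReclaimed(int $jobId): void
    {
        $this->record('reclaimed', $jobId);
    }

    private function record(string $event, int $jobId): void
    {
        $this->counts[$event]++;

        try {
            ($this->emit)($event, $jobId);
        } catch (Throwable) {
            // A broken metrics backend must not abort job processing.
        }
    }
}

// Even with a failing backend the listener itself never throws.
$listener = new CountingListener(function (string $event, int $jobId): void {
    throw new RuntimeException('metrics backend down');
});
$listener->onClaimed(1);
$listener->onFailed(1);
```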
Operational API
Preventing overlapping workers
Use LockGuard in cron-driven workers to avoid two instances of the same script running concurrently. It uses flock() — atomic, cross-platform, auto-released by the OS on process exit.
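The underlying flock() pattern looks roughly like this. It is a sketch of the technique, not LockGuard's actual API: runExclusive is a hypothetical helper, and the lock file path is chosen only for the example.

```php
<?php
/**
 * Run $work only if no other process holds the lock file.
 * Returns false when another instance is already running.
 */
function runExclusive(string $lockFile, callable $work): bool
{
    $handle = fopen($lockFile, 'c'); // 'c': create if missing, never truncate
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file: {$lockFile}");
    }

    try {
        // LOCK_NB makes the call return immediately instead of waiting.
        if (!flock($handle, LOCK_EX | LOCK_NB)) {
            return false;
        }

        $work();

        return true;
    } finally {
        flock($handle, LOCK_UN);
        fclose($handle); // the OS also drops the lock if the process dies
    }
}

$lockFile = sys_get_temp_dir() . '/jobqueue-example-' . uniqid() . '.lock';
$ran = false;

$acquired = runExclusive($lockFile, function () use (&$ran): void {
    $ran = true; // a cron script would call processJobs() here
});
```

Returning false rather than waiting suits cron: the overlapping run exits quietly and the next tick tries again.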
Integration with Async Event-Dispatcher
JobQueue integrates with SoloPHP Async Event-Dispatcher.
API
| Method | Description |
|---|---|
| Schema::install($connection, $table = 'jobs') | Create the jobs table (idempotent). |
| push(JobInterface $job, ?string $type = null, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null): int | Enqueue a typed job. |
| pushMany(JobInterface[] $jobs, ?string $type = null, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null): int | Bulk-insert many jobs sharing the same type/schedule in a single statement. Returns the number of rows inserted. |
| addJob(array $payload, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null, ?string $type = null): int | Enqueue a raw payload (must contain job_class). |
| processJobs(int $limit = 10, ?string $onlyType = null): int | Reclaim stuck jobs (unless autoReclaim = false), then claim and run pending jobs. Returns how many were executed. |
| reclaimStuck(): array{requeued: int, failed: int} | Return stuck jobs to pending or mark them failed. Safe to call from a separate cron. |
| getPendingJobs(int $limit = 10, ?string $onlyType = null): array | Informational read of pending jobs. Does not lock rows. |
| getFailedJobs(int $limit = 50, ?string $type = null): array | Inspect permanently-failed jobs (most recent first). |
| getStats(?string $type = null): array{pending: int, in_progress: int, completed: int, failed: int} | Counts grouped by status, optionally filtered by type. All four keys always present. |
| retry(int $jobId): bool | Re-queue a failed job: status → pending, retry_count → 0, scheduled_at → now, error cleared. Returns true if a row was updated. |
| markCompleted(int $jobId): void | Mark a job completed (or delete it if deleteOnSuccess = true). |
| markFailed(int $jobId, Throwable\|string $error = ''): void | Record failure; reschedules with backoff or marks failed if retries exhausted. Passing a Throwable captures class/file/line in the DB error column and forwards the exception to the logger under the PSR-3 exception key. |
Testing
Tests run against SQLite in-memory, covering atomic claim, retry/backoff, visibility timeout, type filtering, expiry, DI wiring, validation branches and logger integration. Schema SQL for MySQL and PostgreSQL is tested via mocked connections.
License
MIT — see LICENSE.
Dependencies of job-queue
- ext-json: *
- doctrine/dbal: ^4.0
- solophp/contracts: ^1.4
- psr/container: ^1.0|^2.0
- psr/log: ^1.0|^2.0|^3.0