PHP code example of pdffiller / qless-php

1. Go to this page and download the library: Download pdffiller/qless-php library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

pdffiller / qless-php example snippets


use Qless\Client;

// Connect to localhost
$client1 = new Client();

// Connect to somewhere else
$client2 = new Client('127.0.0.99:1234');

use Qless\Jobs\BaseJob;

class MyJobClass
{
    /**
     * @param BaseJob $job Is an instance of `Qless\Jobs\BaseJob` and provides access
     *                     to the payload data via `$job->getData()`, a means to cancel
     *                     the job (`$job->cancel()`), and more.
     */
    public function perform(BaseJob $job): void
    {
        // ...
        echo 'Perform ', $job->getId(), ' job', PHP_EOL;
        
        $job->complete();
    }
}

/**
 * This references a new or existing queue 'testing'.
 * @var \Qless\Queues\Queue $queue
 * @var \Qless\Client $client
 */
$queue = $client->queues['testing'];

// Let's add a job, with some data. Returns Job ID
$jid = $queue->put(MyJobClass::class, ['hello' => 'howdy']);
// $jid here is "696c752a706049cdb227a9fcfe9f681b"

/**
 * Now we can ask for a job.
 * @var \Qless\Jobs\BaseJob $job
 */
$job = $queue->pop();

// And we can do the work associated with it!
$job->perform();
// Perform 316eb06a30d24d66ad0d33361306a7a1 job

/**
 * Find an existing job by it's JID
 * @var string $jid 
 * @var \Qless\Client $client 
 */
$job = $client->jobs[$jid];

// query it to find out details about it:
$job->jid;          // the job id
$job->klass;        // the class of the job
$job->queue;        // the queue the job is in
$job->data;         // the data for the job
$job->history;      // the history of what has happened to the job so far
$job->dependencies; // the jids of other jobs that must complete before this one
$job->dependents;   // the jids of other jobs that depend on this one
$job->priority;     // the priority of this job
$job->worker;       // the internal worker name (usually consumer identifier)
$job->tags;         // array of tags for this job
$job->expires;      // when you must either check in with a heartbeat or turn it in as completed
$job->remaining;    // the number of retries remaining for this job
$job->retries;      // the number of retries originally requested
$job->tracked;      // is job flagged as important
$job->failed;       // is job flagged as failed

// there is a way to get seconds remaining before this job will timeout:
$job->ttl();

// you can also change the job in various ways:
$job->requeue('some_other_queue'); // move it to a new queue
$job->cancel();                    // cancel the job
$job->tag('foo');                  // add a tag
$job->untag('foo');                // remove a tag
$job->track();                     // start tracking current job
$job->untrack();                   // stop tracking current job

// The autoloader line is omitted

use Qless\Client;
use Qless\Jobs\Reservers\OrderedReserver;
use Qless\Workers\ForkingWorker;

// Create a client
$client = new Client();

// Get the queues you use.
//
// Create a job reserver; different reservers use different
// strategies for which order jobs are popped off of queues
$reserver = new OrderedReserver($client->queues, ['testing', 'testing-2', 'testing-3']);

$worker = new ForkingWorker($reserver, $client);
$worker->run();

 // The autoloader line is omitted
 
 use Qless\Client;
 use Qless\Jobs\Reservers\OrderedReserver;
 use Qless\Workers\SimpleWorker;
 
 // Create a client
 $client = new Client();
 
 // Get the queues you use.
 //
 // Create a job reserver; different reservers use different
 // strategies for which order jobs are popped off of queues
 $reserver = new OrderedReserver($client->queues, ['testing', 'testing-2', 'testing-3']);
 
 $worker = new SimpleWorker($reserver, $client);
 $worker->run();
 

use Qless\Jobs\Reservers\OrderedReserver;
use Qless\Workers\ForkingWorker;

/** 
 *  @var \Qless\Client $client 
 *  @var object $jobHandlerFactory is some complicated factory which knows how to create Job Handler   
 */
$jobHandler = $jobHandlerFactory->createJobHandler();

$reserver = new OrderedReserver($client->queues, 'my-queue');

$worker = new ForkingWorker($reserver, $client);
$worker->registerJobPerformHandler($jobHandler);

$worker->run();

/**
 * @var \Qless\Queues\Queue $queue
 * @var \Qless\Client $client
 */
$queue = $client->queues['cook'];

$jid = $queue->put(MakeStuffing::class, ['lots' => 'of butter']);

$queue->put(
    MakeTurkey::class,      // The class with the job perform method.
    ['with' => 'stuffing'], // An array of parameters for job.
    null,                   // The specified job id, if not a specified, a jid will be generated.
    null,                   // The specified delay to run job.
    null,                   // Number of retries allowed.
    null,                   // A greater priority will execute before jobs of lower priority.
    null,                   // An array of tags to add to the job.
    [$jid]                  // A list of JIDs this job must wait on before executing.
);

/** @var \Qless\Queues\Queue $queue */
$queue->put(MyJobClass::class, ['foo' => 'bar'], null, null, null, 10);

/** @var \Qless\Client $client */
$job = $client->jobs['0c53b0404c56012f69fa482a1427ab7d'];

// Now this will get popped before any job of lower priority.
$job->priority = 10;

/**
 * Run at least 10 minutes from now.
 *
 * @var \Qless\Queues\Queue $queue
 */
$queue->put(MyJobClass::class, ['foo' => 'bar'], null, 600);

/**
 * Run in 10 minutes.
 *
 * @var \Qless\Queues\Queue $queue
 */
$queue->put(MyJobClass::class, ['foo' => 'bar'], null, 600, null, 100);

/**
 * Run every hour.
 *
 * @var \Qless\Queues\Queue $queue
 */
$jid = $queue->recur(MyJobClass::class, ['widget' => 'warble'], 3600);
// $jid here is "696c752a706049cdb227a9fcfe9f681b"

/**
 * @var \Qless\Client $client
 * @var \Qless\Jobs\RecurringJob $job
 */
$job = $client->jobs['696c752a706049cdb227a9fcfe9f681b'];

/**
 * I think I only need it to run once every two hours.
 *
 * @var \Qless\Jobs\RecurringJob $job
 */
$job->interval = 7200;

/**
 * 23 minutes of waiting until it should go.
 *
 * @var \Qless\Queues\Queue $queue
 */
$queue->recur(MyJobClass::class, ['howdy' => 'hello'], 3600, 23 * 60);

/**
 * Recur every minute.
 *
 * @var \Qless\Queues\Queue $queue
 */
$queue->recur(MyJobClass::class, ['lots' => 'of jobs'], 60);

// Wait 5 minutes
$jobs = $queue->pop(null, 10);
echo count($jobs), ' jobs got popped'; // 5 jobs got popped

/**
 * Subscribe
 *
 * @var \Qless\Queues\Queue $queue1
 * @var \Qless\Queues\Queue $queue2
 * @var \Qless\Queues\Queue $queue3
 */
$queue1->subscribe('*.*.apples');
$queue2->subscribe('big.*.apples');
$queue3->subscribe('#.apples');


/**
 * Put to few queues
 *
 * @var \Qless\Topics\Topic
 * @var \Qless\Client $client
 */
use Qless\Topics\Topic;
 
$topic = new Topic('big.green.apples', $client);
$topic->put('ClassName', ['key' => 'value']); // Put to $queue1, $queue2 and $queue3


/** @var \Qless\Client $client */
$client->config['jobs-history'] = 7 * 86400;
$client->config['jobs-history-count'] = 500;
$client->config['jobs-failed-history'] = 3 * 86400;

/** @var \Qless\Client $client */
$client->jobs['b1882e009a3d11e192d0b174d751779d']->track();

/**  @var \Qless\Queues\Queue $queue */
$queue->put(MyJobClass::class, ['tags' => 'aplenty'], null, null, null, null, ['12345', 'foo', 'bar']);

/** @var \Qless\Client $client */
$jids = $client->jobs->tagged('foo');

/**
 * @var \Qless\Client $client
 * @var \Qless\Jobs\BaseJob $job
 */
$job = $client->jobs['b1882e009a3d11e192d0b174d751779d'];
$job->tag('howdy', 'hello');
$job->untag('foo', 'bar');

/**
 * @var \Qless\Client $client
 * @var \Qless\PubSub\Manager $events
 */
$events = $client->events;
$events->on(
    Qless\PubSub\Manager::EVENT_COMPLETED,
    function (string $jid) {
        echo "{$jid} completed";
    }
);

$events->listen();

/**
 * @var \Qless\Client $client
 * @var \Qless\PubSub\Manager $events
 */
$events = $client->events;
$events->on(
    Qless\PubSub\Manager::EVENT_FAILED,
    function (string $jid) use ($client) {
        echo "{$jid} failed in {$client->jobs[$jid]->queue}";
    }
);

$events->listen();

use Acme\Database\Connection;
use Qless\Events\User\AbstractEvent;

class ReEstablishDBConnection
{
    private $connection;

    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    public function beforeFork(AbstractEvent $event): void
    {
        $this->connection->connect();
    }
}

use Qless\Events\User\Worker\AbstractWorkerEvent;

/** @var \Qless\Workers\ForkingWorker $worker */
$worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new ReEstablishDBConnection());

use \Qless\Events\User\Worker\BeforeFork;

/** @var \Qless\Workers\ForkingWorker $worker */
$worker->getEventsManager()->attach(BeforeFork::getName(), new ReEstablishDBConnection());

use Qless\Events\User\Worker\AbstractWorkerEvent;

/** @var \Qless\Workers\ForkingWorker $worker */
$worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber1(), 150); // More priority
$worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber2(), 100); // Normal priority
$worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber10(), 50); // Less priority

use Qless\Events\User\Job\BeforePerform;
use Qless\Jobs\BaseJob;
use Qless\Jobs\PerformAwareInterface;
use My\Database\Connection;

class ReEstablishDBConnection
{
    private $connection;

    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * @param BeforePerform $event
     * @param BaseJob|PerformAwareInterface $source
     */
    public function beforePerform(BeforePerform $event, $source): void
    {
        $this->connection->connect();
    }
}

use Qless\Events\User\Job\AbstractJobEvent;
use Qless\Jobs\BaseJob;
use Qless\EventsManagerAwareInterface;
use Qless\EventsManagerAwareTrait;

class EventsDrivenJobHandler implements EventsManagerAwareInterface
{
    use EventsManagerAwareTrait;

    public function setUp()
    {
        $this->getEventsManager()->attach(AbstractJobEvent::getEntityName(), new ReEstablishDBConnection());
    }

    public function perform(BaseJob $job): void
    {
        // ...

        $job->complete();
    }
}

use Qless\Events\User\Queue\BeforeEnqueue;

/** @var \Qless\Client $client */
$client
    ->getEventsManager()
    ->attach('queue:beforeEnqueue', function (BeforeEnqueue $event) {
        $event->getData()['metadata'] = [
            'server_address' => $_SERVER['SERVER_ADDR'],
        ];
    });

/** @var \Qless\Client $client */
$client->config->set('sync-enabled', true);
bash
composer 
bash
git clone git://github.com/pdffiller/qless-php.git
cd qless-php
composer update
 php
// Set 10 minutes
$client->config->set('heartbeat', 600);
 php
$client->queues['test-queue']->heartbeat = 120;
 php
// automatically
$queue->put($className, $data);

// manually
$queue->put($className, $data, 'abcdef123456');
 php
$queue->put(
    Job::class,             // Class - custom-job-id',        // Manually id
    10,                     // Delay 10 seconds
    3,                      // Three retries
    7,                      // Priority
    ['important', 'media'], // Tags
    [$jidFirst, $jidSecond] // Depends jobs
);