PHP code example of gyselroth / mongodb-php-task-scheduler

1. Go to the download page and download the gyselroth/mongodb-php-task-scheduler library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

gyselroth / mongodb-php-task-scheduler example snippets


/**
 * Job that sends a serialized mail message through Zend's Sendmail transport.
 *
 * The job payload ($this->data) is expected to be the string form of a mail
 * message, i.e. what Message::toString() returns when the job is queued.
 */
class MailJob extends TaskScheduler\AbstractJob
{
    /**
     * {@inheritdoc}
     */
    public function start(): bool
    {
        $transport = new Zend\Mail\Transport\Sendmail();
        $mail = Zend\Mail\Message::fromString($this->data);

        // Send through the transport created above; $this->transport is never
        // assigned in this class, so calling it would fail on null.
        $transport->send($mail);

        return true;
    }
}

// Create the scheduler on top of the "mydb" database of a local MongoDB server.
$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

// Build the message; the class is fully qualified because this file has no
// `use` statement that would bring a bare `Message` into scope.
$mail = new Zend\Mail\Message();
$mail->setSubject('Hello...');
$mail->setBody('World');
$mail->setFrom('root@localhost', 'root');

// Queue the job; the serialized message becomes the job payload.
$scheduler->addJob(MailJob::class, $mail->toString());

/**
 * Builds the workers and the worker manager.
 *
 * WorkerFactoryInterface is an interface, so it has to be implemented
 * (`implements`), not extended.
 */
class WorkerFactory implements TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     */
    public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        // Every worker gets its own client, logger and scheduler instance.
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger);
    }

    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        $logger = new \A\Psr4\Compatible\Logger();

        // The manager receives this factory so it can request workers from it.
        return new TaskScheduler\WorkerManager($this, $logger);
    }
}

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

// An instance of our previously created worker factory. `new` is required:
// without it PHP looks for a function named My\App\WorkerFactory().
$worker_factory = new My\App\WorkerFactory();

// NOTE(review): the Worker is given $mongodb->mydb (a database) while the Queue
// is given $mongodb (the client) - confirm which one the Queue expects.
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);

// Start processing queued jobs.
$queue->process();

// Fetch jobs without passing any filter.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->getJobs();

// Fetch jobs with a query: status STATUS_DONE and class MyTask1 or MyTask2.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->getJobs([
    'status' => TaskScheduler\JobInterface::STATUS_DONE,
    '$or' => [
        ['class' => 'MyApp\\MyTask1'],
        ['class' => 'MyApp\\MyTask2'],
    ],
]);

// Print the id of every matching job.
foreach ($jobs as $job) {
    echo $job->getId(), " done\n";
}

// Cancel a single job. $job_id is expected to hold the job's
// MongoDB\BSON\ObjectId; a type declaration cannot be written inside a call.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->cancelJob($job_id);

// NOTE(review): flush() presumably drops all queued jobs/events - confirm
// against the library documentation before running it on real data.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->flush();

/**
 * Variant of the mail job that always fails after sending.
 *
 * The exception thrown at the end of start() makes every run fail, which is
 * what the retry options passed to addJob() are meant to demonstrate.
 */
class MailJob extends TaskScheduler\AbstractJob
{
    /**
     * {@inheritdoc}
     *
     * @throws \Exception always, after the message has been handed to the transport
     */
    public function start(): bool
    {
        $transport = new Zend\Mail\Transport\Sendmail();
        $mail = Zend\Mail\Message::fromString($this->data);

        // Send through the local transport; $this->transport is never assigned
        // in this class.
        $transport->send($mail);

        // Deliberate failure. Nothing after this statement can run, so the
        // former unreachable `return true;` has been dropped.
        throw new \Exception('i am an exception');
    }
}

// Queue the mail job with retry options: OPTION_RETRY is set to 5 and
// OPTION_RETRY_INTERVAL to 30 (presumably seconds - confirm in the library docs).
$scheduler->addJob(MailJob::class, $mail->toString(), [
    TaskScheduler\Scheduler::OPTION_RETRY => 5,
    TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 30,
]);

/**
 * Job that copies a file in chunks and reports its progress as a percentage.
 *
 * Expects $this->data to be an array with the keys 'source' and 'destination'.
 */
class CopyFileJob extends TaskScheduler\AbstractJob
{
    /** Number of bytes read from the source per iteration. */
    private const CHUNK_SIZE = 4096;

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException if a file cannot be opened, read or written
     */
    public function start(): bool
    {
        $source = $this->data['source'];
        $dest = $this->data['destination'];

        $size = filesize($source);
        if ($size === false) {
            throw new \RuntimeException('unable to determine size of '.$source);
        }

        $f = fopen($source, 'r');
        if ($f === false) {
            throw new \RuntimeException('unable to open '.$source.' for reading');
        }

        $t = fopen($dest, 'w');
        if ($t === false) {
            fclose($f);

            throw new \RuntimeException('unable to open '.$dest.' for writing');
        }

        $read = 0;

        try {
            // Loop on feof() instead of the chunk's truthiness: a chunk that
            // consists of the single character "0" is falsy and would end the
            // copy early.
            while (!feof($f)) {
                $chunk = fread($f, self::CHUNK_SIZE);
                if ($chunk === false) {
                    throw new \RuntimeException('failed reading from '.$source);
                }

                if ($chunk === '') {
                    break;
                }

                $written = fwrite($t, $chunk);
                if ($written === false) {
                    throw new \RuntimeException('failed writing to '.$dest);
                }

                $read += $written;

                // Guard against a zero size to avoid a division by zero.
                $this->updateProgress($size > 0 ? $read / $size * 100 : 100);
            }
        } finally {
            // Always release both handles, also when an exception is thrown.
            fclose($f);
            fclose($t);
        }

        // The declared return type is bool; falling off the end would raise a
        // TypeError.
        return true;
    }
}

// Look up a job by id and read its progress. The ObjectId must be instantiated
// with `new`; without it PHP looks for a function named MongoDB\BSON\ObjectId().
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$p = $scheduler->getJob(new MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69'));
$p->getProgress();

// Queue one job and call wait() on the returned handle.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->addJob(MyTask::class, 'foobar')->wait();

// Queue three jobs, collect their handles and wait for the whole stack.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [
    $scheduler->addJob(MyTask::class, 'foobar'),
    $scheduler->addJob(MyTask::class, 'barfoo'),
    $scheduler->addJob(OtherTask::class, 'barefoot'),
];

$scheduler->waitFor($stack);

//some other important stuff here

// Wait for jobs looked up by id; getJobs() returns an iterable here, so it is
// converted to an array before being handed to waitFor().
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = $scheduler->getJobs([
    '_id' => ['$in' => $job_ids_from_http_request],
]);

$scheduler->waitFor(iterator_to_array($stack));

//do stuff

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

try {
    // The option constant is referenced through the fully qualified class name,
    // like every other Scheduler constant in this file; a bare `Scheduler`
    // would resolve to a class in the global namespace.
    $scheduler->waitFor($stack, TaskScheduler\Scheduler::OPTION_THROW_EXCEPTION);
} catch (\Exception $e) {
    //error handling
}

// Register a callback via listen() without a filter; the callback prints the
// id and status of the process it receives.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->listen(
    function (TaskScheduler\Process $process) {
        echo "status of " . $process->getId() . ' change to ' . $process->getStatus();
    }
);

// Same callback, with a query on a single job id as second argument.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->listen(
    function (TaskScheduler\Process $process) {
        echo "status of " . $process->getId() . ' change to ' . $process->getStatus();
    },
    [
        '_id' => new MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69'),
    ]
);

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

// Bind event handlers before waiting: one for 'waiting', one for 'done' and a
// wildcard handler that prints the current status for every event.
$scheduler->on('waiting', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is waiting';
})->on('done', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is finished';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
    // getStatus() is the accessor the listen() examples in this file use on
    // the same Process type; the former getStats() call looks like a typo.
    echo 'job '.$p->getId().' is '.$p->getStatus();
});

$scheduler->waitFor($stack);

$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);

// Bind event handlers on the queue: one for 'timeout' and a wildcard handler
// that prints the current status for every event.
$queue->on('timeout', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is timed out';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
    // getStatus() is the accessor the listen() examples in this file use on
    // the same Process type; the former getStats() call looks like a typo.
    echo 'job '.$p->getId().' is '.$p->getStatus();
});

$queue->process();

// Create one League event emitter and hand the same instance to both the
// queue and the scheduler.
$emitter = new League\Event\Emitter();

// Queue: the emitter is passed as the fifth constructor argument.
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger, $emitter);

// Scheduler: the emitter is passed as the fourth constructor argument, after
// an empty array in the third position.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [], $emitter);

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

// The message class is fully qualified because this file has no `use`
// statement that would bring a bare `Message` into scope.
$mail = new Zend\Mail\Message();
$mail->setSubject('Hello...');
$mail->setBody('World');
$mail->setFrom('root@localhost', 'root');

// Queue the job with OPTION_AT set to one hour from now, plus retry settings.
$scheduler->addJob(MailJob::class, $mail->toString(), [
    TaskScheduler\Scheduler::OPTION_AT => time() + 3600,
    TaskScheduler\Scheduler::OPTION_RETRY => 3,
    TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 60,
]);

// addJobOnce() with OPTION_AT one hour from now and three retries.
$scheduler->addJobOnce(
    MailJob::class,
    $mail->toString(),
    [
        TaskScheduler\Scheduler::OPTION_AT => time() + 3600,
        TaskScheduler\Scheduler::OPTION_RETRY => 3,
    ]
);

// addJobOnce() for a cleanup task with a max_age of 3600, OPTION_IGNORE_DATA
// enabled and an interval of one day expressed in seconds.
$scheduler->addJobOnce(
    MyApp\CleanTemp::class,
    ['max_age' => 3600],
    [
        TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true,
        TaskScheduler\Scheduler::OPTION_INTERVAL => 24 * 60 * 60,
    ]
);

// The same call again with a different payload (max_age of 1800).
$scheduler->addJobOnce(
    MyApp\CleanTemp::class,
    ['max_age' => 1800],
    [
        TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true,
        TaskScheduler\Scheduler::OPTION_INTERVAL => 24 * 60 * 60,
    ]
);

// Select the CleanTemp jobs whose status is at most STATUS_PROCESSING ...
$jobs = $scheduler->getJobs(
    [
        'class' => MyApp\CleanTemp::class,
        'status' => ['$lte' => TaskScheduler\JobInterface::STATUS_PROCESSING],
    ]
);

// ... cancel each of them by id ...
foreach ($jobs as $job) {
    $scheduler->cancelJob($job->getId());
}

// ... and queue the task again with a daily interval.
$scheduler->addJob(
    MyApp\CleanTemp::class,
    ['max_age' => 1800],
    [
        TaskScheduler\Scheduler::OPTION_INTERVAL => 24 * 60 * 60,
    ]
);

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();

// The options array is passed as the third constructor argument. This matches
// the emitter example in this file, which passes an (empty) array third and
// the emitter fourth; the former `null, [...]` order put the options in the
// emitter position.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [
    TaskScheduler\Scheduler::OPTION_JOB_QUEUE_SIZE => 1000000000,
    TaskScheduler\Scheduler::OPTION_EVENT_QUEUE_SIZE => 5000000000,
    TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 3,
]);

// Options can also be changed on an existing scheduler instance.
$scheduler->setOptions([
    TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 2,
]);

/**
 * Worker factory whose manager is configured with explicit options.
 *
 * WorkerFactoryInterface is an interface, so it has to be implemented
 * (`implements`), not extended.
 */
class WorkerFactory implements TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     */
    public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        // Every worker gets its own client, logger and scheduler instance.
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger);
    }

    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        $logger = new \A\Psr4\Compatible\Logger();

        // Manager options: OPTION_MIN_CHILDREN of 10 and the 'static' value for
        // OPTION_PM (presumably a fixed-size worker pool - confirm in the docs).
        return new TaskScheduler\WorkerManager($this, $logger, [
            TaskScheduler\WorkerManager::OPTION_MIN_CHILDREN => 10,
            TaskScheduler\WorkerManager::OPTION_PM => 'static',
        ]);
    }
}

/**
 * Worker factory that hands a PSR-11 style container to its workers and
 * resolves the worker manager from a container.
 *
 * WorkerFactoryInterface is an interface, so it has to be implemented
 * (`implements`), not extended.
 */
class WorkerFactory implements TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     *
     * Named buildWorker() like the other WorkerFactory implementations in this
     * file; the former name build() did not match them.
     */
    public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
        $dic = new \A\Psr11\Compatible\Dic();

        // The container is passed to the worker as an additional last argument.
        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger, $dic);
    }

    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        // The manager is looked up in the container by its class name.
        $dic = new \A\Psr11\Compatible\Dic();

        return $dic->get(TaskScheduler\WorkerManager::class);
    }
}

composer 
Dockerfile
# Start from the php:7.4 base image.
FROM php:7.4
# Install the pcntl and sysvmsg PHP extensions with the image's helper script.
RUN docker-php-ext-install pcntl sysvmsg
# Install the mongodb extension from PECL, then enable all three extensions.
RUN pecl install mongodb && docker-php-ext-enable mongodb pcntl sysvmsg