PHP code example of kelemen / parallel

1. Go to this page and download the library: Download kelemen/parallel library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

kelemen / parallel example snippets


#!/usr/bin/env php


class
// If you setup log dir (optional) parallel will automatically create sub folder /stats and log running statistics in json format here.
$parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1);

// Add some tasks (only examples, not part of parallel)
// Task is defined by its name and by dependencies (optional)
// Dependencies ($runAfter = []) are tasks which have to be done before task can start
$parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']);
$parallel->addTask(new \Parallel\UsersTask('task:user'));
$parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']);
$parallel->addTask(new \Parallel\CategoriesTask('task:categories'));

// Some tasks can be too much resource expensive, so we can define how many tasks can run along this task.
// If we setup 0, this task will be run alone although we setup 5 as global max concurrent
$parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), 0);

// Run symfony application under hood
$parallel->runConsoleApp();



// Setup PSR logger
// ...
$parallel->setLogger($psrLogger);


$parallel->setAnalyzeDir(__DIR__ . '/../log');


class ImplementedSimpleTask extends SimpleTask
{
    protected function processTask(InputInterface $input, OutputInterface $output): TaskResult
    {
        // Do some magic
        return new SuccessResult();
    }
}


class ImplementedProgressTask extends ProgressTask
{
    protected function items(): iterable
    {
        return DB::table('users');
    }

    protected function itemsCount(): int
    {
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        // It can be anything what is provided by items() method (array, object ...)
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        file_put_contents('active_users', $item['id'] . "/n", FILE_APPEND | LOCK_EX);
        return SuccessResult();
    }
}


class ImplementedBatchProgressTask extends BatchProgressTask
{
    protected function startup(): void
    {
        // Here you can prepare data
    }

    protected function shutdown(): void
    {
        // Here you can run cleanup
    }

    protected function items(int $processed): iterable
    {
        // Fetch data (eg. from database) by 500 and use offset
        return DB::table('users')->limit(500)->offset($processed);
    }

    protected function itemsCount(): int
    {
        // Count ALL data that will be processed
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        return new SuccessResult([
            'id' => $item['id'],
            'name' => $item['name']
        ]);
    }

    protected function batch(array $items): void
    {
        DB::table('active_users')->insert($items);
    }
}


namespace Parallel\TaskGenerator;

use Parallel\Task;

class CustomTaskGenerator implements TaskGenerator
{
private $name;
private $task;
private $runAfter = [];
private $maxConcurrentTasksCount;
private $chunkSize;

    public function __construct(string $name, Task $task, int $chunksCount, array $runAfter = [], ?int $maxConcurrentTasksCount = null)
    {
        $this->name = $name;
        $this->task = $task;
        $this->chunkSize = $chunksCount;
        $this->runAfter = $runAfter;
        $this->maxConcurrentTasksCount = $maxConcurrentTasksCount;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function generateTasks(): array
    {
        $tasks = [];
        for ($i = 1; $i <= $this->chunkSize; $i++) {
            $tasks[] = new BaseGeneratedTask($this->task, $this->runAfter, $this->maxConcurrentTasksCount);
        }
        return $tasks;
    }
}

$taskGenerator = new CustomTaskGenerator('custom', new \Parallel\SomeTask('task:custom'), 10, ['task:dependency'], 2);
$parallel->addTaskGenerator($taskGenerator);

public function generateTasks(): array
{
    $tasks = [];
    for ($i = 1; $i <= $this->chunkSize; $i++) {
        $tasks[] = new BaseGeneratedTask($this->task, ['task:dependency'], $this->maxConcurrentTasksCount);
    }
    return $tasks;
}

$parallel->addTask(new \Parallel\SomeOtherTask('task:afterGenerated'), ['custom']);
sh
php command.php parallel:run
sh
# Now you can run
php command.php analyze:graph