1. Go to the download page for the kelemen/parallel library and download it, choosing the "require" download type.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
kelemen / parallel example snippets
#!/usr/bin/env php
class
// Create the runner. If you set up a log dir (optional), parallel will automatically create
// a /stats sub folder in it and log running statistics there in JSON format.
// The third argument (5) is the global maximum of concurrently running tasks.
$parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1);

// Add some tasks (only examples, not part of parallel).
// A task is defined by its name and, optionally, by its dependencies.
// Dependencies ($runAfter = []) are tasks which have to be done before the task can start.
$parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']);
$parallel->addTask(new \Parallel\UsersTask('task:user'));
$parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']);
$parallel->addTask(new \Parallel\CategoriesTask('task:categories'));

// Some tasks can be too resource expensive, so we can define how many tasks may run along with this task.
// If we set 0, this task will run alone even though the global max concurrent is 5.
// The second argument is the dependency list, so an empty list is passed explicitly
// and the concurrency limit goes third.
$parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), [], 0);

// Run the Symfony console application under the hood.
$parallel->runConsoleApp();
// Set up a PSR logger
// ...
// Directory handed to setAnalyzeDir(): the "log" folder one level above this script.
$analyzeDir = __DIR__ . '/../log';
$parallel->setLogger($psrLogger);
$parallel->setAnalyzeDir($analyzeDir);
/**
 * Minimal SimpleTask example: the whole workload lives in processTask().
 */
class ImplementedSimpleTask extends SimpleTask
{
    /**
     * Performs the task's work and reports the outcome.
     *
     * @param InputInterface  $input  Input handed to the task.
     * @param OutputInterface $output Output the task may write to.
     *
     * @return TaskResult Always a SuccessResult in this example.
     */
    protected function processTask(InputInterface $input, OutputInterface $output): TaskResult
    {
        // Do some magic

        $result = new SuccessResult();

        return $result;
    }
}
/**
 * Example ProgressTask: goes through the users table and appends the id of
 * every active user to the "active_users" file, one id per line.
 */
class ImplementedProgressTask extends ProgressTask
{
    /**
     * Provides the items to process; each of them is passed to processItem().
     */
    protected function items(): iterable
    {
        return DB::table('users');
    }

    /**
     * Returns the number of records in the users table.
     */
    protected function itemsCount(): int
    {
        return DB::table('users')->count();
    }

    /**
     * Processes a single item.
     *
     * @param mixed $item One record from the users table. It can be anything
     *                    provided by the items() method (array, object ...).
     *
     * @return TaskResult SkipResult for inactive users, SuccessResult otherwise.
     */
    protected function processItem($item): TaskResult
    {
        if (!$item['is_active']) {
            return new SkipResult();
        }

        // "\n" (not "/n") is the newline escape, so every id ends up on its own line.
        file_put_contents('active_users', $item['id'] . "\n", FILE_APPEND | LOCK_EX);

        // The result has to be instantiated; without "new" PHP would look for a function named SuccessResult().
        return new SuccessResult();
    }
}
/**
 * Example BatchProgressTask: reads users in pages of 500, keeps the active
 * ones and inserts them into the active_users table in batches.
 */
class ImplementedBatchProgressTask extends BatchProgressTask
{
    /**
     * Hook for preparing data.
     */
    protected function startup(): void
    {
        // Here you can prepare data
    }

    /**
     * Hook for cleanup.
     */
    protected function shutdown(): void
    {
        // Here you can run cleanup
    }

    /**
     * Fetches the next page of items.
     *
     * @param int $processed Used as the query offset.
     */
    protected function items(int $processed): iterable
    {
        // Fetch data (e.g. from a database) 500 at a time, using the offset
        $users = DB::table('users');

        return $users->limit(500)->offset($processed);
    }

    /**
     * Counts ALL data that will be processed.
     */
    protected function itemsCount(): int
    {
        $users = DB::table('users');

        return $users->count();
    }

    /**
     * Processes a single item.
     *
     * @param mixed $item One record from the users table.
     *
     * @return TaskResult SuccessResult carrying id and name for active users, SkipResult otherwise.
     */
    protected function processItem($item): TaskResult
    {
        if ($item['is_active']) {
            $row = [
                'id' => $item['id'],
                'name' => $item['name']
            ];

            return new SuccessResult($row);
        }

        return new SkipResult();
    }

    /**
     * Inserts the given rows into the active_users table.
     *
     * @param array $items Rows to insert; presumably the data carried by the
     *                     SuccessResult objects from processItem() - confirm against the library.
     */
    protected function batch(array $items): void
    {
        $activeUsers = DB::table('active_users');
        $activeUsers->insert($items);
    }
}
namespace Parallel\TaskGenerator;
use Parallel\Task;
/**
 * Task generator that wraps one task into a configurable number of generated tasks.
 */
class CustomTaskGenerator implements TaskGenerator
{
    /** Generator name returned by getName(). */
    private $name;

    /** Task passed to every generated task. */
    private $task;

    /** Dependencies passed to every generated task. */
    private $runAfter = [];

    /** Concurrency limit passed to every generated task; may be null. */
    private $maxConcurrentTasksCount;

    /** Number of tasks generateTasks() creates (holds the $chunksCount constructor argument). */
    private $chunkSize;

    /**
     * @param string   $name                    Generator name.
     * @param Task     $task                    Task to wrap.
     * @param int      $chunksCount             How many tasks to generate.
     * @param array    $runAfter                Dependencies handed to each generated task.
     * @param int|null $maxConcurrentTasksCount Concurrency limit handed to each generated task.
     */
    public function __construct(string $name, Task $task, int $chunksCount, array $runAfter = [], ?int $maxConcurrentTasksCount = null)
    {
        $this->name = $name;
        $this->task = $task;
        $this->runAfter = $runAfter;
        $this->maxConcurrentTasksCount = $maxConcurrentTasksCount;
        $this->chunkSize = $chunksCount;
    }

    /**
     * Returns the generator name.
     */
    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Builds the list of generated tasks.
     *
     * @return array One BaseGeneratedTask per chunk; empty when the chunk count is not positive.
     */
    public function generateTasks(): array
    {
        $generated = [];

        // Keep adding tasks until the configured amount is reached.
        while (count($generated) < $this->chunkSize) {
            $generated[] = new BaseGeneratedTask($this->task, $this->runAfter, $this->maxConcurrentTasksCount);
        }

        return $generated;
    }
}
// Build the generator: name 'custom', the task to wrap, 10 chunks,
// a dependency on 'task:dependency' and a concurrency limit of 2.
$customTask = new \Parallel\SomeTask('task:custom');
$taskGenerator = new CustomTaskGenerator('custom', $customTask, 10, ['task:dependency'], 2);

// Register the generator with parallel.
$parallel->addTaskGenerator($taskGenerator);
/**
 * Variant of generateTasks() with the dependency list hard-coded to ['task:dependency'].
 *
 * @return array One BaseGeneratedTask per chunk; empty when the chunk count is not positive.
 */
public function generateTasks(): array
{
    $generated = [];

    // Keep adding tasks until the configured amount is reached.
    while (count($generated) < $this->chunkSize) {
        $generated[] = new BaseGeneratedTask($this->task, ['task:dependency'], $this->maxConcurrentTasksCount);
    }

    return $generated;
}