PHP code example of wapmorgan / threadable

1. Go to this page and download the library: Download wapmorgan/threadable library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

wapmorgan / threadable example snippets


use wapmorgan\Threadable\Worker;
/**
 * Demo worker: announces when it starts, pauses for three seconds and then
 * announces when it has finished.
 */
class SleepingWorker extends Worker
{
    /**
     * Handles a single payload.
     *
     * @param mixed $data Payload handed to the worker; this demo does not read it.
     * @return bool Always true.
     */
    public function onPayload($data)
    {
        echo 'I have started at ', date('r'), PHP_EOL;

        // Stand-in for real work.
        sleep(3);

        echo 'I have ended at ', date('r'), PHP_EOL;

        return true;
    }
}

use wapmorgan\Threadable\BackgroundWork;
use wapmorgan\Threadable\DownloadWorker;
use wapmorgan\Threadable\Worker;

// Remote files to fetch in the background.
$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];

// Describe every download: where it comes from, the size reported for it,
// and the temporary local file that receives it.
$files = [];
foreach ($file_sources as $file_to_download) {
    $files[] = [
        'source' => $file_to_download,
        'size' => DownloadWorker::getRemoteFileSize($file_to_download),
        'target' => tempnam(sys_get_temp_dir(), 'thrd_test'),
    ];
}

$result = BackgroundWork::doInBackground(new DownloadWorker(), $files,
    // Prints the progress of one payload on a single, self-overwriting line
    // (presumably invoked repeatedly while the payload is being processed).
    function (Worker $worker, $payloadI, $payloadData) {
        // filesize() results are cached per path; drop the cache to see the growing file.
        clearstatcache(true, $payloadData['target']);
        $downloaded = filesize($payloadData['target']);
        $total = $payloadData['size'];

        // A zero or missing expected size would make the percentage a division
        // by zero (a DivisionByZeroError on PHP 8), so fall back to a byte count.
        $progress = $total > 0
            ? round($downloaded * 100 / $total, 2) . '%'
            : (int) $downloaded . ' bytes';

        echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' downloading ' . $progress;
    },
    // Prints the completion line for one payload
    // (presumably invoked once the payload has been processed).
    function (Worker $worker, $payloadI, $payloadData, $payloadResult) {
        echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' successfully downloaded' . PHP_EOL;
        return true;
    }
);
if ($result) {
    echo 'All files downloaded successfully'.PHP_EOL;
}

/**
 * Worker that downloads one file per payload.
 */
class DownloadWorker extends Worker
{
    /**
     * Copies a remote file to a local path.
     *
     * @param array $data Payload: index 0 is the source URL and index 2 is the
     *                    local target path (index 1 is not read here).
     * @return bool Result of copy(): true on success, false on failure.
     */
    public function onPayload($data)
    {
        echo 'Started '.$data[0].' into '.$data[2].PHP_EOL;

        // Return copy()'s outcome instead of discarding it, so a failed
        // download can be told apart from a successful one.
        return copy($data[0], $data[2]);
    }
}

/**
 * Supplementary helper that reads a remote file's size from its
 * Content-Length response header, to avoid hand-writing file sizes.
 *
 * @param string $path URL (or any other fopen()-readable path) to probe.
 * @return int|null Size in bytes, or null when the stream cannot be opened or
 *                  exposes no Content-Length header.
 */
function remote_filesize($path)
{
    $fp = fopen($path, 'r');
    if ($fp === false) {
        // Nothing to inspect; passing false on would raise a TypeError on PHP 8.
        return null;
    }
    $inf = stream_get_meta_data($fp);
    fclose($fp);

    // Only some wrappers (e.g. http) expose response headers as "wrapper_data".
    if (!isset($inf['wrapper_data']) || !is_array($inf['wrapper_data'])) {
        return null;
    }

    $prefix = 'content-length:';
    $size = null;
    foreach ($inf['wrapper_data'] as $v) {
        // Match the header name at the start of the line only, so look-alikes
        // such as "X-Content-Length" are not picked up.
        if (is_string($v) && stripos($v, $prefix) === 0) {
            // Keep the last occurrence: when redirects are followed the headers
            // of every response are listed, and the final response comes last.
            $size = (int) trim(substr($v, strlen($prefix)));
        }
    }

    return $size;
}

/**
 * Prints the current state of every pending download and drops finished ones.
 *
 * Each entry of $files is a list of [source, expected size, local target path].
 * An entry whose target has reached the expected size is reported as
 * downloaded, removed from $files, and its target file is deleted.
 *
 * @param array $files Pending downloads, passed by reference so finished
 *                     entries can be removed. The expected size may be null
 *                     when it could not be determined.
 * @return void
 */
function show_status(&$files)
{
    foreach ($files as $i => $file) {
        $target = $file[2];

        // Drop cached stat data first so the checks below see the file as it is now.
        clearstatcache(true, $target);
        if (!file_exists($target)) {
            continue;
        }

        $downloaded_size = filesize($target);
        if ($downloaded_size === false) {
            // The file could not be inspected this round; try again next time.
            continue;
        }

        // Without a known expected size, completion cannot be detected. A loose
        // comparison would treat an empty target as equal to a null size and
        // delete it before the download has started.
        $size_known = is_numeric($file[1]);
        $expected_size = $size_known ? (int) $file[1] : null;

        if ($size_known && $downloaded_size === $expected_size) {
            echo $file[0].' downloaded'.PHP_EOL;
            unset($files[$i]);
            unlink($target);
        } elseif ($downloaded_size === 0) {
            // Nothing written yet: the file is still queued, so stay quiet.
        } elseif ($size_known && $expected_size > 0) {
            echo $file[0].' downloading '.round($downloaded_size * 100 / $expected_size, 2).'%'.PHP_EOL;
        } else {
            // No usable total to compute a percentage from (avoids dividing by zero).
            echo $file[0].' downloading '.$downloaded_size.' bytes (total size unknown)'.PHP_EOL;
        }
    }
}

// Remote files the worker is going to fetch.
$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];

// For every source, look up the remote size and reserve a temporary local
// file; each entry is stored as [source, size, target].
$files = [];
foreach ($file_sources as $file_to_download) {
    $files[] = [
        $file_to_download,
        remote_filesize($file_to_download),
        tempnam(sys_get_temp_dir(), 'thrd_test'),
    ];
}

// Construct and start a new worker.
$worker = new DownloadWorker();
// Alternative from the example: pass true to simulate forking.
// NOTE(review): this assignment replaces the worker created on the previous
// line, so both constructors run. Presumably only one of the two lines is
// meant to be kept; confirm.
$worker = new DownloadWorker(true);

// Put every file on the worker's queue.
foreach ($files as $file) {
    echo 'Enqueuing ', $file[0], ' with size ', $file[1], PHP_EOL;
    $worker->sendPayload([$file]);
}

// Drive the worker until it reports Worker::TERMINATED.
while ($worker->state !== Worker::TERMINATED) {
    switch ($worker->state) {
        // Still working on some payload.
        case Worker::RUNNING:
            // Print the status of all files, then let the worker check
            // whether its tasks have finished.
            show_status($files);
            $worker->checkForFinish();
            usleep(500000);
            break;

        // No work left right now: the worker is no longer needed, stop it.
        case Worker::IDLE:
            echo 'Ended. Stopping worker...'.PHP_EOL;
            $worker->stop();
            usleep(500000);
            break;

        // About to be stopped and unable to process more data.
        case Worker::TERMINATING:
            echo 'Wait for terminating ...'.PHP_EOL;
            // Needed so the state can move on to Worker::TERMINATED.
            $worker->checkForTermination();
            usleep(500000);
            break;
    }
}

// This loop runs only while the worker is in the Worker::RUNNING state.
// Each pass prints information about the downloads, lets the worker check
// for finished tasks, and then pauses for half a second.
for (; $worker->state == Worker::RUNNING; usleep(500000)) {
    show_status($files);
    $worker->checkForFinish();
}

// The worker is no longer running, so stop it right away. Passing `true` as the
// first argument forces it to send the stop command and wait for termination.
$worker->stop(true);

// ...
$file_sources = ['http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip', 'http://soft.eurodir.ru/test-speed-100Mb.bin'];
// ...

// Build a pool of download workers from the worker class name.
$pool = new WorkersPool('DownloadWorker');
/**
 * A pool can also be created from an object:
 * $pool = new WorkersPool(new DownloadWorker());
 * This is useful when shared sources are opened in the worker constructor, so all workers can use them.
 */
// Two workers are enough for this job.
$pool->setPoolSize(2);

// Hand the payloads to the pool. Note that WorkersPool uses sendData() rather than sendPayload().
foreach ($files as $file) {
    echo 'Enqueuing ', $file[0], ' with size ', $file[1], PHP_EOL;
    $pool->sendData($file);
}

// Trackers keyed by their interval in seconds: print the status every 0.5 seconds.
$trackers = [
    '0.5' => function ($pool) use (&$files) {
        show_status($files);
    },
];

// Blocks until all workers have finished their work and gone into the Worker::IDLE state.
$pool->waitToFinish($trackers);