PHP code example of clue / reactphp-flux

1. Go to the download page and download the clue/reactphp-flux library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

clue / reactphp-flux example snippets




// HTTP client shared by all jobs; captured by the transformer handler below
$browser = new React\Http\Browser();

// concurrency limit: first CLI argument if given, otherwise 3
$concurrency = isset($argv[1]) ? $argv[1] : 3;

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
    // skip users that do not have an IP address listed
    // (the user is passed through unchanged, so it has no 'country' key)
    if (!isset($user['ip'])) {
        return React\Promise\resolve($user);
    }

    // look up country for this IP
    return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
        function (Psr\Http\Message\ResponseInterface $response) use ($user) {
            // response successfully received
            // add country to user array and return updated user
            $user['country'] = (string)$response->getBody();

            return $user;
        }
    );
});

// load a huge number of users to process from NDJSON file
// NOTE(review): the `true` flag presumably makes the decoder emit associative
// arrays (the handler above reads $user['ip']) -- confirm against the decoder docs
$input = new Clue\React\NDJson\Decoder(
    new React\Stream\ReadableResourceStream(
        fopen(__DIR__ . '/users.ndjson', 'r')
    ),
    true
);

// process all users by piping through transformer
$input->pipe($transformer);

// log transformed output results
$transformer->on('data', function ($user) {
    echo $user['name'] . ' is from ' . $user['country'] . PHP_EOL;
});
$transformer->on('end', function () {
    echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});


// allow as many as 10 jobs to be in flight at the same time
$transformer = new Transformer(10, $handler);

// a limit of 1 processes the jobs strictly one after another (waterfall)
$transformer = new Transformer(1, $handler);

// the handler is typically given as a Closure
$transformer = new Transformer(
    10,
    function ($uri) use ($browser) {
        return $browser->get($uri);
    }
);

// any callable is accepted, including PHP's array callable notation
$transformer = new Transformer(10, [$browser, 'get']);

// plain usage: a single GET request returns a promise
$browser = new React\Http\Browser();

$promise = $browser->get($url);

// same request, but routed through a transformer limited to 10 concurrent jobs
$browser = new React\Http\Browser();

$transformer = new Transformer(
    10,
    function ($target) use ($browser) {
        return $browser->get($target);
    }
);

// each written value is handed to the handler above
$transformer->write($url);

// handler that post-processes the HTTP result and reports failures
$transformer = new Transformer(10, function ($url) use ($browser) {
    $promise = $browser->get($url);

    return $promise->then(
        function ($response) {
            // success: dump the received response, then pass on the decoded JSON body
            var_dump('Result received', $response);

            return json_decode((string)$response->getBody());
        },
        function (Exception $e) {
            // failure: log the message and re-throw so the rejection is not swallowed
            echo 'Error: ' . $e->getMessage() . PHP_EOL;

            throw $e;
        }
    );
});

use React\Promise\Timer;

// wrap each request promise with a 2.0 timeout via Timer\timeout()
$transformer = new Transformer(10, function ($target) use ($browser) {
    $pending = $browser->get($target);

    return Timer\timeout($pending, 2.0);
});

$transformer->write($uri);

$browser = new React\Http\Browser();

// up to 10 concurrent GET requests, one per written URL
$transformer = new Transformer(
    10,
    function ($target) use ($browser) {
        return $browser->get($target);
    }
);

// dump every response emitted through the 'data' event
$transformer->on('data', function (ResponseInterface $result) {
    var_dump($result);
});

$transformer->write('http://example.com/');

// dump every response emitted through the 'data' event
$transformer->on('data', function (ResponseInterface $result) {
    var_dump($result);
});
// print a marker when the 'end' event fires
$transformer->on('end', function () {
    echo '[DONE]', PHP_EOL;
});

// pass one final URL and end the transformer
$transformer->end('http://example.com/');

// Demonstrates closing the transformer right after writing to it.
// NOTE(review): $this->expectCallableNever() looks like a test-suite helper for a
// callback that must never run -- confirm it exists in the calling context.
$transformer->on('data', $this->expectCallableNever());
// print a marker when the 'close' event fires
$transformer->on('close', function () {
    echo '[CLOSED]' . PHP_EOL;
});

// write one URL, then close immediately
$transformer->write('http://example.com/');
$transformer->close();

// the transformer can sit in the middle of a pipe chain
$source
    ->pipe($transformer)
    ->pipe($dest);

// POST each incoming record and emit a small "done" marker for it
$transformer = new Transformer(10, function ($job) use ($http) {
    $endpoint = 'https://example.com/?id=' . $job['id'];

    return $http->post($endpoint)->then(
        function ($reply) use ($job) {
            return ['done' => $job['id']];
        }
    );
});

// longer chain with additional stages before the transformer
$source
    ->pipe($gunzip)
    ->pipe($ndjson)
    ->pipe($transformer)
    ->pipe($dest);

// print the message of any error the transformer emits
$transformer->on('error', function (Exception $failure) {
    echo 'Error: ', $failure->getMessage(), PHP_EOL;
});

// uploader that turns a failed request into a "failed" record instead of rejecting
$uploader = new Transformer(10, function ($job) use ($http) {
    $endpoint = 'https://example.com/?id=' . $job['id'];

    $onSuccess = function ($reply) use ($job) {
        return ['done' => $job['id']];
    };

    $onFailure = function ($failure) use ($job) {
        // HTTP request failed => return dummy indicator
        return [
            'failed' => $job['id'],
            'reason' => $failure->getMessage()
        ];
    };

    return $http->post($endpoint)->then($onSuccess, $onFailure);
});

$browser = new React\Http\Browser();

// POST every item from $input as JSON, with at most 3 requests at a time
$promise = Transformer::all(
    $input,
    3,
    function ($payload) use ($browser, $url) {
        return $browser->post($url, [], json_encode($payload));
    }
);

// report the resolved count, or the message of the rejection
$promise->then(
    function ($total) {
        echo 'All ' . $total . ' jobs successful!' . PHP_EOL;
    },
    function (Exception $failure) {
        echo 'Error: ' . $failure->getMessage() . PHP_EOL;
    }
);

// Feeds Transformer::all() from a manually driven stream.
$input = new ThroughStream();

// concurrency limit of 2; $handler is defined outside this snippet
$promise = Transformer::all($input, 2, $handler);

// write three values into the input stream, then end it
$input->write('a');
$input->write('b');
$input->write('c');
$input->end();

// allow as many as 10 jobs to be in flight at the same time
$promise = Transformer::all($stream, 10, $handler);

// a limit of 1 processes the jobs strictly one after another (waterfall)
$promise = Transformer::all($stream, 1, $handler);

// the handler is typically given as a Closure
$promise = Transformer::all(
    $stream,
    10,
    function ($uri) use ($browser) {
        return $browser->get($uri);
    }
);

// any callable is accepted, including PHP's array callable notation
$promise = Transformer::all($stream, 10, [$browser, 'get']);

$browser = new React\Http\Browser();

// POST items from $input as JSON, with at most 3 requests at a time
$promise = Transformer::any(
    $input,
    3,
    function ($payload) use ($browser, $url) {
        return $browser->post($url, [], json_encode($payload));
    }
);

// print the body of the resolved response, or the message of the rejection
$promise->then(
    function (ResponseInterface $result) {
        echo 'First successful job: ' . $result->getBody() . PHP_EOL;
    },
    function (Exception $failure) {
        echo 'Error: ' . $failure->getMessage() . PHP_EOL;
    }
);

// Feeds Transformer::any() from a manually driven stream.
$input = new ThroughStream();

// concurrency limit of 2; $handler is defined outside this snippet
$promise = Transformer::any($input, 2, $handler);

// write three values into the input stream, then end it
$input->write('a');
$input->write('b');
$input->write('c');
$input->end();

// allow as many as 10 jobs to be in flight at the same time
$promise = Transformer::any($stream, 10, $handler);

// a limit of 1 processes the jobs strictly one after another (waterfall)
$promise = Transformer::any($stream, 1, $handler);

// the handler is typically given as a Closure
$promise = Transformer::any(
    $stream,
    10,
    function ($uri) use ($browser) {
        return $browser->get($uri);
    }
);

// any callable is accepted, including PHP's array callable notation
$promise = Transformer::any($stream, 10, [$browser, 'get']);