PHP code example of react / stream

1. Go to the download page and download the react/stream library, choosing the "require" download type.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader relative to this file, so the include works
// regardless of the current working directory or the include_path setting.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

react / stream example snippets


// Print every chunk handed to the 'data' listener.
$stream->on('data', function ($data) {
    echo $data;
});

// Print a marker when the 'end' listener is invoked.
$stream->on('end', function () {
    echo 'END';
});

// Print the message of any exception passed to the stream's 'error' listener.
$stream->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

// Print a marker when the 'close' listener is invoked.
$stream->on('close', function () {
    echo 'CLOSED';
});

// Example state check: the stream reports that it is not readable.
assert($stream->isReadable() === false);

// assertNeverCalled() is not defined in this listing; presumably it returns
// a callback that fails when invoked — confirm against the helper's source.
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

// Pause the stream.
$stream->pause();

// Register callbacks that are expected never to run, using the same
// assertNeverCalled() helper name as the other snippets in this listing.
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

// Pause the stream ...
$stream->pause();

// ... and resume it from a timer one second later.
Loop::addTimer(1.0, function () use ($stream) {
    $stream->resume();
});

// Pipe a readable source into a writable destination.
$source->pipe($dest);

// Pipe a connection into itself (the same object is source and destination).
$connection->pipe($connection);

// Chain several pipes; each pipe() call's return value is piped onward.
$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);

// Pass an options array with 'end' => false.
// NOTE(review): presumably this keeps $dest open when $source ends — confirm against the pipe() docs.
$source->pipe($dest, array('end' => false));

// Pipe, then end the destination with a final 'BYE!' when the source's 'close' listener runs.
$source->pipe($dest);
$source->on('close', function () use ($dest) {
    $dest->end('BYE!');
});

// Piping from an already closed source.
$source->close();
$source->pipe($dest); // NO-OP

// Piping into an already closed destination.
$dest->close();
$source->pipe($dest); // calls $source->pause()

// Closing the destination after the pipe has been set up.
$source->pipe($dest);
$dest->close(); // calls $source->pause()

// Close the stream.
$stream->close();

// After close() the stream reports that it is not readable.
$stream->close();
assert($stream->isReadable() === false);

// assertNeverCalled() is not defined in this listing; presumably it returns
// a callback that fails when invoked — confirm against the helper's source.
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

// Print a notice when the 'drain' listener is invoked.
$stream->on('drain', function () use ($stream) {
    echo 'Stream is now ready to accept more data';
});

// Print a notice when the 'pipe' listener is invoked; the listener receives the source stream.
$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) {
    echo 'Now receiving piped data';

    // explicitly close target if source emits an error
    $source->on('error', function () use ($stream) {
        $stream->close();
    });
});

// Pipe the source into the stream that carries the 'pipe' listener above.
$source->pipe($stream);

// Print the message of any exception passed to the 'error' listener.
$stream->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

// Print a marker when the 'close' listener is invoked.
$stream->on('close', function () {
    echo 'CLOSED';
});

// Example state check: the stream reports that it is not writable.
assert($stream->isWritable() === false);

$stream->write('end'); // NO-OP
$stream->end('end'); // NO-OP

// Write two chunks, then end the stream.
$stream->write('hello');
$stream->write('world');
$stream->end();

// shorter version
$stream->end('bye');

// same as longer version
$stream->write('bye');
$stream->end();

// After end() the stream reports that it is not writable.
$stream->end();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

// Close the stream.
$stream->close();

// After close() the stream reports that it is not writable.
$stream->close();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

// End the stream, then call close() from a timer one second later.
$stream->end();
Loop::addTimer(1.0, function () use ($stream) {
    $stream->close();
});

// Wrap STDIN in a readable stream, echo each chunk and print a marker on 'end'.
$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function ($chunk) {
    echo $chunk;
});
$stream->on('end', function () {
    echo 'END';
});

// throws InvalidArgumentException
$stream = new ReadableResourceStream(false);

// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN);

// NOTE(review): third argument is presumably the read chunk size in bytes — confirm against the constructor docs.
$stream = new ReadableResourceStream(STDIN, null, 8192);

// Wrap STDOUT in a writable stream, write one string and end the stream.
$stream = new WritableResourceStream(STDOUT);
$stream->write('hello!');
$stream->end();

// throws InvalidArgumentException
$stream = new WritableResourceStream(false);

// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT);

// NOTE(review): third argument is presumably a write buffer limit in bytes — confirm against the constructor docs.
$stream = new WritableResourceStream(STDOUT, null, 8192);

// NOTE(review): fourth argument is presumably the write chunk size in bytes — confirm against the constructor docs.
$stream = new WritableResourceStream(STDOUT, null, null, 8192);

// Open a TCP connection and wrap it in a duplex stream, write one string and end it.
// NOTE(review): the result of stream_socket_client() is not checked here before use.
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn);
$stream->write('hello!');
$stream->end();

// throws InvalidArgumentException
$stream = new DuplexResourceStream(false);

// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT);

// NOTE(review): third argument is presumably the read chunk size in bytes — confirm against the constructor docs.
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, null, 8192);

// Build a WritableResourceStream on the same connection and pass it as the fourth constructor argument.
$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, null, 8192);
$stream = new DuplexResourceStream($conn, null, null, $buffer);

// A ThroughStream without a callback: the 'data' listener is expected exactly once with 'hello'.
// expectCallableOnceWith() is not defined in this listing; $this is presumably a test case — confirm.
$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));

$through->write('hello');

// Place a ThroughStream between a source and a destination.
$through = new ThroughStream();
$source->pipe($through)->pipe($dest);

// Pass the name of a function ('strtoupper') as the ThroughStream callback.
$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);

// Encode every written value as one JSON document followed by PHP_EOL.
$through = new ThroughStream(function ($data) {
    return json_encode($data) . PHP_EOL;
});
// json_encode() emits no whitespace between array elements, and the callback
// appends PHP_EOL, so the expected chunk is built the same way.
$through->on('data', $this->expectCallableOnceWith('[2,true]' . PHP_EOL));

$through->write(array(2, true));

// Reject any non-string value by throwing from the ThroughStream callback.
$through = new ThroughStream(function ($data) {
    if (!is_string($data)) {
        throw new \UnexpectedValueException('Only strings allowed');
    }
    return $data;
});
// Expectations for the integer write below: 'error' and 'close' listeners run
// once each, and the 'data' listener never runs.
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());

$through->write(2);

// Wrap STDIN and STDOUT in a readable and a writable stream respectively.
$stdin = new ReadableResourceStream(STDIN);
$stdout = new WritableResourceStream(STDOUT);

// Combine both into a single CompositeStream.
$stdio = new CompositeStream($stdin, $stdout);

// Write each received chunk back to the composite stream with a 'You said: ' prefix.
$stdio->on('data', function ($chunk) use ($stdio) {
    $stdio->write('You said: ' . $chunk);
});

// Open source.txt for reading and destination.txt for writing, then pipe one into the other.
// NOTE(review): the fopen() results are passed straight to the constructors without a failure check.
$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'));
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'));

$source->pipe($dest);