PHP code example of react / stream
1. Go to this page and download the library: Download react/stream library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
react / stream example snippets
$stream->on('data', function (mixed $data): void {
echo $data;
});
$stream->on('end', function (): void {
echo 'END';
});
// Print the message of any exception passed to the 'error' event.
// Attached to $stream, matching the sibling 'data', 'end' and 'close' handlers
// (the original referenced an otherwise unused $server variable).
$stream->on('error', function (Exception $e): void {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
$stream->on('close', function (): void {
echo 'CLOSED';
});
assert($stream->isReadable() === false);
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());
// Pause the stream, then register listeners built by assertNeverCalled(),
// the same helper name the other isReadable()/close() snippets in this file use.
$stream->pause();
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());
$stream->pause();
Loop::addTimer(1.0, function () use ($stream): void {
$stream->resume();
});
$source->pipe($dest);
$connection->pipe($connection);
$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);
$source->pipe($dest, ['end' => false]);
$source->pipe($dest);
$source->on('close', function () use ($dest): void {
$dest->end('BYE!');
});
$source->close();
$source->pipe($dest); // NO-OP
$dest->close();
$source->pipe($dest); // calls $source->pause()
$source->pipe($dest);
$dest->close(); // calls $source->pause()
$stream->close();
$stream->close();
assert($stream->isReadable() === false);
$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());
$stream->on('drain', function () use ($stream): void {
echo 'Stream is now ready to accept more data';
});
$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void {
echo 'Now receiving piped data';
// explicitly close target if source emits an error
$source->on('error', function () use ($stream): void {
$stream->close();
});
});
$source->pipe($stream);
$stream->on('error', function (Exception $e): void {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
$stream->on('close', function (): void {
echo 'CLOSED';
});
assert($stream->isWritable() === false);
$stream->write('end'); // NO-OP
$stream->end('end'); // NO-OP
$stream->write('hello');
$stream->write('world');
$stream->end();
// shorter version
$stream->end('bye');
// same as longer version
$stream->write('bye');
$stream->end();
$stream->end();
assert($stream->isWritable() === false);
$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP
$stream->close();
$stream->close();
assert($stream->isWritable() === false);
$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP
$stream->end();
Loop::addTimer(1.0, function () use ($stream): void {
$stream->close();
});
$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function (string $chunk): void {
echo $chunk;
});
$stream->on('end', function (): void {
echo 'END';
});
// throws InvalidArgumentException
$stream = new ReadableResourceStream(false);
// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN);
$stream = new ReadableResourceStream(STDIN, null, 8192);
$stream = new WritableResourceStream(STDOUT);
$stream->write('hello!');
$stream->end();
// throws InvalidArgumentException
$stream = new WritableResourceStream(false);
// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT);
$stream = new WritableResourceStream(STDOUT, null, 8192);
$stream = new WritableResourceStream(STDOUT, null, null, 8192);
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn);
$stream->write('hello!');
$stream->end();
// throws InvalidArgumentException
$stream = new DuplexResourceStream(false);
// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT);
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, null, 8192);
$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, null, 8192);
$stream = new DuplexResourceStream($conn, null, null, $buffer);
$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));
$through->write('hello');
$through = new ThroughStream();
$source->pipe($through)->pipe($dest);
$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);
// Transform every written value into a JSON document followed by PHP_EOL.
$through = new ThroughStream(function (mixed $data): string {
    return json_encode($data) . PHP_EOL;
});
// json_encode() produces compact output with no space after the comma,
// so the expected payload is "[2,true]" rather than "[2, true]".
$through->on('data', $this->expectCallableOnceWith("[2,true]\n"));
$through->write([2, true]);
// Pass strings through unchanged and reject every other type by throwing.
$through = new ThroughStream(function (mixed $data): string {
    if (!is_string($data)) {
        throw new \UnexpectedValueException('Only strings allowed');
    }

    return $data;
});
// Expectations for writing a non-string: one 'error', one 'close', no 'data'.
// (The original lines each carried a stray closing parenthesis, a parse error.)
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());
$through->write(2);
$stdin = new ReadableResourceStream(STDIN);
$stdout = new WritableResourceStream(STDOUT);
$stdio = new CompositeStream($stdin, $stdout);
$stdio->on('data', function (string $chunk) use ($stdio): void {
$stdio->write('You said: ' . $chunk);
});
$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'));
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'));
$source->pipe($dest);