PHP code example of domraider / rxnet

1. Go to this page and download the library: Download domraider/rxnet library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

domraider / rxnet example snippets


// DNS: run a lookup, first by waiting on the result, then reactively.
$dns = new Dns();
// Procedural way: awaitOnce() hands back the emitted value so it can be echoed.
// The helper is called through the Rxnet namespace, like every other
// awaitOnce() call in this listing.
echo \Rxnet\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));

// All types of queries are allowed
$dns->soa('www.google.fr')
  ->subscribe(new StdoutObserver());

// HTTP client: fetch a page and print its body.
$http = new Http();

// Turns the PSR-7 response into its raw body string.
// This is the place to plug in a DomCrawler to extract the commits.
$toBody = function (Psr\Http\Message\ResponseInterface $response) {
    return (string) $response->getBody();
};

$http->get("https://github.com/Domraider/rxnet/commits/master")
    ->timeout(300)  // give up after 0.3s
    ->retry(2)      // on error, retry 2 times
    ->map($toBody)  // transform the response into something else
    ->subscribe(new StdoutObserver());

// All the given options, listed together for reference.
$opts = [
  // No buffering, you will receive chunks as they arrive
  // Perfect for big files to parse or streaming json
  'stream' => true,
  // You can use body, json or form_params
  // * json will add the header and json_encode
  // * form_params will build query in body and add application/x-www-form-urlencoded header
  'body' => 'raw body for post',
  'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
  // NOTE(review): the comments above call this option "form_params" but the key
  // here is spelled "form_param" — confirm which spelling the client reads.
  'form_param' => ['param_0'=>'value_0', 'param_1'=>'value_1'],
  // Set query string from here not the url
  'query'=> [
    'param1'=>'one'
  ],
  // Use a proxy
  'proxy' => 'http://user:password@myproxy:8080',
  // Append extra headers
  'headers' => [
    'Authorization' => 'Basic '.base64_encode('user:password'),
    // Specify user-agent to use
    'User-Agent' => 'SuperParser/0.1',
  ],
  // see http://php.net/manual/en/context.ssl.php
  // Add whatever option you want on your https query
  'ssl' => [
    'verify_peer' => false
  ],
  // allow redirect, either as a plain boolean:
  //   'allow_redirects' => true,
  // or as an array of settings. Only one 'allow_redirects' entry may be active:
  // PHP keeps the last value of a duplicated array key, so listing both forms
  // would silently discard the first one.
  'allow_redirects' => [
    // max redirects to follow
    'max' => 10
  ]
];

// POST using the options above; the callback receives each emitted chunk.
$onChunk = function ($chunk) {
    // let's give it to expat while it arrives
};

$http->post('https://adwords.google.com/my-10gb.xml', $opts)
  ->subscribeCallback($onChunk);

// HTTP server: register two routes, then listen on port 8080.
$httpd = new HttpD();

// GET / answers with a plain-text greeting.
$sayHello = function (HttpdRequest $request, HttpdResponse $response) {
    $response->text('Hello');
};
$httpd->route('GET', '/', $sayHello);

// POST /{var} sends the captured route parameter back as JSON.
$echoVar = function (HttpdRequest $request, HttpdResponse $response) {
    $response->json(['var' => $request->getRouteParam('var')]);
};
$httpd->route('POST', '/{var}', $echoVar);

$httpd->listen(8080);

// RabbitMQ consumer: connect, then handle incoming messages.
// DSN shape: rabbit://<user>:<password>@<host>:<port>/ — here the guest account
// on a broker running on localhost.
$rabbit = new RabbitMq('rabbit://guest:guest@127.0.0.1:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());

// Will wait for messages
$rabbit->consume()
    ->subscribeCallback(function (RabbitMessage $message) {
        echo '.';
        // What a message exposes: payload, name and labels.
        $data = $message->getData();
        $name = $message->getName();
        $head = $message->getLabels();
        // Do what you want but do one of this to get next
        $message->ack();
        //$message->nack();
        //$message->reject();
        //$message->rejectToBottom();
    });

// RabbitMQ producer: declare the queue, binding and exchange, then publish.
$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('amq.direct');

$rabbit->connect()
    ->zip([
        // Declare all the binding
        $queue->create($queue::DURABLE),
        $queue->bind('/routing/key', 'amq.direct'),
        $exchange->create($exchange::TYPE_DIRECT, [
            $exchange::DURABLE,
            $exchange::AUTO_DELETE
        ])
    ])
    // Everything's done let's produce
    ->subscribeCallback(function () use ($exchange) {
        // Running count of produced messages; the callbacks below share it by reference.
        $done = 0;
        // Just a simple array
        \Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
            // Wait for one produce to be done before starting another
            ->flatMap(function ($data) use ($exchange) {
                // Rabbit will handle serialize and unserialize
                return $exchange->produce($data, '/routing/key');
            })
            // Produce it many times
            ->repeat(10000)
            // Let's get some stats
            ->subscribeCallback(
                // onNext: one more message produced
                function () use (&$done) { $done++; },
                // onError: report the failure
                function (\Exception $e) { echo "shit happens : ".$e->getMessage(); },
                // onCompleted: print the total
                function () use (&$done) { echo number_format($done)." lines produced"; }
            );
    });

// Redis client: wait for the connection, then read a key.
$redis = new Redis();

// Wait for redis to be ready; $redis is replaced by whatever the connection emits
$redisReady = $redis->connect('127.0.0.1:6379');
$redis = RxNet\awaitOnce($redisReady);

// Every redis operator returns an observable,
// and they are all implemented
$redis->get('key')->subscribe(new StdoutObserver());

// ZeroMQ dealer: connect to the router, print what comes back, send a ping.
$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output. An observer object is attached with subscribe();
// subscribeCallback() is the variant that takes plain callables.
// NOTE(review): this listing spells the class both "StdoutObserver" and
// "StdOutObserver" — confirm the real name, since autoloading may be case-sensitive.
$dealer->subscribe(new StdoutObserver());
// And start
$dealer->send(new PingCommand('ping'));

// ZeroMQ router: bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');

// Sends a 'pong' event to the peer identified as 'Roger'.
$replyPong = function ($data) use ($router) {
    $router->send(new ReceivedEvent('pong'), 'Roger');
};

// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
    ->delay(100)
    ->subscribeCallback($replyPong);


// Statsd: send a gauge value and print the outcome.
// NOTE(review): $statsd and $loop are not created anywhere in this listing;
// they have to exist before this snippet runs.
$gaugeSent = $statsd->gauge("database.connections", 42);
$gaugeSent->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));



// MySQL: open a connection, run a single query, then a transaction.
// The class lives under the Rxnet namespace, like the other library classes
// referenced by their full name in this listing.
$conn = new Rxnet\Mysql\Connection([
    'host' => 'localhost',
    'user' => 'myUser',
    'password' => 'myPasswd',
    'database' => 'myDb'
]);

// Single statement
$conn->query('SELECT NOW()');

// transaction() takes its statements as an array
$conn->transaction(['SELECT NOW()']);

// GET with a timeout of 1000 and 3 retries
$observable = $http->get('http://www.google.fr')->timeout(1000)->retry(3);

// The loop keeps going forward during the await...
$response = Rxnet\awaitOnce($observable);

// ...but this echo waits before running
echo "1";

// interval(1000), limited to 5 items
$source = \Rx\Observable::interval(1000)->take(5);

// await() exposes the observable as something foreach can iterate
$generator = \Rx\await($source);

foreach ($generator as $item) {
    echo $item . PHP_EOL;
}

echo "DONE";

// Great to read gigabytes without memory leaks
$reader = new \Rxnet\OnDemand\OnDemandFileReader("./test.csv");

// onNext: print the row, then ask the reader for the next one.
$onRow = function ($row) use ($reader) {
    echo "got row : {$row}\n";
    // read next octet
    $reader->produceNext();
};

// onCompleted: print a footer.
$onCompleted = function () {
    echo "------------------\n";
    echo "read completed\n";
};

// No error handler is supplied (null)
$reader->getObservable()->subscribeCallback($onRow, null, $onCompleted);

// Request the first item
$reader->produceNext(1);

// Back-pressure with a buffer: interval(1000) feeds the buffer while every item
// is delayed by 3000 downstream; request() is invoked after each processed item.
// The overflow constant uses the same fully-qualified class name as the
// constructor, so this snippet does not depend on a `use` import.
$backPressure = new \Rxnet\Operator\OnBackPressureBuffer(
    5, // Buffer capacity
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable)
    \Rxnet\Operator\OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

// NOTE(review): $stdout and $scheduler are not created anywhere in this listing;
// they have to exist before this snippet runs.
\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    // presumably asks the buffer to release its next item — confirm
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);

// Back-pressure with a file-backed buffer: same pipeline as the plain buffer,
// but the operator is given a folder to write files to and a serializer.
// The overflow constant is referenced through its fully-qualified class name,
// so this snippet does not depend on a `use` import.
$backPressure = new \Rxnet\Operator\OnBackPressureBufferFile(
    './', // Folder to write files
    new MsgPack(), // Serializer to use
    -1, // Buffer capacity, -1 for unlimited
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable)
    \Rxnet\Operator\OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

// NOTE(review): $stdout and $scheduler are not created anywhere in this listing;
// they have to exist before this snippet runs.
\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    // presumably asks the buffer to release its next item — confirm
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);