1. Go to this page and download the library: Download domraider/rxnet library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
domraider / rxnet example snippets
$dns = new Dns();
// Procedural way
echo Rx\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));
// All types of queries are allowed
$dns->soa('www.google.fr')
->subscribe(new StdoutObserver());
$http = new Http();
$http->get("https://github.com/Domraider/rxnet/commits/master")
// Timeout after 0.3s
->timeout(300)
// will retry 2 times on error
->retry(2)
// Transform response to something else
->map(function(Psr\Http\Message\ResponseInterface $response) {
$body = (string) $response->getBody();
// Domcrawler to extract commits
return $body;
})
->subscribe(new StdoutObserver());
// All the given options
$opts = [
// No buffering, you will receive chunks has they arrived
// Perfect for big files to parse or streaming json
'stream' => true,
// You can use body, json or form_params
// * json will add the header and json_encode
// * form_params will build query in body and add application/x-www-form-urlencoded header
'body' => 'raw body for post',
'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
'form_param' => ['param_0'=>'value_0', 'param_1'=>'value_1'],
// Set query string from here not the url
'query'=> [
'param1'=>'one'
],
// Use a proxy
'proxy' => 'http://user:password@myproxy:8080',
// Append extra headers
'headers' => [
'Authorization' => 'Basic '.base64_encode('user:password'),
// Specify user-agent to use
'User-Agent' => 'SuperParser/0.1',
],
// see http://php.net/manual/en/context.ssl.php
// Add whatever option you want on your https query
'ssl' => [
'verify_peer' => false
],
// allow redirect
'allow_redirects' => true,
// or
'allow_redirects' => [
// max redirects to follow
'max' => 10
]
];
$http->post('https://adwords.google.com/my-10gb.xml', $opts)
->subscribeCallback(function($chunk) {
// let's give it to expat while it arrives
});
$rabbit = new RabbitMq('rabbit://guest:[email protected]:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());
// Will wait for messages
$rabbit->consume()
->subscribeCallback(function (RabbitMessage $message) use ($debug, $rabbit) {
echo '.';
$data = $message->getData();
$name = $message->getName();
$head = $message->getLabels();
// Do what you want but do one of this to get next
$message->ack();
//$message->nack();
//$message->reject();
//$message->rejectToBottom();
});
$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('amq.direct');
$rabbit->connect()
->zip([
// Declare all the binding
$queue->create($queue::DURABLE),
$queue->bind('/routing/key', 'amq.direct'),
$exchange->create($exchange::TYPE_DIRECT, [
$exchange::DURABLE,
$exchange::AUTO_DELETE
])
])
// Everything's done let's produce
->subscribeCallback(function () use ($exchange, $loop) {
$done = 0;
// Just a simple array
\Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
// Wait for one produce to be done before starting another
->flatMap(function ($data) use ($exchange) {
// Rabbit will handle serialize and unserialize
return $exchange->produce($data, '/routing/key');
})
// Produce it many times
->repeat($10000)
// Let's get some stats
->subscribeCallback(
function () use (&$done) { $done++;},
function (\Exception $e) { echo "shit happens : ".$e->getMessage();},
function () use (&$done, $loop) { echo number_format($done)." lines produced"; }
);
});
$redis = new Redis();
// Wait for redis to be ready
$redis = RxNet\awaitOnce($redis->connect('127.0.0.1:6379'));
$redis->get('key')
->subscribe(new StdoutObserver());
// Every redis operators return an observable
// And they are all implemented
$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output
$dealer->subscribeCallback(new StdoutObserver())
// And start
$dealer->send(new PingCommand('ping'));
// Bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');
// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
->delay(100)
->subscribeCallback(function($data) use($router) {
$router->send(new ReceivedEvent('pong'), 'Roger');
});
$statsd->gauge("database.connections", 42)
->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));
$observable = $http->get('http://www.google.fr')
->timeout(1000)
->retry(3);
// Loop continue to go forward during await
$response = Rxnet\awaitOnce($observable);
// but this echo will wait before running
echo "1";