1. Go to this page and download the library: Download basis-company/nats library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
basis-company / nats example snippets
use Basis\Nats\Client;
use Basis\Nats\Configuration;
// you can override any default configuraiton key using constructor
$configuration = new Configuration(
host: 'nats-service',
user: 'basis',
pass: 'secret',
);
// delay configuration options are changed via setters
// default delay mode is constant - first retry be in 1ms, second in 1ms, third in 1ms
$configuration->setDelay(0.001);
// linear delay mode - first retry be in 1ms, second in 2ms, third in 3ms, fourth in 4ms, etc...
$configuration->setDelay(0.001, Configuration::DELAY_LINEAR);
// exponential delay mode - first retry be in 10ms, second in 100ms, third in 1s, fourth if 10 seconds, etc...
$configuration->setDelay(0.01, Configuration::DELAY_EXPONENTIAL);
$client = new Client($configuration);
$client->ping(); // true
use Basis\Nats\Client;
use Basis\Nats\Configuration;
// you can override any default configuraiton key using constructor
$configuration = new Configuration(
host: 'tls-service-endpoint',
tlsCertFile: "./certs/client-cert.pem",
tlsKeyFile': "./certs/client-key.pem",
tlsCaFile': "./certs/client-key.pem",
);
$configuration->setDelay(0.001);
$client = new Client($configuration);
$client->ping(); // true
use Basis\Nats\Client;
use Basis\Nats\Configuration;
use Basis\Nats\NKeys\CredentialsParser;
$configuration = new Configuration(
CredentialsParser::fromFile($credentialPath)
host: 'localhost',
port: 4222,
);
$client = new Client($configuration);
// queue usage example
$queue = $client->subscribe('test_subject');
$client->publish('test_subject', 'hello');
$client->publish('test_subject', 'world');
// optional message fetch
// if there are no updates null will be returned
$message1 = $queue->fetch();
echo $message1->payload . PHP_EOL; // hello
// locks until message is fetched from subject
// to limit lock timeout, pass optional timeout value
$message2 = $queue->next();
echo $message2->payload . PHP_EOL; // world
$client->publish('test_subject', 'hello');
$client->publish('test_subject', 'batching');
// batch message fetching, limit argument is optional
$messages = $queue->fetchAll(10);
echo count($messages);
// fetch all messages that are published to the subject client connection
// queue will stop message fetching when another subscription receives a message
// in advance you can time limit batch fetching
$queue->setTimeout(1); // limit to 1 second
$messages = $queue->fetchAll();
// reset subscription
$client->unsubscribe($queue);
// callback hell example
$client->subscribe('hello', function ($message) {
var_dump('got message', $message); // tester
});
$client->publish('hello', 'tester');
$client->process();
// if you want to append some headers, construct payload manually
use Basis\Nats\Message\Payload;
$payload = new Payload('tester', [
'Nats-Msg-Id' => 'payload-example'
]);
$client->publish('hello', $payload);
$client->subscribe('hello.request', function ($name) {
return "Hello, " . $name;
});
// async interaction
$client->request('hello.request', 'Nekufa1', function ($response) {
var_dump($response); // Hello, Nekufa1
});
$client->process(); // process request
// sync interaction (block until response get back)
$client->dispatch('hello.request', 'Nekufa2'); // Hello, Nekufa2
use Basis\Nats\Stream\RetentionPolicy;
use Basis\Nats\Stream\StorageBackend;
$accountInfo = $client->getApi()->getInfo(); // account_info_response object
$stream = $client->getApi()->getStream('mailer');
$stream->getConfiguration()
->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
->setStorageBackend(StorageBackend::MEMORY)
->setSubjects(['mailer.greet', 'mailer.bye']);
// stream is created with given configuration
$stream->create();
// and put some tasks so workers would be doing something
$stream->put('mailer.greet', '[email protected]');
$stream->put('mailer.bye', '[email protected]');
var_dump($stream->info()); // can stream info
// this should be set in your worker
$greeter = $stream->getConsumer('greeter');
$greeter->getConfiguration()->setSubjectFilter('mailer.greet');
// consumer would be created would on first handle call
$greeter->handle(function ($address) {
mail($address, "Hi there!");
});
var_dump($greeter->info()); // can consumer info
$goodbyer = $stream->getConsumer('goodbyer');
$goodbyer->getConfiguration()->setSubjectFilter('mailer.bye');
$goodbyer->create(); // create consumer if you don't want to handle anything right now
$goodbyer->handle(function ($address) {
mail($address, "See you later");
});
// you can configure batching and iteration count using chain api
$goodbyer
->setBatching(2) // how many messages would be requested from nats stream
->setIterations(3) // how many times message request should be sent
->handle(function () {
// if you need to break on next iteration simply call interrupt method
// batch will be processed to the end and the handling would be stopped
// $goodbyer->interrupt();
});
// consumer can be used via queue interface
$queue = $goodbyer->getQueue();
while ($message = $queue->next()) {
if (rand(1, 10) % 2 == 0) {
mail($message->payload, "See you later");
$message->ack();
} else {
// not ack with 1 second timeout
$message->nack(1);
}
// stop processing
if (rand(1, 10) % 2 == 10) {
// don't forget to unsubscribe
$client->unsubscribe($queue);
break;
}
}
// use fetchAll method to batch process messages
// let's set batch size to 50
$queue = $goodbyer->setBatching(50)->create()->getQueue();
// fetching 100 messages provides 2 stream requests
// limit message fetching to 1 second
// it means no more that 100 messages would be fetched
$messages = $queue->setTimeout(1)->fetchAll(100);
$recipients = [];
foreach ($messages as $message) {
$recipients[] = (string) $message->payload;
}
mail_to_all($recipients, "See you later");
// ack all messages
foreach ($messages as $message) {
$message->ack();
}
// you also can create ephemeral consumer
// the only thing that ephemeral consumer is created as soon as object is created
// you have to create full consumer configuration first
use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Consumer\DeliverPolicy;
$configuration = (new ConsumerConfiguration($stream->getName()))
->setDeliverPolicy(DeliverPolicy::NEW)
->setSubjectFilter('mailer.greet');
$ephemeralConsumer = $stream->createEphemeralConsumer($configuration);
// now you can use ephemeral consumer in the same way as durable consumer
$ephemeralConsumer->handle(function ($address) {
mail($address, "Hi there!");
});
// the only difference - you don't have to remove it manually, it will be deleted by NATS when socket connection is closed
// be aware that NATS will not remove that consumer immediately, process can take few seconds
var_dump(
$ephemeralConsumer->getName(),
$ephemeralConsumer->info(),
);
// if you need to append some headers, construct payload manually
use Basis\Nats\Message\Payload;
$payload = new Payload('[email protected]', [
'Nats-Msg-Id' => 'single-send'
]);
$stream->put('mailer.bye', $payload);
// Define a service
$service = $client->service(
'PostsService',
'This service is responsible for handling all things post related.',
'1.0'
);
// Create the version group
$version = $service->addGroup('v1');
// Create the index posts endpoint handler
class IndexPosts implements \Basis\Nats\Service\EndpointHandler {
public function handle(\Basis\Nats\Message\Payload $payload): array
{
// Your application logic
return [
'posts' => []
];
}
}
// Create the index endpoint
$version->addEndpoint("posts", IndexPosts::class);
// Create the service group
$posts = $version->addGroup('posts');
// View post endpoint
$posts->addEndpoint(
'*',
function (\Basis\Nats\Message\Payload $payload) {
$postId = explode('.', $payload->subject);
$postId = $postId[count($postId)-1];
return [
'post' => []
];
}
);
// Run the service
$service->run();
$bucket = $client->getApi()->getBucket('bucket_name');
// basics
$bucket->put('username', 'nekufa');
echo $bucket->get('username'); // nekufa
// safe update (given revision)
$entry = $bucket->getEntry('username');
echo $entry->value; // nekufa
$bucket->update('username', 'bazyaba', $entry->revision);
// delete value
$bucket->delete('username');
// purge value history
$bucket->purge('username');
// get bucket stats
var_dump($bucket->getStatus());
// in advance, you can fetch all bucket values
$bucket->update('email', '[email protected]');
var_dump($bucket->getAll()); // ['email' => '[email protected]', 'username' => 'nekufa']
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.