1. Go to this page and download the library: Download b2pweb/bdf-queue library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
b2pweb / bdf-queue example snippets
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection;
use Bdf\Queue\Destination\ConfigurationDestinationFactory;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Destination\DestinationFactory;
use Bdf\Queue\Serializer\JsonSerializer;
// Declare connections
$driverFactory = new ResolverConnectionDriverFactory([
'foo' => [
'driver' => 'pheanstalk',
'host' => 'localhost',
'port' => '11300',
'additionalOption' => 'value',
]
// OR use DSN 'foo' => 'pheanstalk://localhost:11300?additionalOption=value'
]);
// Declare drivers
$driverFactory->addDriverResolver('pheanstalk', function($config) {
//echo $config['connection'] displays "foo"
return new PheanstalkConnection($config['connection'], new JsonSerializer());
});
// Declare destination
// You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...),
// the connection to use, and the name of the queue(s) / topic(s) to use.
// This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default".
$destinationFactory = new DestinationFactory(
$driverFactory,
['my_destination' => 'queue://foo/default']
);
// To send a message to multiple destinations, you can use "aggregate" destination type.
// You can use a wildcard to send to all destinations that match the pattern.
// In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user"
$destinationFactory = new DestinationFactory(
$driverFactory,
[
'foo' => 'queue://test/foo',
'bar' => 'queue://test/bar',
'a.user' => 'topic://a/user',
'b.user' => 'topic://b/user',
'user' => 'aggregate://foo,bar,*.user',
]
);
// Create the manager
$manager = new DestinationManager($driverFactory, $destinationFactory);
use Bdf\Queue\Message\Message;
$message = Message::create('Hello world');
$message->setDestination('my_destination');
// or use a lower level setting the connection and queue.
$message = Message::create('Hello world', 'queue');
$message->setConnection('foo');
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);
use Bdf\Dsn\DsnRequest;
use Bdf\Queue\Connection\ConnectionDriverInterface;
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
/** @var ResolverConnectionDriverFactory $driverFactory */
$destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory);
$destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) {
// ...
});
// use dsn as "my_own_type://connection/queue_or_topic_name?option="
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\CallbackProcessor;
use Bdf\Queue\Processor\MapProcessorResolver;
// Create your processor and declare in a map:
$myProcessor = new CallbackProcessor(function($data) {
echo $data;
});
$processorResolver = new MapProcessorResolver(['foo' => $myProcessor]);
/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
use Bdf\Instantiator\Instantiator;
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\JobHintProcessorResolver;
/** @var Instantiator $instantiator */
// The job should be provided from message to get the processor
$processorResolver = new JobHintProcessorResolver($instantiator);
/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
/** @var Bdf\Queue\Destination\DestinationManager $manager */
class MyHandler
{
public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope)
{
echo $data; // Display 'foo'
// Ack the message. Default behavior. The ack is sent before the call by the consumer.
$envelope->acknowledge();
// Reject the message. It will be no more available. The message is rejected if and exception is thrown.
$envelope->reject();
// Reject the message and send it back to the queue
$envelope->reject(true);
}
}
$message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue');
$manager->send($message);
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface;
use Psr\Container\ContainerInterface;
/** @var ContainerInterface $container */
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) {
return new ReceiverLoader(
$container,
[
'destination_name or connection_name' => function(ReceiverBuilder $builder) {
/** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */
/** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */
// Register your unique handler for the destination or connection.
// all message will be handled by this handler.
$builder->handler(MyHandler::class);
// Or register your unique processor
$builder->processor($myProcessor);
// Or register the job bearer resolver as processor. The procesor will resolve the job
// from the Message::$job attribute value.
$builder->jobProcessor();
// Or register your own processor or handler by queue in case you consume a connection.
// By default the key of the map is the queue name. You can provide your own key provider
// with the second parameter.
$builder->mapProcessor([
'queue1' => $myProcessor,
'queue2' => MyHandler::class,
]);
// Or register your final own receiver
$builder->outlet($myReceiver);
// Or register your own receiver in the stack
$builder->add($myReceiver);
// You can add more defined middlewares here
// $builder->retry(2);
}
]
);
});
$receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build();
$manager->create('queue://foo')->consumer($receiver)->consume(0);
class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface
{
use \Bdf\Queue\Consumer\DelegateHelper;
private $options;
/**
* MyExtension constructor.
*/
public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options)
{
$this->delegate = $delegate;
$this->options = $options;
}
/**
* {@inheritdoc}
*/
public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void
{
// Do something when receiving message
if ($message->queue() === 'foo') {
return;
}
// Call the next receiver
$this->delegate->receive($message, $consumer);
}
}
use Bdf\Queue\Message\InteractEnvelopeInterface;
use Bdf\Queue\Message\Message;
class RpcReplyHandler
{
public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope)
{
// Send bask: 1 x 2 to client
$envelope->reply($number * 2);
// Or retry in 10sec
$envelope->retry(10);
}
}
$message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue');
$message->setConnection('foo');
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$promise = $manager->send($message);
// Consume the foo connection
// Receive data from the reply queue. If the header "replyTo" is not set,
// the response will be sent to "queue_reply"
echo $promise->await(500)->data(); // Display 2
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.