Download the PHP package recisio/symfony-nats-messenger without Composer

On this page you can find all versions of the php package recisio/symfony-nats-messenger. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.

FAQ

After the download, add a single include, require_once('vendor/autoload.php');, and then import the classes you need with use statements.

Example:
If you use only one package, a project is not needed. If you use more than one package, however, it is not possible to import the classes with use statements without a project.

In general, it is recommended to always use a project to download your libraries, since an application normally needs more than one library.
Some PHP packages are not free to download and are therefore hosted in private repositories. Credentials are needed to access such packages. If a package comes from a private repository, use the auth.json textarea to insert the credentials.

  • Some hosting environments cannot be reached by terminal or SSH, so Composer cannot be used there.
  • Composer can be complicated to use, especially for beginners.
  • Composer needs a lot of resources, which are not always available on simple web space.
  • If you use private repositories, you don't need to share your credentials. You can set everything up on our site and then give your team members a simple download link.
  • Simplify your Composer build process. Use our command line tool to download the vendor folder as a binary. This makes your build process faster, and you don't need to expose your credentials for private repositories.

Information about the package symfony-nats-messenger

Symfony NATS Messenger Bridge


A Symfony Messenger transport integration for NATS JetStream, enabling reliable asynchronous messaging with persistent message streaming.

This repository is a recisio-maintained fork intended to keep the transport usable on PHP 8.1 while remaining compatible with Symfony 6.4+.

Features

Requirements

System Requirements

Installation

Operational note: This is the package installation command, not a runtime behavior covered by the library test suite. The installed package is exercised by the unit and functional tests documented below.

Development Setup

For contributors and development:

Verification note: This is the repository verification workflow used for this package. The same command sequence is run when validating documentation and code changes in this repository.

Quick Start

1. Configure NATS Server

Ensure your NATS server has JetStream enabled:

Operational note: Starting nats-server -js is an environment prerequisite rather than package behavior. Automated coverage begins once JetStream is available and the transport is exercised by the functional scenarios below.

2. Set Up Transport in Symfony

Add the NATS transport to your Symfony Messenger configuration:

Tested by: testReadmeDsnExamplesParseSuccessfully[README: quick-start transport], testReadmeConfigurationOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted
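A minimal sketch of such a configuration in Symfony's PHP config format. The transport name `nats`, the `NATS_DSN` environment variable and the `App\Message\OrderPlaced` class are hypothetical placeholders, and the DSN scheme shown in the comment is an assumption that should be checked against the package's DSN documentation.

```php
<?php
// config/packages/messenger.php
// Sketch only: transport name, env variable and message class are placeholders.

use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;

return static function (ContainerConfigurator $container): void {
    $container->extension('framework', [
        'messenger' => [
            'transports' => [
                'nats' => [
                    // Assumed shape: nats-jetstream://localhost:4222/<stream>/<topic>?batching=5
                    'dsn' => '%env(NATS_DSN)%',
                ],
            ],
            'routing' => [
                // Messages of this class are sent through the "nats" transport.
                'App\Message\OrderPlaced' => 'nats',
            ],
        ],
    ]);
};
```

Keeping the DSN in an environment variable also keeps credentials out of version control.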

3. Configure Custom Serializers (Optional)

By default, the transport uses igbinary serialization for high performance when the extension is available. If ext-igbinary is not installed, it falls back to Symfony's PhpSerializer and emits a notice. You can also customize this explicitly:

Using IgbinarySerializer (Default)

Note: Custom serializer services are resolved by Symfony before transport creation. When no serializer service is provided, the transport instantiates its built-in default serializer and falls back to Symfony's PhpSerializer if ext-igbinary is unavailable.

For example:

or:

Tested by: createTransport_UsesProvidedSerializer, serialize_WithValidEnvelope_ReturnsSerializedString, decode_WithValidEncodedEnvelope_ReturnsEnvelope, testConstructorWithoutIgbinaryDoesNotCrash

Creating Custom Serializers

You can create your own serializer by extending AbstractEnveloperSerializer:

Tested by: readmeCustomSerializerExample_EncodeDecode_RoundTrips, readmeCustomSerializerExample_DecodeInvalidBody_ThrowsException. The exact code above is compiled and exercised via ReadmeExampleSerializer in the unit tests.

For reference implementations, see:

4. Send Messages

Tested by: testSendPublishesEncodedBodyWithoutHeaders, testSendUsesRequestWithHeadersWhenHeadersArePresent, Behat scenario Complete message flow - send, check stats, consume, verify
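Sending goes through the regular Symfony Messenger bus. The sketch below assumes a hypothetical `OrderPlaced` message routed to the NATS transport and a hypothetical `CheckoutService`; only `MessageBusInterface::dispatch()` is standard Symfony API.

```php
<?php
use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical message class: a plain PHP object carrying the payload.
final class OrderPlaced
{
    public function __construct(public readonly string $orderId)
    {
    }
}

// Hypothetical service; Symfony autowires the message bus into it.
final class CheckoutService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function placeOrder(string $orderId): void
    {
        // dispatch() hands the message to whichever transport the routing config names.
        $this->bus->dispatch(new OrderPlaced($orderId));
    }
}
```

Nothing in the calling code is NATS-specific, so the transport can be swapped through configuration alone.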

5. Handle Messages

Tested by: Behat scenarios Complete message flow - send, check stats, consume, verify, Send and consume messages with a custom consumer name, and High-volume message processing with file output verification. Handlers are exercised through real messenger:consume runs.
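A handler is an ordinary Symfony Messenger handler. The sketch below uses a hypothetical `OrderPlaced` message and `OrderPlacedHandler` class; the `#[AsMessageHandler]` attribute is standard Symfony API.

```php
<?php
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Hypothetical message class, repeated here so the example stands alone.
final class OrderPlaced
{
    public function __construct(public readonly string $orderId)
    {
    }
}

#[AsMessageHandler]
final class OrderPlacedHandler
{
    // Symfony calls __invoke() with each consumed message of the type-hinted class.
    public function __invoke(OrderPlaced $message): string
    {
        // Real handlers would do work here; the return value ends up in a HandledStamp.
        return 'processed order ' . $message->orderId;
    }
}
```

If the handler throws, Messenger rejects the message, and the retry handler setting decides how the transport reports that to JetStream.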

6. Consume Messages

Tested by: Behat scenarios Complete message flow - send, check stats, consume, verify, Send and consume messages with a custom consumer name, and Partial message consumption with multiple consumers. The Behat context runs messenger:consume as a Symfony CLI process.

Configuration Guide

DSN Format

Tested by: testBuildWithValidDsnReturnsConfiguration, testBuildWithoutPathThrowsException, testBuildWithoutTopicThrowsException, createTransport_WithValidDsn_ReturnsNatsTransportInstance
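The test names above imply that a DSN must carry both a path and a topic. As a rough illustration of how such a DSN breaks down, the sketch below splits one with PHP's `parse_url()`. The `nats-jetstream` scheme, the `<stream>/<topic>` path order and the `describeNatsDsn()` helper are assumptions made for this example; the package's own configuration builder may behave differently.

```php
<?php
/**
 * Splits a transport DSN into host, port, stream, topic and options.
 * Illustration only, not the package's configuration builder.
 */
function describeNatsDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException('Malformed DSN.');
    }

    // Path is assumed to look like /<stream>/<topic>.
    $segments = array_values(array_filter(
        explode('/', $parts['path'] ?? ''),
        static fn (string $segment): bool => $segment !== ''
    ));
    if (count($segments) < 2) {
        throw new InvalidArgumentException('DSN path must contain a stream and a topic.');
    }

    // Query parameters carry options such as batching or max_batch_timeout.
    parse_str($parts['query'] ?? '', $options);

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? 4222, // 4222 is the default NATS client port
        'stream' => $segments[0],
        'topic' => $segments[1],
        'options' => $options,
    ];
}

$config = describeNatsDsn(
    'nats-jetstream://localhost:4222/orders_stream/orders?batching=5&max_batch_timeout=0.5'
);
```

Note that `parse_str()` returns every option value as a string, so numeric options still need casting.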

Examples:

Tested by: testReadmeDsnExamplesParseSuccessfully. Each DSN above is parsed through the configuration builder via a dedicated data provider case.

Configuration Options

Tested by: testReadmeConfigurationOptionsAreAccepted (all options above), testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted, testReadmeStreamRetentionExamplesAreAccepted, testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions

Retry Handler Behavior

Tested by: testRejectUsesTermByDefault, testRejectUsesNakWhenRetryHandlerIsNats, testBuildUsesRetryHandlerFromQuery, Behat scenarios nats_nak.feature and nats_term.feature
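Going by the test names above, a rejected message is terminated (TERM) by default and negatively acknowledged (NAK) when the retry handler is set to `nats`. The helper below is a hypothetical restatement of that rule, not the package's code.

```php
<?php
/**
 * Returns the JetStream acknowledgement type used when a message is rejected.
 * Hypothetical helper mirroring the documented behavior.
 */
function rejectAction(?string $retryHandler = null): string
{
    // NAK asks JetStream to redeliver the message; TERM tells it to stop redelivering.
    return $retryHandler === 'nats' ? 'nak' : 'term';
}

$default = rejectAction();       // 'term'
$natsRetry = rejectAction('nats'); // 'nak'
```

With TERM, JetStream no longer redelivers the message itself; with NAK, redelivery is left to JetStream.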

Important: Consumer Strategies

This is critical to understand before setting up multiple transport instances:

โš ๏ธ Strategy A: Same Consumer, Batching = 1

Use when: Multiple instances should cooperate on the same consumer

Why batching must be 1:

Benefits:

Tested by: testReadmeBatchingExamplesAreAccepted (batching=1), Behat scenario Partial message consumption with multiple consumers

✅ Strategy B: Different Consumers, Any Batching

Use when: Each instance needs independent message processing (duplicates allowed)

Why this works:

Use cases:

Tested by: testReadmeBatchingExamplesAreAccepted (batching=10), Behat scenario Partial message consumption with multiple consumers

Batching & Timeouts

Batching Explained

Tested by: testReadmeBatchingExamplesAreAccepted (values 1, 5, 10, 20, 50 are all verified).

Batch Timeout

Controls how long to wait for a batch to fill:

Tested by: testReadmeTimeoutExamplesAreAccepted (values 0.5, 1.0, 2.0 are verified), Behat scenarios in nats_batching.feature.
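The interplay between batching and max_batch_timeout can be pictured as "return when the batch is full or the timeout expires, whichever comes first". The sketch below simulates that rule in plain PHP; `collectBatch()` and the `$fetchOne` callable are hypothetical, and the real transport delegates the waiting to JetStream instead of polling.

```php
<?php
/**
 * Collects up to $batching messages, giving up after $maxBatchTimeout seconds.
 * $fetchOne returns the next message, or null when none is ready yet.
 */
function collectBatch(callable $fetchOne, int $batching, float $maxBatchTimeout): array
{
    $deadline = microtime(true) + $maxBatchTimeout;
    $batch = [];

    while (count($batch) < $batching && microtime(true) < $deadline) {
        $message = $fetchOne();
        if ($message === null) {
            usleep(10_000); // nothing ready; wait 10 ms before asking again
            continue;
        }
        $batch[] = $message;
    }

    return $batch;
}

$queue = ['a', 'b', 'c'];
$fetch = static function () use (&$queue) {
    return array_shift($queue); // null once the queue is empty
};

$full = collectBatch($fetch, 2, 0.5);    // batch fills immediately: ['a', 'b']
$partial = collectBatch($fetch, 5, 0.1); // only 'c' is left; returns after about 0.1 s
```

A larger batch with a short timeout therefore costs little latency when the stream is busy, but adds up to the full timeout when it is nearly empty.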

Example scenarios:

Connection Timeout

Controls the socket-level I/O timeout for all NATS operations:

Tested by: testReadmeTimeoutExamplesAreAccepted (1.0, 2.0, 3.0), testBuildWithConnectionTimeoutPropagatesMs

Purpose:

When to adjust:

Stream Configuration

Retention Policies

Control how long messages are kept in the stream:

Tested by: testReadmeStreamRetentionExamplesAreAccepted (all retention options above are verified), Behat scenarios in nats_stream_limits.feature.

Note: stream_max_messages limits the total number of messages stored in the stream (maps to NATS max_msgs), while stream_max_messages_per_subject limits messages retained per individual subject (maps to NATS max_msgs_per_subject). The per-subject limit is especially useful with multi-subject streams to prevent one high-volume subject from dominating retention.
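The difference between the two limits is easiest to see on a small example. The simulation below is a simplified model that assumes JetStream's default discard-old behavior; `retain()` is a hypothetical helper and does not talk to NATS.

```php
<?php
/**
 * Returns the messages a stream would keep under a total limit and a per-subject limit.
 * $messages is a list of ['subject' => string, 'body' => string], oldest first.
 */
function retain(array $messages, int $maxMsgs, int $maxMsgsPerSubject): array
{
    $perSubject = [];
    $kept = [];

    // Walk newest-first so that the oldest messages are the ones dropped.
    foreach (array_reverse($messages) as $message) {
        $subject = $message['subject'];
        $perSubject[$subject] = ($perSubject[$subject] ?? 0) + 1;

        if ($perSubject[$subject] > $maxMsgsPerSubject) {
            continue; // this subject already holds its share
        }
        if (count($kept) >= $maxMsgs) {
            break; // the stream as a whole is full
        }
        $kept[] = $message;
    }

    return array_reverse($kept);
}

$messages = [
    ['subject' => 'orders', 'body' => 'o1'],
    ['subject' => 'orders', 'body' => 'o2'],
    ['subject' => 'payments', 'body' => 'p1'],
    ['subject' => 'orders', 'body' => 'o3'],
    ['subject' => 'orders', 'body' => 'o4'],
];

// Per-subject limit of 2: the payment survives next to the two newest orders.
$perSubjectLimited = array_column(retain($messages, 10, 2), 'body');

// Total limit of 2 only: the busy orders subject pushes the payment out.
$totalLimited = array_column(retain($messages, 2, 10), 'body');
```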

High Availability

Tested by: testReadmeStreamRetentionExamplesAreAccepted (replicas 1 and 3), testSetupPassesConfiguredStreamOptions

Testing

Unit Tests

Verification note: This block documents the supported contributor workflow. The same composer test and composer test:unit commands are used to verify changes in this repository.

The target is at least 90% code coverage.

What's tested:

Functional Tests

Functional tests require a running NATS server with JetStream enabled:

Verification note: This is the scripted functional test workflow used for the transport's end-to-end verification.

Manual approach:

Operational note: This manual Docker/Behat flow mirrors the scripted functional commands above and is not asserted separately by the package tests.

What's tested:

See also: tests/functional/README.md

Advanced Usage

Multiple Transports

Set up multiple independent transports for different use cases:

Tested by: testReadmeDsnExamplesParseSuccessfully[README: fast transport], testReadmeDsnExamplesParseSuccessfully[README: bulk transport], testReadmeDsnExamplesParseSuccessfully[README: audit transport], testReadmeAuditTransportOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted

Multi-Subject Streams

Multiple transports can share the same NATS stream with different subjects. When messenger:setup-transports runs, each transport adds its subject to the existing stream rather than overwriting it:

The events stream will have both orders and payments as subjects.

Tested by: testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders], testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments], testReadmeMultiSubjectOptionsAreAccepted, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenario Setup command merges subjects for transports sharing one stream

Note: When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.
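The merge described in this note can be sketched with plain arrays. `mergeStreamConfig()` is a hypothetical helper; the keys `subjects`, `storage` and `max_age` are standard JetStream stream configuration fields, but which settings the transport actually manages is an assumption here.

```php
<?php
/**
 * Merges a transport's subject and managed settings into an existing stream config.
 * Illustration of the documented rules, not the package's implementation.
 */
function mergeStreamConfig(array $current, array $managed, string $subject): array
{
    // Start from the server's config and overlay the settings this transport manages.
    $merged = array_replace($current, $managed);

    // Keep existing subjects, append the new one, and drop duplicates.
    $merged['subjects'] = array_values(array_unique(
        [...($current['subjects'] ?? []), $subject]
    ));

    // The storage backend of an already-created stream is left untouched.
    if (isset($current['storage'])) {
        $merged['storage'] = $current['storage'];
    }

    return $merged;
}

$current = ['subjects' => ['orders'], 'storage' => 'file', 'max_age' => 0];
$updated = mergeStreamConfig($current, ['max_age' => 3600, 'storage' => 'memory'], 'payments');
$unchangedSubjects = mergeStreamConfig($updated, [], 'orders')['subjects'];
```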

Setup on Initialization

Automatically create streams and consumers on first run:

Then run the setup command:

Tested by: testSetupCreatesStreamAndConsumer, testSetupPassesConfiguredStreamOptions, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenarios Setup NATS stream with max age configuration, Setup command handles existing streams gracefully, and Custom consumer name is registered in JetStream

Delayed / Scheduled Messages

Requires NATS Server >= 2.12 with JetStream enabled.

Enable scheduled_messages in the DSN to use Symfony's DelayStamp for deferred delivery:

Then dispatch messages with a delay:

Tested by: testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders, testReadmeScheduledMessagesDsnEnablesFeature, Behat scenario Delayed messages are delivered after the scheduled time

When scheduled_messages is enabled and a DelayStamp is present:

When scheduled_messages is disabled (the default), any DelayStamp on the envelope is silently ignored and messages are published immediately.
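Dispatching with a delay uses Symfony's standard `DelayStamp`, which takes the delay in milliseconds. The `SendReminder` message and `ReminderScheduler` service below are hypothetical, and the delay only takes effect when scheduled_messages is enabled on the transport.

```php
<?php
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Hypothetical message class.
final class SendReminder
{
    public function __construct(public readonly string $userId)
    {
    }
}

// Hypothetical service that schedules reminders through the message bus.
final class ReminderScheduler
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function remindIn(string $userId, int $seconds): void
    {
        // DelayStamp expects milliseconds, so convert from seconds.
        $this->bus->dispatch(
            new SendReminder($userId),
            [new DelayStamp($seconds * 1000)]
        );
    }
}
```

Because a DelayStamp is silently ignored when the feature is off, it is worth checking the DSN whenever delayed messages arrive immediately.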

Running the setup command will:

  1. Create the stream with configured settings
  2. Create the consumer with explicit ACK policy
  3. Verify consumer creation

Stream Monitoring

View stream and consumer information:

Operational note: These are NATS CLI inspection commands, so this package does not assert their exact textual output directly. The underlying stream, consumer, and message-count state is covered by Behat scenarios Setup NATS stream with max age configuration, Custom consumer name is registered in JetStream, Complete message flow - send, check stats, consume, verify, and the getMessageCount() unit tests.

Manual Message Operations

Tested by: testGetMessageCountReturnsConsumerPendingMessages, testGetMessageCountFallsBackToStreamState, testGetMessageCountReturnsZeroWhenLookupsFail, testGetMessageCountReturnsAckPendingWhenHigherThanPending

Troubleshooting

Connection Issues

Error: "Connection refused"

Tested by: Behat scenario Setup command fails gracefully when NATS is unavailable

Error: "Stream not found"

Tested by: testSetupCreatesStreamAndConsumer, testSetupUpdatesStreamWhenItAlreadyExists, Behat scenarios Setup NATS stream with max age configuration and Setup command handles existing streams gracefully

Message Processing Issues

Messages not being consumed

Operational note: These are manual diagnosis commands. The actual consume path is covered by Behat scenarios Complete message flow - send, check stats, consume, verify, Send and consume messages with a custom consumer name, and Partial message consumption with multiple consumers.

Messages stuck in pending

Operational note: This checklist is manual guidance. Pending-count behavior is covered by testGetMessageCountReturnsConsumerPendingMessages, testGetMessageCountFallsBackToStreamState, testGetMessageCountReturnsAckPendingWhenHigherThanPending, and Behat scenario Complete message flow - send, check stats, consume, verify.

Architecture

The bridge consists of two main components:

NatsTransportFactory

NatsTransport

Performance Tips

  1. Choose appropriate batching

    • Start with batching: 5 for balanced performance
    • Increase to 20+ for high throughput workloads
    • Use 1 for strict low-latency requirements
  2. Set reasonable timeouts

    • max_batch_timeout: 0.5 for responsive systems
    • max_batch_timeout: 2.0 for background jobs
    • connection_timeout: 1.0 for local/regional deployments
    • connection_timeout: 3.0+ for cross-region or high-latency networks
  3. Use appropriate replicas

    • stream_replicas: 1 for development
    • stream_replicas: 3 for production
  4. Monitor performance
    • Use getMessageCount() to track queue depth
    • Monitor handler execution time
    • Watch for stuck messages

Security Considerations

โš ๏ธ Deserialization of Untrusted Data

The default IgbinarySerializer (and any serializer extending AbstractEnveloperSerializer) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a well-known attack vector โ€” a crafted payload can trigger arbitrary code execution via magic methods (__wakeup, __destruct, etc.).

If your NATS topics are not fully trusted (e.g. shared infrastructure, external publishers), you should:

The type check (instanceof Envelope) happens after deserialization, which is too late to prevent exploitation.
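The timing problem can be reproduced with PHP's native `unserialize()`. In the sketch below the hypothetical `Dangerous` class stands in for a gadget class, and its `__wakeup()` side effect fires before any type check could run. The `allowed_classes` option shown afterwards belongs to native `unserialize()` and is one possible mitigation for a custom serializer; it is not a feature of this package, and whether igbinary offers an equivalent should be verified separately.

```php
<?php
// Stand-in for a class whose magic method has a harmful side effect.
final class Dangerous
{
    public static bool $triggered = false;

    public function __wakeup(): void
    {
        self::$triggered = true;
    }
}

// Imagine this payload arrived from an untrusted publisher.
$payload = serialize(new Dangerous());

// Unrestricted: __wakeup() runs inside unserialize(), before any instanceof check.
$object = unserialize($payload);
$ranBeforeCheck = Dangerous::$triggered; // true

// Restricted: classes outside the allow-list become __PHP_Incomplete_Class
// and their magic methods never run.
Dangerous::$triggered = false;
$safe = unserialize($payload, ['allowed_classes' => [stdClass::class]]);
$ranWithAllowList = Dangerous::$triggered; // false
```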

Stream-Exists Detection During Setup

During setup(), the transport prefers explicit NATS conflict messages (for example "already in use" or "already exists") to detect a pre-existing stream. When NATS returns a generic HTTP 400, the transport now verifies whether the stream actually exists before switching to updateStream(). This avoids misclassifying unrelated bad-request errors as an existing-stream conflict.

If you experience unexpected behavior during stream setup, review the exact error returned by your NATS server version and confirm the stream can be queried via JetStream stream info APIs.
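The detection rule described in this section can be restated as a small decision function. `shouldUpdateExistingStream()` and the `$streamExists` callable are hypothetical names for this sketch; the actual checks live inside the transport's setup().

```php
<?php
/**
 * Decides whether a failed stream creation should be retried as an update.
 * $streamExists is called only when the error alone is inconclusive.
 */
function shouldUpdateExistingStream(string $errorMessage, int $errorCode, callable $streamExists): bool
{
    $message = strtolower($errorMessage);

    // Explicit conflict messages are trusted as-is.
    if (str_contains($message, 'already in use') || str_contains($message, 'already exists')) {
        return true;
    }

    // A generic 400 could mean anything, so confirm the stream is really there.
    if ($errorCode === 400) {
        return (bool) $streamExists();
    }

    return false;
}

$conflict = shouldUpdateExistingStream('stream name already in use', 400, static fn () => false);
$badRequest = shouldUpdateExistingStream('invalid configuration', 400, static fn () => false);
$confirmed = shouldUpdateExistingStream('bad request', 400, static fn () => true);
```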

Publish Response Validation

When JetStream publish acknowledgements are received through the header-aware request path, the transport parses the response as JSON and throws an exception if JetStream reports an error or the response is not valid JSON. This makes proxy or protocol misconfiguration fail closed instead of silently accepting an invalid publish acknowledgement.
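A fail-closed check of this kind might look like the sketch below. `assertPublishAck()` is a hypothetical helper; the response shapes follow the JetStream publish acknowledgement format, where a success carries `stream` and `seq` and a failure carries an `error` object.

```php
<?php
/**
 * Parses a JetStream publish acknowledgement and throws unless it is a valid success.
 */
function assertPublishAck(string $response): array
{
    try {
        $ack = json_decode($response, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        throw new RuntimeException('Publish acknowledgement is not valid JSON.', 0, $e);
    }

    if (!is_array($ack)) {
        throw new RuntimeException('Publish acknowledgement is not a JSON object.');
    }

    if (isset($ack['error'])) {
        $description = is_array($ack['error'])
            ? ($ack['error']['description'] ?? 'unknown error')
            : 'unknown error';
        throw new RuntimeException('JetStream rejected the publish: ' . $description);
    }

    return $ack;
}

$ack = assertPublishAck('{"stream":"events","seq":42}');
```

A plain-text reply such as `+OK`, which a misconfigured proxy or a non-JetStream subject might return, fails the JSON step and is rejected instead of being counted as a successful publish.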

General Recommendations

  1. Authentication

    • Prefer environment variables or explicit options for credentials over hard-coded DSNs
    • If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
    • Store credentials in environment variables
    • Never commit credentials to version control
  2. Message Encryption

    • Encrypt sensitive data before dispatching
    • NATS can be configured with TLS for transit encryption
    • Implement application-level encryption for sensitive payloads
  3. Access Control
    • Restrict stream/consumer creation to authorized users
    • Use NATS access control lists (ACLs) for fine-grained permissions
    • Audit stream operations

Contributing

Contributions are welcome! Please ensure:

Quick Development Workflow

Verification note: This is the exact end-to-end verification sequence required for repository changes and the same sequence used for this task.

License

MIT License - see LICENSE file for details

Support

For issues, questions, or suggestions:

  1. Check the troubleshooting section
  2. Check existing issues on GitHub
  3. Create a new issue with detailed information



All versions of symfony-nats-messenger with dependencies

Package: required version
php: ^8.1
symfony/framework-bundle: ^6.4 || ^7 || ^8
symfony/messenger: ^6.4 || ^7 || ^8
recisio/php-nats-jetstream-client: ^2.4
symfony/uid: ^6.4 || ^7 || ^8
