Package symfony-nats-messenger
Short Description Symfony NATS (JetStream) Messenger Bridge with PHP 8.1 compatibility
License MIT
Information about the package symfony-nats-messenger
Symfony NATS Messenger Bridge
A Symfony Messenger transport integration for NATS JetStream, enabling reliable asynchronous messaging with persistent message streaming.
This repository is a recisio-maintained fork intended to keep the transport usable on PHP 8.1 while remaining compatible with Symfony 6.4+.
Features
- High-Performance Messaging - Leverage NATS JetStream for fast, reliable message delivery
- Symfony Integration - Seamless integration with Symfony Messenger
- Configurable Consumers - Support for multiple consumer strategies
- Flexible Batching - Adjustable message batch sizes and timeouts
- Authentication Support - Built-in support for NATS authentication
- Stream Configuration - Configurable retention policies and replication
- Thoroughly Tested - 102 unit tests with ~96% code coverage
Requirements
System Requirements
- PHP: ^8.1
- Symfony: ^6.4 || ^7 || ^8
- NATS Server: ^2.9 with JetStream enabled, ^2.12 for scheduled messages support.
Installation
Install the package with Composer: composer require recisio/symfony-nats-messenger
Operational note: This is the package installation command, not a runtime behavior covered by the library test suite. The installed package is exercised by the unit and functional tests documented below.
Development Setup
For contributors and development:
Verification note: This is the repository verification workflow used for this package. The same command sequence is run when validating documentation and code changes in this repository.
Quick Start
1. Configure NATS Server
Ensure your NATS server has JetStream enabled:
Operational note: Starting nats-server -js is an environment prerequisite rather than package behavior. Automated coverage begins once JetStream is available and the transport is exercised by the functional scenarios below.
2. Set Up Transport in Symfony
Add the NATS transport to your Symfony Messenger configuration:
Tested by: testReadmeDsnExamplesParseSuccessfully[README: quick-start transport], testReadmeConfigurationOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted
3. Configure Custom Serializers (Optional)
By default, the transport uses igbinary serialization for high performance when the extension is available. If ext-igbinary is not installed, it falls back to Symfony's PhpSerializer and emits a notice. You can also customize this explicitly:
Using IgbinarySerializer (Default)
Note: Custom serializer services are resolved by Symfony before transport creation. When no serializer service is provided, the transport instantiates its built-in default serializer and falls back to Symfony's PhpSerializer if ext-igbinary is unavailable.
For example:
or:
Tested by: createTransport_UsesProvidedSerializer, serialize_WithValidEnvelope_ReturnsSerializedString, decode_WithValidEncodedEnvelope_ReturnsEnvelope, testConstructorWithoutIgbinaryDoesNotCrash
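The selection order described above (explicit serializer service first, then igbinary, then the PhpSerializer fallback with a notice) can be sketched as follows. This is only an illustration: pickDefaultSerializer and its return shape are hypothetical names, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not the package API): decides which serializer would be used.
 *
 * @return array{serializer: string, notice: ?string}
 */
function pickDefaultSerializer(?string $customServiceId, bool $igbinaryLoaded): array
{
    if ($customServiceId !== null) {
        // An explicitly configured serializer service always wins.
        return ['serializer' => $customServiceId, 'notice' => null];
    }

    if ($igbinaryLoaded) {
        // Default path: igbinary is available.
        return ['serializer' => 'igbinary', 'notice' => null];
    }

    // Fallback path: report a notice and use PHP's native serialization.
    return [
        'serializer' => 'php',
        'notice' => 'ext-igbinary is not installed; falling back to PhpSerializer',
    ];
}

// Usage: check the real extension state of the current PHP runtime.
$choice = pickDefaultSerializer(null, \extension_loaded('igbinary'));
echo $choice['serializer'], PHP_EOL;
```

Running the usage line on a machine without ext-igbinary is a quick way to see which branch your environment would take.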
Creating Custom Serializers
You can create your own serializer by extending AbstractEnveloperSerializer:
Tested by: readmeCustomSerializerExample_EncodeDecode_RoundTrips, readmeCustomSerializerExample_DecodeInvalidBody_ThrowsException; the exact code above is compiled and exercised via ReadmeExampleSerializer in the unit tests.
For reference implementations, see:
- src/Serializer/IgbinarySerializer.php - Binary serialization
- src/Serializer/AbstractEnveloperSerializer.php - Base class
4. Send Messages
Tested by: testSendPublishesEncodedBodyWithoutHeaders, testSendUsesRequestWithHeadersWhenHeadersArePresent, Behat scenario "Complete message flow - send, check stats, consume, verify"
5. Handle Messages
Tested by: Behat scenarios "Complete message flow - send, check stats, consume, verify", "Send and consume messages with a custom consumer name", and "High-volume message processing with file output verification"; handlers are exercised through real messenger:consume runs.
6. Consume Messages
Tested by: Behat scenarios "Complete message flow - send, check stats, consume, verify", "Send and consume messages with a custom consumer name", and "Partial message consumption with multiple consumers"; the Behat context runs messenger:consume as a Symfony CLI process.
Configuration Guide
DSN Format
Tested by: testBuildWithValidDsnReturnsConfiguration, testBuildWithoutPathThrowsException, testBuildWithoutTopicThrowsException, createTransport_WithValidDsn_ReturnsNatsTransportInstance
Examples:
Tested by: testReadmeDsnExamplesParseSuccessfully; each DSN above is parsed through the configuration builder via a dedicated data provider case.
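As a rough illustration of what DSN parsing involves, the sketch below splits a nats-jetstream:// DSN into its parts. It is not the package's configuration builder. The path layout (/stream/topic), the example stream and topic names, and the function name parseNatsDsn are assumptions made for this sketch; 4222 is the standard NATS port.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative DSN parser (hypothetical, not the package's builder).
 * Assumes the path carries "/<stream>/<topic>" and options live in the query string.
 *
 * @return array{host: string, port: int, stream: string, topic: string, options: array<string, mixed>}
 */
function parseNatsDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || ($parts['scheme'] ?? '') !== 'nats-jetstream') {
        throw new InvalidArgumentException('Expected a nats-jetstream:// DSN.');
    }

    // Split the path into non-empty segments: [stream, topic].
    $segments = array_values(array_filter(
        explode('/', $parts['path'] ?? ''),
        static fn (string $segment): bool => $segment !== ''
    ));
    if (count($segments) < 1) {
        throw new InvalidArgumentException('The DSN has no path (stream name missing).');
    }
    if (count($segments) < 2) {
        throw new InvalidArgumentException('The DSN has no topic.');
    }

    // Query-string options arrive as strings, e.g. batching=5.
    parse_str($parts['query'] ?? '', $options);

    return [
        'host' => $parts['host'] ?? 'localhost',
        'port' => $parts['port'] ?? 4222,
        'stream' => $segments[0],
        'topic' => $segments[1],
        'options' => $options,
    ];
}

$config = parseNatsDsn('nats-jetstream://localhost:4222/orders/created?batching=5&max_batch_timeout=0.5');
echo $config['stream'], ' / ', $config['topic'], PHP_EOL;
```

Failing early on a missing path or topic mirrors the two exception cases named in the test list above.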
Configuration Options
Tested by: testReadmeConfigurationOptionsAreAccepted (all options above), testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted, testReadmeStreamRetentionExamplesAreAccepted, testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions
Retry Handler Behavior
- retry_handler: symfony (default) sends TERM when a message fails during transport decoding or is rejected.
- retry_handler: nats sends NAK when a message fails during transport decoding or is rejected.
Tested by: testRejectUsesTermByDefault, testRejectUsesNakWhenRetryHandlerIsNats, testBuildUsesRetryHandlerFromQuery, Behat scenarios nats_nak.feature and nats_term.feature
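The mapping between the retry_handler option and the acknowledgement sent on rejection can be summarised in a few lines. The function name rejectionAck is hypothetical; only the option values and the TERM/NAK outcomes come from the description above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: which JetStream acknowledgement a rejection produces.
 * 'symfony' (the default) => TERM, 'nats' => NAK.
 */
function rejectionAck(string $retryHandler = 'symfony'): string
{
    return match ($retryHandler) {
        'symfony' => 'TERM', // JetStream stops redelivering this message
        'nats' => 'NAK',     // JetStream is asked to redeliver the message
        default => throw new InvalidArgumentException(
            sprintf('Unknown retry_handler "%s".', $retryHandler)
        ),
    };
}

echo rejectionAck(), PHP_EOL;
echo rejectionAck('nats'), PHP_EOL;
```

In practice this means that with the default handler any retry has to come from Symfony Messenger's own retry strategy, while retry_handler: nats leaves redelivery to JetStream.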
Important: Consumer Strategies
This is critical to understand before setting up multiple transport instances:
Strategy A: Same Consumer, Batching = 1
Use when: Multiple instances should cooperate on the same consumer
Why batching must be 1:
- With explicit acknowledge (ACK) mode, only messages that are explicitly acknowledged are considered processed
- Multiple instances sharing the same consumer need to ACK individually
- Batching > 1 with multiple instances causes delivery conflicts
- Each instance should fetch and ACK one message at a time
Benefits:
- Automatic load balancing across instances
- NATS handles message distribution
- Guaranteed single processing per message
Tested by: testReadmeBatchingExamplesAreAccepted (batching=1), Behat scenario "Partial message consumption with multiple consumers"
Strategy B: Different Consumers, Any Batching
Use when: Each instance needs independent message processing (duplicates allowed)
Why this works:
- Each consumer maintains its own state
- All messages are delivered to all consumers independently
- Each instance can use higher batching for better throughput
- Duplicate processing is expected (fan-out pattern)
Use cases:
- Event broadcasting to multiple systems
- Multiple independent processors
- Audit logging / event replay
Tested by: testReadmeBatchingExamplesAreAccepted (batching=10), Behat scenario "Partial message consumption with multiple consumers"
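The difference between the two strategies can be shown with a small in-memory simulation. It does not talk to NATS. The round-robin distribution in Strategy A is a simplification chosen for the sketch (JetStream hands out messages as instances fetch them), and both function names are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Strategy A: one shared consumer. Each message reaches exactly one instance.
 *
 * @param list<string> $messages
 * @param list<string> $instances
 * @return array<string, list<string>>
 */
function deliverToSharedConsumer(array $messages, array $instances): array
{
    if ($instances === []) {
        throw new InvalidArgumentException('At least one instance is required.');
    }

    $delivered = array_fill_keys($instances, []);
    foreach (array_values($messages) as $index => $message) {
        // Round-robin stands in for JetStream's distribution (illustration only).
        $delivered[$instances[$index % count($instances)]][] = $message;
    }

    return $delivered;
}

/**
 * Strategy B: one consumer per instance. Every consumer sees every message.
 *
 * @param list<string> $messages
 * @param list<string> $consumers
 * @return array<string, list<string>>
 */
function deliverToSeparateConsumers(array $messages, array $consumers): array
{
    return array_fill_keys($consumers, array_values($messages));
}

$messages = ['m1', 'm2', 'm3', 'm4'];
print_r(deliverToSharedConsumer($messages, ['worker-1', 'worker-2']));
print_r(deliverToSeparateConsumers($messages, ['billing', 'audit']));
```

With the shared consumer the four messages are split between the workers; with separate consumers each one receives all four, which is the fan-out behaviour described for Strategy B.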
Batching & Timeouts
Batching Explained
- Higher batching: Better throughput, slightly higher latency
- Lower batching: Lower latency, slightly reduced throughput
- Optimal batching: Depends on message size and processing time
Tested by: testReadmeBatchingExamplesAreAccepted; values 1, 5, 10, 20, 50 are all verified.
Batch Timeout
Controls how long to wait for a batch to fill:
Tested by: testReadmeTimeoutExamplesAreAccepted; values 0.5, 1.0, 2.0 are verified. Behat scenarios: nats_batching.feature.
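A minimal sketch of how batching and max_batch_timeout interact, assuming a fetch returns as soon as the batch is full or the timeout elapses, whichever comes first. The function simulateFetch and the arrival-time model are hypothetical and only simulate the behaviour; no NATS connection is involved.

```php
<?php
declare(strict_types=1);

/**
 * Simulates a single fetch (illustration only).
 *
 * @param list<float> $arrivalTimes seconds after the fetch started, in ascending order
 * @return int number of messages the fetch would return
 */
function simulateFetch(array $arrivalTimes, int $batching, float $maxBatchTimeout): int
{
    $fetched = 0;
    foreach ($arrivalTimes as $arrival) {
        // Stop when the batch is full or the next message arrives after the timeout.
        if ($fetched >= $batching || $arrival > $maxBatchTimeout) {
            break;
        }
        ++$fetched;
    }

    return $fetched;
}

// Ten messages arrive within 0.1s: the full batch is returned.
echo simulateFetch([0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1], 10, 0.5), PHP_EOL;
// Only three messages arrive before the 0.5s timeout: those three are returned.
echo simulateFetch([0.1, 0.2, 0.3, 0.9], 10, 0.5), PHP_EOL;
```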
Example scenarios:
- If you set batching: 10 and max_batch_timeout: 0.5:
  - If 10 messages arrive quickly, all are fetched immediately
  - If only 3 messages arrive in 0.5s, those 3 are returned
  - The fetch does not wait forever for the batch to fill
Connection Timeout
Controls the socket-level I/O timeout for all NATS operations:
Tested by: testReadmeTimeoutExamplesAreAccepted (1.0, 2.0, 3.0), testBuildWithConnectionTimeoutPropagatesMs
Purpose:
- Sets the timeout for socket read/write operations
- Affects all NATS communication (publish, subscribe, ack, etc.)
- Lower values fail faster on network issues
- Higher values tolerate slower networks
When to adjust:
- Increase for high-latency networks or geographically distant NATS servers
- Decrease for faster failure detection in local environments
- Default of 1 second works well for most local/regional deployments
Stream Configuration
Retention Policies
Control how long messages are kept in the stream:
Tested by: testReadmeStreamRetentionExamplesAreAccepted; all retention options above are verified. Behat scenarios: nats_stream_limits.feature.
Note: stream_max_messages limits the total number of messages stored in the stream (maps to NATS max_msgs), while stream_max_messages_per_subject limits messages retained per individual subject (maps to NATS max_msgs_per_subject). The per-subject limit is especially useful with multi-subject streams to prevent one high-volume subject from dominating retention.
High Availability
Tested by: testReadmeStreamRetentionExamplesAreAccepted (replicas 1 and 3), testSetupPassesConfiguredStreamOptions
Testing
Unit Tests
Verification note: This block documents the supported contributor workflow. The same composer test and composer test:unit commands are used to verify changes in this repository.
The target is at least 90% code coverage.
What's tested:
- DSN parsing and validation
- Configuration option handling
- Authentication support
- Port configuration
- Error handling
- Interface compliance
Functional Tests
Functional tests require a running NATS server with JetStream enabled:
Verification note: This is the scripted functional test workflow used for the transport's end-to-end verification.
Manual approach:
Operational note: This manual Docker/Behat flow mirrors the scripted functional commands above and is not asserted separately by the package tests.
What's tested:
- Message publishing
- Message consumption
- Message acknowledgment
- Consumer setup
- Stream persistence
See also: tests/functional/README.md
Advanced Usage
Multiple Transports
Set up multiple independent transports for different use cases:
Tested by: testReadmeDsnExamplesParseSuccessfully[README: fast transport], testReadmeDsnExamplesParseSuccessfully[README: bulk transport], testReadmeDsnExamplesParseSuccessfully[README: audit transport], testReadmeAuditTransportOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted
Multi-Subject Streams
Multiple transports can share the same NATS stream with different subjects. When messenger:setup-transports runs, each transport adds its subject to the existing stream rather than overwriting it:
The events stream will have both orders and payments as subjects.
Tested by: testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders], testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments], testReadmeMultiSubjectOptionsAreAccepted, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenario "Setup command merges subjects for transports sharing one stream"
Note: When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.
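The merge behaviour for an existing stream can be sketched with plain arrays. The field names subjects, storage and max_msgs follow the JetStream stream configuration; the function names and the exact overlay order are assumptions for illustration, not the transport's actual implementation.

```php
<?php
declare(strict_types=1);

/**
 * Merges subject lists, keeping existing entries first and dropping duplicates.
 *
 * @param list<string> $existing
 * @param list<string> $new
 * @return list<string>
 */
function mergeSubjects(array $existing, array $new): array
{
    return array_values(array_unique(array_merge($existing, $new)));
}

/**
 * Hypothetical sketch of building the update payload for an existing stream.
 *
 * @param array<string, mixed> $current     configuration read from JetStream
 * @param array<string, mixed> $managed     settings managed by the transport
 * @param list<string>         $newSubjects subjects this transport needs
 * @return array<string, mixed>
 */
function buildUpdatedStreamConfig(array $current, array $managed, array $newSubjects): array
{
    // Managed settings overlay the current server-side configuration.
    $updated = array_merge($current, $managed);
    $updated['subjects'] = mergeSubjects($current['subjects'] ?? [], $newSubjects);

    // The storage backend of an already-created stream is kept as-is.
    if (isset($current['storage'])) {
        $updated['storage'] = $current['storage'];
    }

    return $updated;
}

print_r(buildUpdatedStreamConfig(
    ['subjects' => ['orders'], 'storage' => 'file', 'max_msgs' => 100],
    ['storage' => 'memory', 'max_msgs' => 500],
    ['payments', 'orders']
));
```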
Setup on Initialization
Automatically create streams and consumers on first run:
Then run the setup command, messenger:setup-transports. This will:
- Create the stream with configured settings
- Create the consumer with explicit ACK policy
- Verify consumer creation
Tested by: testSetupCreatesStreamAndConsumer, testSetupPassesConfiguredStreamOptions, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenarios "Setup NATS stream with max age configuration", "Setup command handles existing streams gracefully", and "Custom consumer name is registered in JetStream"
Delayed / Scheduled Messages
Requires NATS Server >= 2.12 with JetStream enabled.
Enable scheduled_messages in the DSN to use Symfony's DelayStamp for deferred delivery:
Then dispatch messages with a delay:
Tested by: testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders, testReadmeScheduledMessagesDsnEnablesFeature, Behat scenario "Delayed messages are delivered after the scheduled time"
When scheduled_messages is enabled and a DelayStamp is present:
- The message is published to a {topic}.delayed.{uuid} subject with Nats-Schedule and Nats-Schedule-Target headers
- The stream is created with an additional {topic}.delayed.> subject and allow_msg_schedules enabled
- NATS holds the message and delivers it to the original topic at the scheduled time
- The consumer processes it like any other message
When scheduled_messages is disabled (the default), any DelayStamp on the envelope is silently ignored and messages are published immediately.
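To make the scheduled publish path for DelayStamp messages more concrete, the sketch below builds the delayed subject and the two headers for a given delay. The subject pattern and header names come from this README. The "@at <timestamp>" value format, the rounding to whole seconds, and the function name buildDelayedPublish are assumptions for illustration and may differ from what the transport actually sends.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: subject and headers for a delayed publish.
 *
 * @param int $delayMs delay in milliseconds, as carried by Symfony's DelayStamp
 * @return array{subject: string, headers: array<string, string>}
 */
function buildDelayedPublish(string $topic, int $delayMs, DateTimeImmutable $now, string $uuid): array
{
    // Round the delay up to whole seconds and express the target time in UTC.
    $deliverAtTimestamp = $now->getTimestamp() + intdiv($delayMs + 999, 1000);
    $deliverAt = (new DateTimeImmutable('@' . $deliverAtTimestamp))->format('Y-m-d\TH:i:s\Z');

    return [
        'subject' => sprintf('%s.delayed.%s', $topic, $uuid),
        'headers' => [
            // Assumed value format; check the NATS scheduling documentation.
            'Nats-Schedule' => '@at ' . $deliverAt,
            // The message is delivered to the original topic at the scheduled time.
            'Nats-Schedule-Target' => $topic,
        ],
    ];
}

$publish = buildDelayedPublish('orders', 5000, new DateTimeImmutable('2024-01-01T00:00:00Z'), 'abc');
print_r($publish);
```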
Stream Monitoring
View stream and consumer information:
Operational note: These are NATS CLI inspection commands, so this package does not assert their exact textual output directly. The underlying stream, consumer, and message-count state is covered by Behat scenarios "Setup NATS stream with max age configuration", "Custom consumer name is registered in JetStream", "Complete message flow - send, check stats, consume, verify", and the getMessageCount() unit tests.
Manual Message Operations
Tested by: testGetMessageCountReturnsConsumerPendingMessages, testGetMessageCountFallsBackToStreamState, testGetMessageCountReturnsZeroWhenLookupsFail, testGetMessageCountReturnsAckPendingWhenHigherThanPending
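The test names above suggest a fallback chain for the message count: consumer state first, stream state second, zero when both lookups fail. The sketch below is a guess at that logic inferred from the test names alone; estimateMessageCount is a hypothetical function, and num_pending, num_ack_pending and messages are the corresponding JetStream info fields.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical reconstruction of the message-count fallback (inferred, not the package code).
 *
 * @param array{num_pending?: int, num_ack_pending?: int}|null $consumerInfo null when the consumer lookup failed
 * @param array{messages?: int}|null                           $streamState  null when the stream lookup failed
 */
function estimateMessageCount(?array $consumerInfo, ?array $streamState): int
{
    if ($consumerInfo !== null) {
        // Use whichever is larger: undelivered messages or delivered-but-unacknowledged ones.
        return max($consumerInfo['num_pending'] ?? 0, $consumerInfo['num_ack_pending'] ?? 0);
    }

    if ($streamState !== null) {
        // Fall back to the total number of messages stored in the stream.
        return $streamState['messages'] ?? 0;
    }

    // Both lookups failed: report an empty queue rather than throwing.
    return 0;
}

echo estimateMessageCount(['num_pending' => 4, 'num_ack_pending' => 7], null), PHP_EOL;
```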
Troubleshooting
Connection Issues
Error: "Connection refused"
Tested by: Behat scenario "Setup command fails gracefully when NATS is unavailable"
Error: "Stream not found"
Tested by: testSetupCreatesStreamAndConsumer, testSetupUpdatesStreamWhenItAlreadyExists, Behat scenarios "Setup NATS stream with max age configuration" and "Setup command handles existing streams gracefully"
Message Processing Issues
Messages not being consumed
Operational note: These are manual diagnosis commands. The actual consume path is covered by Behat scenarios "Complete message flow - send, check stats, consume, verify", "Send and consume messages with a custom consumer name", and "Partial message consumption with multiple consumers".
Messages stuck in pending
Operational note: This checklist is manual guidance. Pending-count behavior is covered by testGetMessageCountReturnsConsumerPendingMessages, testGetMessageCountFallsBackToStreamState, testGetMessageCountReturnsAckPendingWhenHigherThanPending, and Behat scenario "Complete message flow - send, check stats, consume, verify".
Architecture
The bridge consists of two main components:
NatsTransportFactory
- Handles DSN scheme detection (nats-jetstream://)
- Creates NatsTransport instances
- Validates configuration
NatsTransport
- Implements Symfony's TransportInterface
- Manages stream and consumer connections
- Handles message serialization (igbinary)
- Supports batching and explicit acknowledgment
Performance Tips
- Choose appropriate batching
  - Start with batching: 5 for balanced performance
  - Increase to 20+ for high throughput workloads
  - Use 1 for strict low-latency requirements
- Set reasonable timeouts
  - max_batch_timeout: 0.5 for responsive systems
  - max_batch_timeout: 2.0 for background jobs
  - connection_timeout: 1.0 for local/regional deployments
  - connection_timeout: 3.0+ for cross-region or high-latency networks
- Use appropriate replicas
  - stream_replicas: 1 for development
  - stream_replicas: 3 for production
- Monitor performance
  - Use getMessageCount() to track queue depth
  - Monitor handler execution time
  - Watch for stuck messages
Security Considerations
Deserialization of Untrusted Data
The default IgbinarySerializer (and any serializer extending AbstractEnveloperSerializer) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a well-known attack vector: a crafted payload can trigger arbitrary code execution via magic methods (__wakeup, __destruct, etc.).
If your NATS topics are not fully trusted (e.g. shared infrastructure, external publishers), you should:
- Implement a custom serializer that uses a safe format (JSON, Protobuf) instead of PHP object serialization
- Add message-level authentication (e.g. HMAC signatures) to verify publisher identity before deserializing
- Restrict NATS topic publish permissions via ACLs so only trusted services can publish
The type check (instanceof Envelope) happens after deserialization, which is too late to prevent exploitation.
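One way to combine the first two recommendations is to sign a JSON body with an HMAC and verify the signature before decoding anything. The sketch below shows only that idea with plain PHP functions. It is not wired into AbstractEnveloperSerializer; the function names, the "signature.body" layout and the secret handling are all assumptions.

```php
<?php
declare(strict_types=1);

/**
 * Encodes data as JSON and prefixes it with an HMAC-SHA256 signature.
 *
 * @param array<string, mixed> $data
 */
function signPayload(array $data, string $secret): string
{
    $body = json_encode($data, JSON_THROW_ON_ERROR);

    return hash_hmac('sha256', $body, $secret) . '.' . $body;
}

/**
 * Verifies the signature first and only then decodes the JSON body.
 *
 * @return array<string, mixed>
 */
function verifyAndDecode(string $message, string $secret): array
{
    // The hex signature never contains a dot, so the first dot separates it from the body.
    [$signature, $body] = array_pad(explode('.', $message, 2), 2, '');

    // Constant-time comparison avoids leaking how many characters matched.
    if (!hash_equals(hash_hmac('sha256', $body, $secret), $signature)) {
        throw new RuntimeException('Signature mismatch; payload rejected before decoding.');
    }

    $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($decoded)) {
        throw new RuntimeException('Unexpected payload shape.');
    }

    return $decoded;
}

$secret = 'replace-with-a-secret-from-your-environment'; // placeholder value
$message = signPayload(['orderId' => 42], $secret);
print_r(verifyAndDecode($message, $secret));
```

Because the body is JSON rather than serialized PHP, a forged message can at worst produce unexpected array data; it cannot instantiate objects.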
Stream-Exists Detection During Setup
During setup(), the transport prefers explicit NATS conflict messages (for example "already in use" or "already exists") to detect a pre-existing stream. When NATS returns a generic HTTP 400, the transport now verifies whether the stream actually exists before switching to updateStream(). This avoids misclassifying unrelated bad-request errors as an existing-stream conflict.
If you experience unexpected behavior during stream setup, review the exact error returned by your NATS server version and confirm the stream can be queried via JetStream stream info APIs.
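The detection order described above can be sketched as a small decision function. The function name shouldUpdateExistingStream and the callable used for the existence check are hypothetical; the two message fragments and the HTTP 400 case come from the description in this section.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: decide whether a failed stream creation should switch to an update.
 *
 * @param callable(): bool $streamExists queries JetStream stream info (stubbed in this sketch)
 */
function shouldUpdateExistingStream(string $errorMessage, int $statusCode, callable $streamExists): bool
{
    $normalized = strtolower($errorMessage);

    // Explicit conflict messages are trusted directly.
    if (str_contains($normalized, 'already in use') || str_contains($normalized, 'already exists')) {
        return true;
    }

    // A generic 400 is ambiguous, so confirm that the stream really exists.
    if ($statusCode === 400) {
        return $streamExists();
    }

    // Anything else is an unrelated error and should surface to the caller.
    return false;
}

var_dump(shouldUpdateExistingStream('stream name already in use', 400, static fn (): bool => false));
var_dump(shouldUpdateExistingStream('bad request', 400, static fn (): bool => false));
```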
Publish Response Validation
When JetStream publish acknowledgements are received through the header-aware request path, the transport parses the response as JSON and throws an exception if JetStream reports an error or the response is not valid JSON. This makes proxy or protocol misconfiguration fail closed instead of silently accepting an invalid publish acknowledgement.
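A fail-closed check of a publish acknowledgement might look like the sketch below. The function name assertPublishAcknowledged is hypothetical and the exception types are chosen for the example. The response shapes (an object with stream and seq on success, an error object with a description on failure) follow the JetStream publish acknowledgement format.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: validate a raw JetStream publish acknowledgement.
 *
 * @return array<string, mixed> the decoded acknowledgement
 */
function assertPublishAcknowledged(string $rawResponse): array
{
    try {
        $ack = json_decode($rawResponse, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $exception) {
        // A proxy error page or other non-JSON body must not count as success.
        throw new RuntimeException('Publish acknowledgement is not valid JSON.', 0, $exception);
    }

    if (!is_array($ack)) {
        throw new RuntimeException('Publish acknowledgement has an unexpected shape.');
    }

    if (isset($ack['error'])) {
        $description = is_array($ack['error'])
            ? (string) ($ack['error']['description'] ?? 'unknown error')
            : 'unknown error';
        throw new RuntimeException('JetStream rejected the publish: ' . $description);
    }

    return $ack;
}

print_r(assertPublishAcknowledged('{"stream":"orders","seq":7}'));
```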
General Recommendations
- Authentication
  - Prefer environment variables or explicit options for credentials over hard-coded DSNs
  - If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
  - Store credentials in environment variables
  - Never commit credentials to version control
- Message Encryption
  - Encrypt sensitive data before dispatching
  - NATS can be configured with TLS for transit encryption
  - Implement application-level encryption for sensitive payloads
- Access Control
  - Restrict stream/consumer creation to authorized users
  - Use NATS access control lists (ACLs) for fine-grained permissions
  - Audit stream operations
Contributing
Contributions are welcome! Please ensure:
- Every modification runs the relevant verification commands before it is considered done
- Minimum verification for PHP changes: composer test
- All tests pass: composer test:unit
- Code coverage remains above 90%
- New features include corresponding tests
- Documentation is updated
- Functional tests pass: composer test:functional (if applicable)
- docs/TESTS.md is kept up to date when tests are added, removed, or renamed
- Each release has an entry in docs/CHANGELOG.md following Keep a Changelog format
- When a PR is merged or its features are adapted, a description is added to docs/PRs/
Quick Development Workflow
Verification note: This is the exact end-to-end verification sequence required for repository changes.
License
MIT License - see LICENSE file for details
Support
For issues, questions, or suggestions:
- Check the troubleshooting section
- Check existing issues on GitHub
- Create a new issue with detailed information
Love the project? Support it!
- BTC: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
- ETH: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
- TRX: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
- LTC: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB
- Buy me a coffee: https://buymeacoffee.com/idct
- Sponsor: https://github.com/sponsors/ideaconnect
All versions of symfony-nats-messenger with dependencies
symfony/framework-bundle Version ^6.4 || ^7 || ^8
symfony/messenger Version ^6.4 || ^7 || ^8
recisio/php-nats-jetstream-client Version ^2.4
symfony/uid Version ^6.4 || ^7 || ^8