Download the PHP package illusiard/yii2-rabbitmq without Composer
On this page you can find all versions of the php package illusiard/yii2-rabbitmq. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Informations about the package yii2-rabbitmq
illusiard/yii2-rabbitmq
Расширение Yii2 RabbitMQ с отложенным соединением, подтверждением публикации и управляемыми handlers c ретраем.
Quickstart
Требования:
- PHP >= 8.1
- Yii2 >= 2.0.55
- RabbitMQ-compatible AMQP broker
Установка:
Пакет имеет тип yii2-extension и регистрирует bootstrap-класс через yii2-composer.
В стандартном Yii2-приложении после установки автоматически доступны:
- компонент
rabbitmq, если он ещё не задан в конфиге приложения; - console module
rabbitmq, поэтому CLI-роуты вида./yii rabbitmq/consumeработают без ручногоcontrollerMap.
Первый Packagist-релиз целится в 0.1.0: API уже пригоден для использования, но minor-релизы до 1.0.0
могут уточнять contracts и namespaces.
Минимальная конфигурация компонента:
Если проект не использует стандартную загрузку Yii2 extensions из Composer, подключите модуль явно:
Для graceful shutdown CLI consumers по SIGTERM/SIGINT нужен ext-pcntl. Без него consumers работают,
но graceful signal handling отключается.
Discovery для CLI:
Структура папок:
Publish:
Semantics / Gotchas
- Publisher confirms:
- Гарантирует, что брокер подтвердил приём публикации на уровне канала (ack/nack).
- НЕ гарантирует доставку consumer-у и успешную обработку сообщения.
- Корреляция подтверждений важна: подтверждения сопоставляются по publish sequence no + message_id.
- mandatory + basic.return:
- Возникает, когда сообщение unroutable (exchange существует, но нет подходящего binding).
- Это отдельный класс ошибки от сетевых/канальных: публикация дошла до брокера, но была возвращена.
- В режиме mandatory без confirm публикация не блокируется ожиданием (без publishTimeout).
- Возвраты обрабатываются асинхронно через return handler и требуют tick() или event loop.
- delivery_mode / persistence:
- Для сохранности при рестарте брокера нужны durable queue + persistent message (
delivery_mode=2). - Пакет не выставляет
delivery_modeавтоматически — задавайте его вpropertiesпри публикации.
- Для сохранности при рестарте брокера нужны durable queue + persistent message (
- Retry semantics:
- Managed retry использует заголовок
x-retry-count;x-deathне является источником истины. x-retry-countувеличивается при republish и ограничиваетсяmaxAttempts.retryExchangeна runtime берётся изtopology.options.retryExchange; значение из retry policy используется только как fallback.- Сообщение уходит в dead при
deadрешении, иначеrejectприводит кbasic_reject(false).
- Managed retry использует заголовок
- DLQ inspect semantics:
dlq-inspectпо умолчанию НЕ удаляет сообщения (basic.get+nack requeue=true).- Разрушающий режим только с
--ack=1 --force=1.
- Consumer fail-fast semantics:
- Fatal исключения приводят к
ConsumeResult::stop()(еслиconsumeFailFast=true). - Recoverable исключения приводят к
ConsumeResult::retry()и включают retry/dead правила.
- Fatal исключения приводят к
Profiles (multi-connection)
Если нужно несколько подключений, используйте profiles и defaultProfile:
В коде:
Connection hardening
TLS/SSL передается в php-amqplib как ssl options для stream context:
Consumer reconnect policy:
Для входящих сообщений можно ограничить тело и глубину JSON:
Ошибки лимитов и диагностика соединения не включают тело сообщения или секреты из DSN/token/password.
Profile (project defaults)
Profile позволяет задать общие defaults для definitions (consumers/publishers) без копипаста опций.
Это отдельная сущность от connection profiles и не обязательна.
Подключение профиля:
Допустимые значения profile: null, FQCN, callable или объект.
Если задан FQCN, используется Yii::createObject().
Пример профиля:
Правила merge:
- scalars: override
- associative arrays: recursive merge (overrides win)
- lists (numeric arrays): concat с уникальностью, defaults -> overrides
- если overrides содержит key => null, то дефолтный ключ удаляется
Consumer definition
Consumer class (definitions):
ID берется из имени класса: OrdersConsumer -> orders, AuditLogConsumer -> audit-log.
Component-level consume config (managedRetry, retryPolicy, consumeMiddlewares, consumeFailFast,
fatalExceptionClasses, recoverableExceptionClasses) является публичным runtime API. Эти значения
сливаются поверх profile defaults и consumer options, поэтому применяются и в CLI, и при прямом запуске
ConsumeRunner.
Publisher definition
Publisher class:
Публикация по id, найденному discovery (OrdersPublisher -> orders):
publishById() использует getExchange(), getRoutingKey(), getOptions(), profile publisher defaults,
component/publisher/runtime publish middlewares и runtime overrides из третьего аргумента.
CLI
Команды:
CLI rabbitmq/consume запускает ConsumeRunner и использует ту же семантику пайплайна (middlewares, ExceptionClassifier, managed retry).
Запуск идёт по consumer-id, полученному из discovery. Если discovery выключен или paths не заданы, CLI вернет ошибку.
Ключевые опции rabbitmq/consume:
--managedRetry=1и--retryPolicy='{"maxAttempts":3,"retryExchange":"retry-exchange","retryQueues":[...],"deadQueue":"..."}'(x-retry-count, retry/dead)--consumeFailFast=0|1,--fatalExceptionClasses=...,--recoverableExceptionClasses=...--memoryLimitMb=...(добавляет memory-limit middleware)--readyLock=/path/to/lock(lock-файл готовности)
Пример:
Список найденных consumers:
Список publishers/middlewares:
Multiple components / --component
Все console-команды поддерживают --component=<id> для выбора нужного компонента RabbitMqService.
Topology
Topology описывает exchanges/queues/bindings и их аргументы. Это не publish‑опции и не retry‑политика.
Topology строится только из topology-конфига компонента. Если включён discovery, сборка дополнительно
проверяет, что очереди всех найденных consumers описаны в topology.
CLI:
topology-apply по умолчанию работает как dry-run: валидирует topology без подключения к брокеру.
--dryRun=0 выполняет реальные declare/bind операции. --strict=1 оставлен как явная проверка перед apply;
сейчас итог не отличается, потому что apply всегда валидирует topology.
Consume semantics
- Handler может вернуть
ConsumeResultили legacybool.true→ ACKfalse→ RETRY (дальше применяется retry policy)
- Exception classification:
consumeFailFast=trueпревращает fatal исключения вConsumeResult::stop()и завершает consumer.- Recoverable исключения приводят к
ConsumeResult::retry()и проходят через retry policy.
- Managed retry публикует retry-сообщения через
topology.options.retryExchange, используетx-retry-countи не опирается наx-death.
Publish semantics
confirm=trueвключает publisher confirms: публикация ждёт ack/nack, при nack или таймауте будетPublishException.mandatory=trueвключает mandatory publish: unroutable сообщения фиксируются через ReturnSink, без блокирующего ожидания.mandatoryStrict=true(по умолчанию) делает unroutable ошибкой приconfirm=true— будетPublishException(PUBLISH_UNROUTABLE).publishTimeoutзадаёт таймаут ожидания подтверждения приconfirm=true.
Возвраты обрабатываются через ReturnSink и доступны через tick() + drainReturns().
В режиме mandatory=true без confirms это best‑effort: ошибки доставки нужно опрашивать.
ReturnSink и tick/poll
При mandatory=true callbacks basic.return снимаются при вызове tick() и попадают в ReturnSink.
Конфигурация:
Псевдо‑loop в publish‑only приложении:
Managed retry
Если managedRetry=true, то при handler=false или ConsumeResult::retry() применяется retry policy и:
- retry — перепубликация в retry-очередь через
topology.options.retryExchangeили fallbackretryPolicy.retryExchange, затем ACK - dead — перепубликация в dead-очередь (если указана), затем ACK
- reject —
basic_reject(false)
maxAttempts обязателен и должен быть положительным integer. Каждая retryQueues[] запись требует
name и ttlMs. При исчерпании попыток x-retry-count записывается как
min(currentAttempt + 1, maxAttempts).
Пример политики:
Если в topology задан options.retryExchange, runtime retry всегда публикует туда, даже если
retryPolicy.retryExchange отличается.
Ready protocol
CLI consumer readiness сигнализируется lock-файлом (stdout не используется).
--readyLock=/path/to/lock создаёт lock-файл при старте и удаляет при остановке.
Без --readyLock путь строится внутри @runtime/rabbitmq. Для пользовательского пути владелец приложения
отвечает за безопасную директорию; пакет отказывается писать в symlink target и symlink directory.
Serialization & Envelope
Можно публиковать типизированные сообщения через Envelope и сериализатор:
Упрощенный JSON publish:
Декодирование в handler:
JSON envelope формат: объект с payload и обязательным messageId (или message_id).
Middlewares
Можно подключить middleware для publish/consume:
Встроенные middleware:
CorrelationIdMiddleware— гарантируетcorrelation_idв свойствах и в Envelope.PublishLoggingMiddleware— логирует отправку без payload/body.ConsumeLoggingMiddleware— логирует start/end и ошибки handler-а без body.
RPC (request/reply)
Client:
Server:
RPC и middleware:
- RPC Client/Server не используют общий publish/consume middleware pipeline.
- Это намеренно: RPC — отдельный протокольный слой с собственным циклом ожидания ответа.
- Расширение: можно кастомизировать handler (callable), сериализацию через Envelope/serializer, и таймауты в RpcClient::call().
Public API
Публичными для SemVer считаются namespaces:
illusiard\rabbitmq\componentsillusiard\rabbitmq\definitionsillusiard\rabbitmq\messageillusiard\rabbitmq\definitions\consumeillusiard\rabbitmq\topologyillusiard\rabbitmq\exceptions- documented console routes under
rabbitmq/*
Остальные namespaces (amqp, orchestration, helpers, часть console, profile) пока являются поддерживаемой
реализацией, но могут уточняться до 1.0.0.
DLQ tools
Просмотр сообщений в DLQ:
По умолчанию dlq-inspect безопасен и не удаляет сообщения (requeue=true).
Разрушающий режим включается только с --ack=1 --force=1.
Повторная отправка после исправления ошибки:
Очистка мусорной очереди:
Troubleshooting / FAQ
Unroutable message (mandatory/basic.return):
- возникает при
mandatory=true, если нет binding для routingKey на exchange - это не сетевой сбой; проверьте topology и логи
PUBLISH_UNROUTABLE
Confirm timeout:
- проверьте
publishTimeout, network latency и нагрузку на брокер - timeout не означает, что сообщение не принято — смотрите confirms и return события
Exchange/queue not declared:
- выполните
rabbitmq/topology-apply --dryRun=0или проверьте provisioning инфраструктуры
Permissions/vhost errors:
- проверьте
vhost, права пользователя и учётные данные
Connection refused / heartbeat timeouts:
- проверьте доступность хоста/порта, firewall, а также
heartbeatи таймауты
Consumer exits on exception:
- fatal исключения завершают процесс (fail-fast), recoverable возвращают
ConsumeResult::retry()и запускают retry/dead - настройте
consumeFailFast,fatalExceptionClasses,recoverableExceptionClasses
Retry loops / attempts:
- managed retry опирается на
x-retry-count, увеличиваемый при republish - убедитесь, что политика соответствует ожидаемому числу попыток
DLQ inspect shows same message:
dlq-inspectrequeue=true, поэтому одно и то же сообщение видно повторно- при одном запуске используйте fingerprint/dedup на стороне клиента, destructive режим только с
--ack=1 --force=1
Consumer restart strategy:
- используйте supervisor/systemd/k8s и внешние healthchecks, чтобы перезапускать consumer при ошибках
Error Codes
- CONFIG_INVALID: неверный формат конфигурации
- CONNECTION_FAILED: ошибка соединения с брокером
- CHANNEL_FAILED: ошибка открытия канала
- PUBLISH_FAILED: ошибка публикации
- PUBLISH_NACK: broker NACK
- PUBLISH_TIMEOUT: таймаут confirms
- PUBLISH_UNROUTABLE: сообщение unroutable
- PUBLISH_UNROUTABLE_UNCORRELATED: unroutable сообщение без корреляции
- CONSUME_FAILED: ошибка consumer pipeline
- HANDLER_FAILED: ошибка handler
- SERIALIZATION_FAILED: ошибка сериализации/десериализации
- MESSAGE_LIMIT_EXCEEDED: входящее сообщение превысило настроенный лимит
- RPC_TIMEOUT: таймаут RPC
- TOPOLOGY_INVALID: неверный формат topology
- DLQ_FAILED: ошибка DLQ операций
Healthcheck
Команда проверяет соединение и канал, возвращает OK или FAIL:
Integration tests
Запуск:
Локальный RabbitMQ (docker compose):
Env для подключения:
RABBIT_HOST(default: localhost)RABBIT_PORT(default: 5672)RABBIT_USER(default: guest)RABBIT_PASSWORD(default: guest)RABBIT_VHOST(default: /)
Env-флаги:
RABBITMQ_REQUIRED=1(делает недоступный RabbitMQ ошибкой, удобно для CI)NACK_CAN_BE_FORCED(confirm NACK path)BLOCK_BROKER(publishTimeout)KILL_CONNECTION(reconnect scenarios)RABBIT_CAN_RESTART(durable/persistent)
All versions of yii2-rabbitmq with dependencies
php-amqplib/php-amqplib Version ^3.5
phpseclib/phpseclib Version ^3.0.52
yiisoft/yii2 Version ^2.0.55
ext-ctype Version *
ext-pcntl Version *