Amazon MQ
LocalEmu Amazon MQ implements all 24 operations and pairs them with a real RabbitMQ broker running in Docker on the host. The control plane (broker records, users, configurations, configuration revisions, tags, engine catalog) is served by the moto-ext metadata backend. The behavior plane (the AMQP/MQTT/STOMP/management protocols that your application actually speaks) is the official rabbitmq:3.13-management image, started by LocalEmu's broker manager and exposed on ephemeral host ports. The two planes are kept in sync: DescribeBroker returns endpoints rewritten to the real container ports, and DeleteBroker stops and removes the container.
Operation-level coverage: see the MQ coverage matrix.
Quick start
# Real RabbitMQ container needs MQ_DOCKER_BACKEND=1 (off by default).
$ MQ_DOCKER_BACKEND=1 localemu start
$ awsemu mq create-broker --broker-name orders \
--engine-type RABBITMQ --engine-version 3.13 \
--host-instance-type mq.t3.micro \
--deployment-mode SINGLE_INSTANCE \
--publicly-accessible \
--users '[{"Username":"admin","Password":"supersecret123","Groups":["admin"]}]' \
--query '{Id:BrokerId,Arn:BrokerArn}'
{
"Id": "b-1234abcd-5678-...",
"Arn": "arn:aws:mq:us-east-1:000000000000:broker:orders:b-1234abcd-..."
}
$ awsemu mq describe-broker --broker-id b-1234abcd-... \
--query '{State:BrokerState,Eps:BrokerInstances[0].Endpoints,Console:BrokerInstances[0].ConsoleURL}'
{
"State": "RUNNING",
"Eps": ["amqp://127.0.0.1:54321","amqps://127.0.0.1:54322","mqtt+ssl://127.0.0.1:54323","stomp+ssl://127.0.0.1:54324"],
"Console": "https://127.0.0.1:54325/"
} The Docker backend is opt-in via MQ_DOCKER_BACKEND=1. Without it, CreateBroker records the broker in moto only and you will not have a live AMQP endpoint to connect to. With it on, LocalEmu pulls rabbitmq:3.13-management if missing, allocates host ports for each protocol, boots the container, and waits for the AMQP port to accept TCP connections before returning.
Real AMQP round-trip with pika
$ python3 - <<'PY'
import pika
params = pika.ConnectionParameters(
host="127.0.0.1", port=54321,
credentials=pika.PlainCredentials("admin", "supersecret123"),
)
conn = pika.BlockingConnection(params)
ch = conn.channel()
ch.queue_declare(queue="hello")
ch.basic_publish(exchange="", routing_key="hello", body=b"world")
method, props, body = ch.basic_get(queue="hello", auto_ack=True)
print("Got:", body.decode())
conn.close()
PY
Got: world Architecture
Code lives at services/mq/. The provider class MqProvider at provider.py:54 is registered as mq:default in plux.ini:86 via services/providers.py:798. Five operations are LocalEmu-custom:
- •
CreateBroker(provider.py:65): stores the broker in moto, then, ifMQ_DOCKER_BACKEND=1, hands off toBrokerManager.instance().create_broker()to allocate ports, pull the image, run the container, and wait for readiness. State is set toRUNNINGonly after the AMQP port is reachable. - •
DescribeBroker(provider.py:132): rewrites the endpoint list to point at127.0.0.1:<host-port>for every exposed protocol;BrokerStatereflects the live container status. - •
ListBrokers(provider.py:155): the moto list, with each entry'sBrokerStateoverlaid from the Docker layer. - •
DeleteBroker(provider.py:172): stops the container (10 s timeout), removes it with force, then drops the moto metadata. - •
RebootBroker(provider.py:184): callsDOCKER_CLIENT.restart_container()and waits for AMQP readiness again before returning.
Everything else (users, configurations and revisions, tags, engine catalog, UpdateBroker, Promote) rides on moto unchanged through MotoFallbackDispatcher.
BrokerManager + RabbitMQ driver
The Docker layer lives at services/mq/docker/. BrokerManager at broker_manager.py:89 is a thread-safe singleton holding _brokers: dict[broker_id, BrokerInstance]. The RabbitMQ driver at rabbitmq.py:
- •Uses
rabbitmq:3.13-management(version overridable fromEngineVersionon the broker). - •Allocates an ephemeral host port for each container port:
5672(amqp),5671(amqps),15672(management UI),1883(mqtt),61613(stomp). - •Seeds the admin user from the first entry in
Users[]viaRABBITMQ_DEFAULT_USERandRABBITMQ_DEFAULT_PASS. - •Readiness gate: TCP-connect on the AMQP port + 3 s plugin-chain settle delay (
rabbitmq.py:273-281). The 3 s is empirically the time RabbitMQ needs to finish loading the management plugin after the listener opens. - •Stamps three labels on the container for restart-time rehydration:
localemu.mq.broker-id,localemu.mq.engine,localemu.mq.port.<protocol>.
Features supported
| Feature | Notes |
|---|---|
| Broker lifecycle | CreateBroker (Docker pull + boot + readiness), DeleteBroker (stop + remove), RebootBroker (restart + re-readiness), DescribeBroker, ListBrokers. |
| Protocols exposed | AMQP 0-9-1 + AMQP 1.0, AMQPS (TLS), MQTT, STOMP, and the RabbitMQ management HTTP API. |
| Users | CreateUser, UpdateUser, DeleteUser, DescribeUser, ListUsers against the moto control plane. First-user credentials are seeded into the running broker; later user CRUD does not propagate (see Limitations). |
| Configurations | CreateConfiguration, UpdateConfiguration, DescribeConfiguration, DescribeConfigurationRevision, ListConfigurations, ListConfigurationRevisions. Stored in moto; revision history works; the broker container is not hot-reloaded (see Limitations). |
| Tags | CreateTags, DeleteTags, ListTags on brokers and configurations. |
| Engine catalog | DescribeBrokerEngineTypes, DescribeBrokerInstanceOptions return the AWS-style catalog from moto. |
| Promotion | Promote succeeds at the moto control plane (used for cross-region replication API parity). No replica container is actually promoted. |
| Updates | UpdateBroker updates the moto record; the running container is not modified (see Limitations). |
| Persistence | moto state restored from disk on PERSISTENCE=1 restart; Docker containers persist via Docker itself; BrokerManager rehydrates from container labels (broker_manager.py:315-341). |
Endpoint shape
DescribeBroker rewrites every endpoint to point at the host loopback on the allocated port:
BrokerInstances[0].Endpoints = [
"amqp://127.0.0.1:<amqp>",
"amqps://127.0.0.1:<amqps>",
"stomp+ssl://127.0.0.1:<stomp>",
"mqtt+ssl://127.0.0.1:<mqtt>",
]
BrokerInstances[0].IpAddress = "127.0.0.1"
BrokerInstances[0].ConsoleURL = "https://127.0.0.1:<mgmt>/" Reachability:
- •Host process (your test runner, your local CLI, your IDE): connect directly to
127.0.0.1:<port>. - •LocalEmu gateway (same OS process as the API): same as above.
- •Inside another LocalEmu Docker container (Lambda, ECS task, EC2 docker instance):
127.0.0.1resolves to that container itself, not the host. Substitutehost.docker.internalon Docker Desktop, or the Docker host-bridge IP on Linux.
# Inside a Lambda (or any other LocalEmu Docker container), 127.0.0.1 is
# the container, not the host. Use host.docker.internal instead.
$ awsemu lambda create-function --function-name publish-order \
--runtime python3.12 --handler index.handler \
--role arn:aws:iam::000000000000:role/lambda-role \
--environment 'Variables={MQ_HOST=host.docker.internal,MQ_PORT=54321}' \
--zip-file fileb://publish.zip
$ awsemu lambda invoke --function-name publish-order \
--payload '{"order_id":42}' /tmp/out.json
{"StatusCode": 200}
$ cat /tmp/out.json
{"published": true, "queue": "orders"} RabbitMQ management UI
The -management image includes the RabbitMQ HTTP API and web UI on port 15672 (container-side), exposed by LocalEmu on an ephemeral host port. DescribeBroker reports it as ConsoleURL. Authenticate with the first-user credentials from CreateBroker.
$ CONSOLE=$(awsemu mq describe-broker --broker-id b-1234abcd-... \
--query 'BrokerInstances[0].ConsoleURL' --output text)
$ echo $CONSOLE
https://127.0.0.1:54325/
# RabbitMQ management UI runs on the management port baked into the image.
# Log in with the first-user credentials from CreateBroker.
$ curl -sk -u admin:supersecret123 $CONSOLE/api/queues \
| python3 -c 'import sys,json;[print(q["name"]) for q in json.load(sys.stdin)]'
hello Persistence
With PERSISTENCE=1, moto's mq_backends roundtrips through the standard state save/load path (provider.py:57-60). Docker containers are preserved by Docker itself across LocalEmu process restarts. On the next API call, BrokerManager rehydrates each BrokerInstance from the container labels stamped at create-time. The same broker-id continues to resolve to the same container, and queue+message state inside the broker is whatever RabbitMQ itself persisted.
# With PERSISTENCE=1, the broker metadata reloads from disk on restart.
# The Docker container itself is preserved by Docker, and BrokerManager
# rehydrates the BrokerInstance lazily on the next API call.
$ MQ_DOCKER_BACKEND=1 PERSISTENCE=1 localemu start
$ awsemu mq create-broker --broker-name orders \
--engine-type RABBITMQ --engine-version 3.13 \
--host-instance-type mq.t3.micro --deployment-mode SINGLE_INSTANCE \
--publicly-accessible \
--users '[{"Username":"admin","Password":"hunter2hunter2"}]' >/dev/null
$ localemu restart
$ awsemu mq list-brokers --query 'BrokerSummaries[].{Name:BrokerName,State:BrokerState}'
[{"Name": "orders", "State": "RUNNING"}] Configuration
| Variable | Default | Purpose |
|---|---|---|
MQ_DOCKER_BACKEND | (unset) | Set to 1 or true to boot a real RabbitMQ container per CreateBroker. When unset, brokers are recorded in moto only and no live AMQP endpoint exists. Read at provider.py:51. |
Integration points
| Service | How it touches MQ |
|---|---|
| CloudFormation | AWS::AmazonMQ::Broker resource provider drives CreateBroker through this provider. |
| Secrets Manager | Common pattern: store broker admin password as a secret, fetch via awsemu secretsmanager get-secret-value at deploy time. |
| CloudTrail | All MQ control-plane calls (CreateBroker, DeleteBroker, ...) appear in the CloudTrail event store and are recorded by the dashboard. |
Known limitations
- •ActiveMQ engine is not implemented.
EngineType=ACTIVEMQraisesNotImplementedError("v1 supports RABBITMQ; ActiveMQ is coming next.") atbroker_manager.py:140. Pick RABBITMQ for the live broker; ACTIVEMQ brokers will only exist as moto records. - •Single-instance only.
DeploymentMode=ACTIVE_STANDBY_MULTI_AZandCLUSTER_MULTI_AZare accepted on the API; the Docker driver always runs one instance and reports one entry inBrokerInstances. - •
UpdateBrokeris metadata-only. Mutable fields (instance type, engine version, maintenance window, security groups, logs config) update the moto record; the running container is not recreated or reconfigured. - •Configuration revisions are metadata-only.
CreateConfigurationandUpdateConfigurationkeep a clean revision history, but the broker container is not hot-reloaded with the new XML/JSON. Use the RabbitMQ management API to push runtime config to the live broker. - •User CRUD after CreateBroker does not propagate to the running broker. Only the first user from
CreateBrokeris seeded viaRABBITMQ_DEFAULT_USER/_PASS. LaterCreateUser/UpdateUser/DeleteUsercalls update the moto record only. Manage runtime users through the management API. - •LDAP authentication is metadata-only.
LdapServerMetadatais stored in the broker record; the RabbitMQ container is not configured to use the LDAP backend. - •No EventBridge Pipes integration. There is no RabbitMQ source poller or target sender in
services/pipes/orservices/lambda_/event_source_mapping/pollers/. To consume from a queue inside Lambda, connect to the broker yourself using pika or another AMQP client. - •Cross-container reach uses the host bridge.
DescribeBrokeralways returns127.0.0.1. From inside another LocalEmu container, swap tohost.docker.internal(Docker Desktop) or the host-bridge IP (Linux). - •Containers are not auto-restarted across LocalEmu process restarts. Docker preserves them, but if you
docker stopthe container externally, LocalEmu will not bring it back up;DescribeBrokerwill reflect the stopped state.