Amazon MSK (Managed Kafka)
LocalEmu Amazon MSK implements all 59 operations and pairs the control plane with a real Apache Kafka broker running in Docker. Cluster records, configurations, configuration revisions, SCRAM secret associations, replicators, VPC connections, tags, and the full MSK Connect API surface ride on the moto-ext metadata backend. The behavior plane is the official apache/kafka:3.7.1 image running in KRaft mode (no Zookeeper), started by LocalEmu's cluster manager and exposed on an ephemeral host port. GetBootstrapBrokers returns the live broker address; DeleteCluster stops and removes the container.
Operation-level coverage: see the MSK coverage matrix.
Quick start
# Real Kafka container needs MSK_DOCKER_BACKEND=1 (off by default).
$ MSK_DOCKER_BACKEND=1 localemu start
$ awsemu kafka create-cluster --cluster-name events \
--kafka-version 3.7.1 --number-of-broker-nodes 1 \
--broker-node-group-info '{
"InstanceType":"kafka.m5.large",
"ClientSubnets":["subnet-1"],
"SecurityGroups":["sg-1"]
}' \
--query 'ClusterArn' --output text
arn:aws:kafka:us-east-1:000000000000:cluster/events/1234abcd-...
$ ARN=$(awsemu kafka list-clusters --cluster-name-filter events \
--query 'ClusterInfoList[0].ClusterArn' --output text)
# Poll until State is ACTIVE (set only once the broker port is reachable).
$ awsemu kafka describe-cluster --cluster-arn $ARN \
--query 'ClusterInfo.{State:State,Brokers:NumberOfBrokerNodes}'
{"State": "ACTIVE", "Brokers": 1}
$ awsemu kafka get-bootstrap-brokers --cluster-arn $ARN \
--query 'BootstrapBrokerString' --output text
127.0.0.1:54321
The Docker backend is opt-in via MSK_DOCKER_BACKEND=1. Without it, CreateCluster records the cluster in moto only and GetBootstrapBrokers returns an empty string. With it on, LocalEmu pulls apache/kafka:<version> if it is missing, allocates an ephemeral host port, boots the container in KRaft single-node mode, waits for the PLAINTEXT listener to accept TCP connections, and then waits a further 5 s for the KRaft quorum election to settle before reporting ACTIVE.
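
Scripts that create clusters need the same poll-until-ACTIVE step as the quick start. A minimal sketch, assuming a boto3-style kafka client (any object with describe_cluster(ClusterArn=...) returning {"ClusterInfo": {"State": ...}}); the helper name, the 180 s default, and the 2 s interval are illustrative choices, not LocalEmu settings. The 180 s default simply leaves headroom over the broker's own 120 s port wait plus 5 s settle.

```python
import time
from typing import Any, Callable


def wait_until_active(
    client: Any,
    cluster_arn: str,
    timeout_s: float = 180.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll DescribeCluster until State is ACTIVE and return that state.

    Hypothetical helper: `client` is assumed to be a boto3 "kafka" client or
    any object exposing describe_cluster(ClusterArn=...).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        info = client.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
        state = info["State"]
        if state == "ACTIVE":
            return state
        if state == "FAILED":
            raise RuntimeError(f"cluster {cluster_arn} entered FAILED state")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cluster {cluster_arn} still {state} after {timeout_s}s")
        sleep(interval_s)  # back off before the next DescribeCluster call
```

With boto3 you would pass something like boto3.client("kafka", endpoint_url=<your LocalEmu endpoint>, region_name="us-east-1"); the endpoint URL depends on your LocalEmu setup and is not specified here.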
Produce + consume with kafka-python
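$ python3 - <<'PY'
from kafka import KafkaProducer, KafkaConsumer
# Use the address returned by get-bootstrap-brokers; the port is ephemeral.
BOOTSTRAP = "127.0.0.1:54321"
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("orders", b'{"order_id":42}')
producer.flush()  # wait until the buffered record has been sent
producer.close()
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",  # a new group starts from the oldest offset
    consumer_timeout_ms=2000,      # stop iterating after 2 s with no messages
    group_id="g1",
)
for msg in consumer:
    print("Got:", msg.value.decode())
consumer.close()
PY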
Got: {"order_id":42}
Architecture
Code lives at services/kafka/. The provider class KafkaProvider at provider.py:56 is registered as kafka:default in plux.ini:72 via services/providers.py:751. Six operations are LocalEmu-custom:
- CreateCluster (provider.py:67): stores the record in moto, then, if MSK_DOCKER_BACKEND=1, hands off to ClusterManager.instance().create_cluster(). State is set to ACTIVE only after the broker port is reachable.
- DescribeCluster (provider.py:113): enriches the moto response with the live State and NumberOfBrokerNodes from the cluster manager.
- ListClusters (provider.py:141): returns the moto list, with each entry's State overlaid.
- DeleteCluster (provider.py:159): stops the container with a 10 s grace period, force-removes it, then drops the moto record.
- GetBootstrapBrokers (provider.py:176): returns "127.0.0.1:<host-port>" from the cluster manager, or an empty string when there is no live cluster.
- ListNodes (provider.py:184): returns one entry per live broker, with the AWS-shaped BrokerNodeInfo.Endpoints populated from the ephemeral host port.
Everything else (configurations, configuration revisions, V2 cluster ops, replicators, VPC connections, SCRAM secret associations, the full Update* family, tags, policies, and MSK Connect resources) rides on moto unchanged via MotoFallbackDispatcher.
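
The split between the six custom handlers and the moto fallback amounts to a lookup table with a default. A minimal sketch of that routing pattern; FallbackRouter and moto_like_fallback are hypothetical names and do not reproduce KafkaProvider or MotoFallbackDispatcher.

```python
from typing import Any, Callable, Dict

Request = Dict[str, Any]
Handler = Callable[[Request], Dict[str, Any]]


class FallbackRouter:
    """Send an operation to a registered custom handler, else to a fallback.

    Hypothetical illustration of the custom-versus-moto split; not LocalEmu code.
    """

    def __init__(self, fallback: Callable[[str, Request], Dict[str, Any]]):
        self._custom: Dict[str, Handler] = {}
        self._fallback = fallback

    def register(self, operation: str, handler: Handler) -> None:
        self._custom[operation] = handler

    def dispatch(self, operation: str, request: Request) -> Dict[str, Any]:
        handler = self._custom.get(operation)
        if handler is not None:
            return handler(request)
        # Anything not overridden goes to the metadata backend unchanged.
        return self._fallback(operation, request)


def moto_like_fallback(operation: str, request: Request) -> Dict[str, Any]:
    # Stand-in for the moto backend: report which path handled the call.
    return {"handled_by": "fallback", "operation": operation}


router = FallbackRouter(moto_like_fallback)
router.register(
    "GetBootstrapBrokers",
    lambda req: {"BootstrapBrokerString": "127.0.0.1:54321"},
)
```

Keeping the override set small means any operation the emulator does not special-case still gets a correctly shaped response from the metadata backend.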
ClusterManager + Kafka driver
The Docker layer lives at services/kafka/docker/cluster_manager.py. ClusterManager at cluster_manager.py:64-82 is a thread-safe singleton holding _clusters: dict[cluster_arn, list[BrokerInfo]]. The driver runs Kafka in KRaft mode (no Zookeeper); its key settings and lifecycle hooks are:
- KAFKA_PROCESS_ROLES=broker,controller (the broker is its own controller).
- KAFKA_CONTROLLER_QUORUM_VOTERS=<broker-id>@localhost:9091.
- Two PLAINTEXT listeners: HOST://0.0.0.0:9092, advertised as 127.0.0.1:<host-port> for host clients, and DOCKER://0.0.0.0:9093, advertised as <container-name>:9093 for clients on the LocalEmu bridge network.
- Replication factors and ISR pinned to 1 (offsets topic, transaction state log, default min ISR) to keep a single-broker cluster healthy.
- Readiness gate: _wait_for_port() at cluster_manager.py:300-319 polls the host port (2 s socket timeout, up to 120 s total), then sleeps 5 s for the KRaft quorum to elect a leader.
- Container labels stamped on creation (cluster_manager.py:256-264): localemu.msk.cluster-id, cluster-arn, broker-id, plain-port, kraft-uuid, kafka-version.
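
The readiness-gate bullet above amounts to "TCP-connect until it works, then give the quorum time to settle." A minimal standard-library sketch; the function name, the signature, and the 0.5 s retry interval are assumptions, while the 2 s / 120 s / 5 s defaults come from the description above rather than from _wait_for_port() itself.

```python
import socket
import time


def wait_for_port(
    host: str,
    port: int,
    connect_timeout_s: float = 2.0,
    total_timeout_s: float = 120.0,
    settle_s: float = 5.0,
    retry_interval_s: float = 0.5,
) -> bool:
    """Return True once host:port accepts a TCP connection, False on timeout.

    After the first successful connect, sleep `settle_s` seconds so the
    single-node KRaft quorum has time to elect its controller.
    """
    deadline = time.monotonic() + total_timeout_s
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=connect_timeout_s):
                pass  # connection accepted; close it immediately
        except OSError:
            time.sleep(retry_interval_s)  # refused or timed out; retry
            continue
        time.sleep(settle_s)
        return True
    return False
```

A bare TCP accept only proves the listener is bound, which is why a fixed settle delay follows; a stricter gate could issue a Kafka metadata request instead.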
The image tag follows the KafkaVersion field from CreateCluster: pass 3.7.1 and you get apache/kafka:3.7.1. Any image tag in the apache/kafka repository works as long as it speaks KRaft and accepts the standard KAFKA_* environment variables.
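
Putting the listener and KRaft settings together with the image-tag rule, the container spec could look roughly like the following. This is a hedged sketch, not the driver's code: the CONTROLLER listener name, the use of DOCKER as the inter-broker listener, the CLUSTER_ID variable, and the dict shape are assumptions. The ports and advertised addresses follow the bullets above, and each KAFKA_* name is the apache/kafka image's environment form of a server.properties key.

```python
from typing import Dict


def kafka_container_spec(
    kafka_version: str,
    broker_id: int,
    host_port: int,
    container_name: str,
    cluster_uuid: str,
) -> Dict[str, object]:
    """Return a hypothetical image + env spec for a single-node KRaft broker."""
    env = {
        # KRaft: this node is both broker and sole controller.
        "KAFKA_NODE_ID": str(broker_id),
        "KAFKA_PROCESS_ROLES": "broker,controller",
        "KAFKA_CONTROLLER_QUORUM_VOTERS": f"{broker_id}@localhost:9091",
        "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
        # HOST serves clients on the host; DOCKER serves the bridge network.
        "KAFKA_LISTENERS": "HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9091",
        "KAFKA_ADVERTISED_LISTENERS": f"HOST://127.0.0.1:{host_port},DOCKER://{container_name}:9093",
        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "HOST:PLAINTEXT,DOCKER:PLAINTEXT,CONTROLLER:PLAINTEXT",
        "KAFKA_INTER_BROKER_LISTENER_NAME": "DOCKER",  # assumption
        # One broker, so every replication factor and ISR floor must be 1.
        "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
        "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
        "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
        "KAFKA_MIN_INSYNC_REPLICAS": "1",
        # Assumed: the KRaft cluster id (cf. the kraft-uuid label) goes in CLUSTER_ID.
        "CLUSTER_ID": cluster_uuid,
    }
    return {
        # Image tag follows the KafkaVersion passed to CreateCluster.
        "image": f"apache/kafka:{kafka_version}",
        # Container port 9092 (HOST listener) published on the ephemeral host port.
        "ports": {"9092/tcp": host_port},
        "environment": env,
    }
```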
Features supported
| Feature | Notes |
|---|---|
| Cluster lifecycle | CreateCluster (Docker pull + boot + readiness), DeleteCluster (stop + remove), DescribeCluster, ListClusters, ListNodes. |
| Bootstrap discovery | GetBootstrapBrokers returns the live PLAINTEXT host address. |
| Wire protocol | Full Kafka wire protocol via the upstream broker: produce, consume, consumer groups, transactions, idempotent producer, exactly-once semantics, log compaction. |
| Topic management | Use any Kafka admin client (kafka-python, confluent-kafka, kafka-topics.sh) against the bootstrap broker. AWS-side CreateTopic/ListTopics/UpdateTopic on the MSK API are metadata in moto only. |
| Configurations | CreateConfiguration, UpdateConfiguration, DescribeConfiguration, DescribeConfigurationRevision, ListConfigurations, ListConfigurationRevisions: metadata only (see Limitations). |
| Updates | UpdateBrokerCount, UpdateBrokerStorage, UpdateBrokerType, UpdateClusterConfiguration, UpdateClusterKafkaVersion, UpdateConnectivity, UpdateMonitoring, UpdateRebalancing, UpdateSecurity, UpdateStorage: metadata only. |
| SCRAM secrets | BatchAssociateScramSecret, BatchDisassociateScramSecret, ListScramSecrets: metadata only. |
| VPC connections | CreateVpcConnection, DeleteVpcConnection, ListVpcConnections, RejectClientVpcConnection: metadata only. |
| Replicators | CreateReplicator, DescribeReplicator, ListReplicators, UpdateReplicationInfo: metadata only. |
| Policies | PutClusterPolicy, GetClusterPolicy, DeleteClusterPolicy: metadata only. |
| Tags | TagResource, UntagResource, ListTagsForResource. |
| Persistence | moto cluster metadata restored on PERSISTENCE=1 restart; container labels stamped for future rehydration but the cluster manager does not currently re-read them (see Limitations). |
Endpoint shape
GetBootstrapBrokers populates only the PLAINTEXT field. The TLS, SASL_SCRAM, and SASL_IAM bootstrap-string fields are left empty.
BootstrapBrokerString = "127.0.0.1:<host-port>"
BootstrapBrokerStringTls = ""
BootstrapBrokerStringSaslScram = ""
BootstrapBrokerStringSaslIam = ""
Reachability:
- Host process (test runner, CLI, IDE): connect to 127.0.0.1:<host-port>.
- LocalEmu gateway: same as above.
- Inside another LocalEmu container (Lambda, ECS task, EC2 docker instance): use either the broker container's DNS name on the LocalEmu bridge (<container-name>:9093) or host.docker.internal:<host-port> on Docker Desktop.
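
The producer.zip deployed in the Lambda example on this page is not shown. A minimal sketch of what its index.py could contain, assuming kafka-python is bundled in the zip and the topic is named orders (both assumptions); the handler reads the bootstrap address from the KAFKA_BOOTSTRAP variable set on the function. The optional producer_factory argument is hypothetical and exists only so the handler can be exercised without a broker.

```python
import json
import os


def handler(event, context, producer_factory=None):
    """Send the invocation payload to the `orders` topic (hypothetical name)."""
    # Inside the Lambda container this must be host.docker.internal:<host-port>
    # or <container-name>:9093, never 127.0.0.1.
    bootstrap = os.environ["KAFKA_BOOTSTRAP"]
    if producer_factory is None:
        from kafka import KafkaProducer  # kafka-python, bundled in producer.zip
        producer_factory = KafkaProducer
    producer = producer_factory(bootstrap_servers=bootstrap)
    try:
        # Block until the broker acknowledges, so failures surface as errors.
        metadata = producer.send("orders", json.dumps(event).encode("utf-8")).get(timeout=10)
    finally:
        producer.close()
    return {"topic": metadata.topic, "partition": metadata.partition, "offset": metadata.offset}
```

Creating the producer per invocation keeps the sketch short; a long-lived function would normally create it once at module scope and reuse it across warm invocations.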
# From inside a LocalEmu Docker container, 127.0.0.1 is the container.
# Two options to reach the broker from inside Lambda/ECS/EC2-docker:
# 1) the container DNS name <localemu-msk-...>:9093 on the localemu bridge
# 2) host.docker.internal:<host-port> (Docker Desktop)
$ awsemu lambda create-function --function-name kafka-producer \
--runtime python3.12 --handler index.handler \
--role arn:aws:iam::000000000000:role/lambda-role \
--environment 'Variables={KAFKA_BOOTSTRAP=host.docker.internal:54321}' \
--zip-file fileb://producer.zip
$ awsemu lambda invoke --function-name kafka-producer \
--payload '{"order_id":42}' /tmp/out.json
{"StatusCode": 200}
ListNodes
$ awsemu kafka list-nodes --cluster-arn $ARN \
--query 'NodeInfoList[].{Id:BrokerNodeInfo.BrokerId,Eps:BrokerNodeInfo.Endpoints,Type:InstanceType}'
[{
"Id": 1,
"Eps": ["127.0.0.1:54321"],
"Type": "kafka.m5.large"
}]
Teardown
$ awsemu kafka delete-cluster --cluster-arn $ARN
# DeleteCluster stops the container (10 s grace) then force-removes it,
# then drops the moto record.
$ awsemu kafka list-clusters --query 'ClusterInfoList[].ClusterName'
[]
Configuration
| Variable | Default | Purpose |
|---|---|---|
| MSK_DOCKER_BACKEND | (unset) | Set to 1 or true to boot a real apache/kafka container per CreateCluster. When unset, clusters are moto-only and GetBootstrapBrokers returns empty. Read at provider.py:53. |
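
An unset MSK_DOCKER_BACKEND shows up at the client only as an empty BootstrapBrokerString, so client code can fail fast with a clearer message. A minimal sketch, assuming a boto3-style get_bootstrap_brokers(ClusterArn=...) response dict; the helper name plaintext_bootstrap is hypothetical.

```python
from typing import Any, Mapping


def plaintext_bootstrap(response: Mapping[str, Any]) -> str:
    """Extract the PLAINTEXT bootstrap string from a GetBootstrapBrokers response.

    LocalEmu fills only BootstrapBrokerString; the TLS and SASL fields stay
    empty. An empty value usually means the Docker backend is off, or the
    cluster manager no longer tracks the cluster (for example after a restart).
    """
    bootstrap = response.get("BootstrapBrokerString") or ""
    if not bootstrap:
        raise RuntimeError(
            "empty BootstrapBrokerString: start LocalEmu with MSK_DOCKER_BACKEND=1 "
            "and create the cluster after that"
        )
    return bootstrap
```

Typical use: bootstrap = plaintext_bootstrap(kafka.get_bootstrap_brokers(ClusterArn=arn)), where kafka is a boto3 kafka client pointed at LocalEmu.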
Integration points
| Service | How it touches MSK |
|---|---|
| CloudFormation | AWS::MSK::Cluster resource provider drives CreateCluster through this provider. |
| Secrets Manager | Common pattern: store SCRAM passwords or admin credentials as secrets, fetch at deploy time. (The MSK SCRAM-secret association API is metadata-only; the broker still runs PLAINTEXT.) |
| CloudTrail | All MSK control-plane calls appear in the CloudTrail event store and the dashboard. |
Known limitations
- Single broker only. NumberOfBrokerNodes > 1 is accepted at the API, but the Docker driver logs a warning and starts one broker (cluster_manager.py:99-104). No multi-AZ, no ISR > 1.
- PLAINTEXT only. ClientAuthentication (TLS / SASL_SCRAM / SASL_IAM / Unauthenticated) is accepted and stored in moto, but the broker container runs only PLAINTEXT listeners. No certificate mount, no SCRAM credential injection, no IAM-token authentication.
- EncryptionInfo is metadata-only. It is stored on the cluster record; messages in transit travel in plaintext over the host loopback.
- Every Update* op is metadata-only. UpdateBrokerCount, UpdateBrokerStorage, UpdateBrokerType, UpdateClusterConfiguration, UpdateClusterKafkaVersion, UpdateConnectivity, UpdateMonitoring, UpdateRebalancing, UpdateSecurity, and UpdateStorage update the moto record; the running container is not rolled, resized, or reconfigured.
- RebootBroker is metadata-only. It falls through to moto; the container is not restarted. Use docker restart, or DeleteCluster followed by CreateCluster.
- Configurations and configuration revisions are metadata-only. They are saved in moto; the broker container is not hot-reloaded with the new server.properties. To exercise non-default broker settings, set them via KAFKA_* environment variables in the driver or use the Kafka admin API.
- MSK as a Lambda Event Source Mapping source is not implemented. Lambda's event-source poller does not yet recognize Kafka-style sources that lack an AWS ARN. Until that lands, consume from Lambda by running a Kafka client (kafka-python, confluent-kafka, or similar) directly against the bootstrap string.
- No EventBridge Pipes integration. There is no MSK source or target in services/pipes/.
- MSK Connect (kafkaconnect service) is not implemented. It is a separate AWS service and is not registered. Run Kafka Connect yourself as a sidecar Docker container if you need connectors.
- CreateClusterV2's serverless-versus-provisioned distinction is metadata-only. The call falls through to moto; both kinds resolve to a single provisioned single-broker container if the Docker backend is on.
- Label-based rehydration after a LocalEmu process restart is not yet wired. The driver stamps the cluster ARN, broker ID, and port on the container as labels, but ClusterManager.get_cluster() reads only the in-memory _clusters dict (cluster_manager.py:173-184). After a process restart, the moto cluster record reloads, but the cluster manager does not re-attach to the still-running container, so GetBootstrapBrokers returns empty until the cluster is recreated.
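
Until Event Source Mappings support MSK, a scheduled or manually invoked Lambda can pull records itself, as the limitation above suggests. A minimal sketch with kafka-python, assuming the topic orders, a consumer group named orders-lambda, and a KAFKA_BOOTSTRAP environment variable like the one in the Lambda example earlier on this page (all illustrative); consumer_factory is a hypothetical hook for testing without a broker.

```python
import json
import os


def handler(event, context, consumer_factory=None):
    """Drain up to `max_records` messages from `orders` and return their payloads."""
    if consumer_factory is None:
        from kafka import KafkaConsumer  # kafka-python, bundled with the function
        consumer_factory = KafkaConsumer
    consumer = consumer_factory(
        "orders",
        bootstrap_servers=os.environ["KAFKA_BOOTSTRAP"],
        group_id="orders-lambda",
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # commit only after records are processed
    )
    try:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}.
        batches = consumer.poll(timeout_ms=5000, max_records=event.get("max_records", 100))
        payloads = [json.loads(rec.value) for records in batches.values() for rec in records]
        if payloads:
            consumer.commit()  # synchronous commit of the consumed offsets
    finally:
        consumer.close()
    return {"count": len(payloads), "records": payloads}
```

A single poll() can come back empty while the consumer group is still joining; if that matters, loop on poll() until a deadline before returning.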