
Amazon MSK (Managed Kafka)

LocalEmu Amazon MSK implements all 59 operations and pairs the control plane with a real Apache Kafka broker running in Docker. Cluster records, configurations, configuration revisions, SCRAM secret associations, replicators, VPC connections, tags, and the full MSK Connect API surface ride on the moto-ext metadata backend. The behavior plane is the official apache/kafka:3.7.1 image running in KRaft mode (no Zookeeper), started by LocalEmu's cluster manager and exposed on an ephemeral host port. GetBootstrapBrokers returns the live broker address; DeleteCluster stops and removes the container.

Operation-level coverage: see the MSK coverage matrix.

Quick start

# A real Kafka container requires MSK_DOCKER_BACKEND=1 (off by default).
$ MSK_DOCKER_BACKEND=1 localemu start

$ awsemu kafka create-cluster --cluster-name events \
    --kafka-version 3.7.1 --number-of-broker-nodes 1 \
    --broker-node-group-info '{
      "InstanceType":"kafka.m5.large",
      "ClientSubnets":["subnet-1"],
      "SecurityGroups":["sg-1"]
    }' \
    --query 'ClusterArn' --output text
arn:aws:kafka:us-east-1:000000000000:cluster/events/1234abcd-...

$ ARN=$(awsemu kafka list-clusters --cluster-name-filter events \
    --query 'ClusterInfoList[0].ClusterArn' --output text)

# Poll until State is ACTIVE (broker container running, port reachable).
$ awsemu kafka describe-cluster --cluster-arn $ARN \
    --query 'ClusterInfo.{State:State,Brokers:NumberOfBrokerNodes}'
{"State": "ACTIVE", "Brokers": 1}

$ awsemu kafka get-bootstrap-brokers --cluster-arn $ARN \
    --query 'BootstrapBrokerString' --output text
127.0.0.1:54321
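The describe-cluster poll in the quick start can be scripted. The sketch below assumes boto3 as the client; the helper name wait_until_active, its timeout defaults, and the endpoint URL in the usage comment are hypothetical. The describe function is passed in so the loop can be exercised without a running emulator.

```python
import time
from typing import Any, Callable, Dict


def wait_until_active(
    describe_cluster: Callable[..., Dict[str, Any]],
    cluster_arn: str,
    timeout_s: float = 120.0,
    interval_s: float = 2.0,
) -> Dict[str, Any]:
    """Poll DescribeCluster until State is ACTIVE and return the ClusterInfo dict."""
    deadline = time.monotonic() + timeout_s
    while True:
        info = describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
        if info["State"] == "ACTIVE":
            return info
        if info["State"] == "FAILED":
            raise RuntimeError(f"cluster {cluster_arn} entered FAILED")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cluster {cluster_arn} still {info['State']}")
        time.sleep(interval_s)


# Usage sketch (the endpoint URL is a placeholder, not a documented value):
#   import boto3
#   kafka = boto3.client("kafka", endpoint_url="http://localhost:<port>",
#                        region_name="us-east-1")
#   wait_until_active(kafka.describe_cluster, arn)
#   brokers = kafka.get_bootstrap_brokers(ClusterArn=arn)["BootstrapBrokerString"]
```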

The Docker backend is opt-in via MSK_DOCKER_BACKEND=1. Without it, CreateCluster records the cluster in moto only, and GetBootstrapBrokers returns an empty string. With it, LocalEmu pulls apache/kafka:<version> if the image is missing, allocates an ephemeral host port, and boots the container in single-node KRaft mode. The cluster is reported ACTIVE only after the PLAINTEXT listener accepts TCP connections and a further 5 s settle period for KRaft quorum election has elapsed.
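The readiness rule above (TCP accept on the PLAINTEXT port, then a fixed settle delay) can be approximated as follows. This is an illustrative sketch, not the code in cluster_manager.py: the function name and the poll and timeout values are assumptions; only the 5 s settle comes from this page.

```python
import socket
import time


def wait_for_broker(host: str, port: int, timeout_s: float = 60.0,
                    poll_s: float = 0.5, settle_s: float = 5.0) -> bool:
    """Return True once host:port accepts TCP and the settle delay has elapsed."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # A successful connect only proves the listener is bound...
            with socket.create_connection((host, port), timeout=poll_s):
                pass
        except OSError:
            time.sleep(poll_s)
            continue
        # ...so wait a little longer for the KRaft controller quorum to settle.
        time.sleep(settle_s)
        return True
    return False
```

A TCP accept alone does not guarantee the controller has finished electing a leader, which is why a settle delay follows the probe rather than replacing it.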

Produce + consume with kafka-python

$ python3 - <<'PY'
from kafka import KafkaProducer, KafkaConsumer

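# Use the address printed by get-bootstrap-brokers; the host port is ephemeral.
BOOTSTRAP = "127.0.0.1:54321"

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("orders", b'{"order_id":42}')
producer.flush()
producer.close()

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    # Leave room for the consumer-group join before iteration gives up.
    consumer_timeout_ms=10000,
    group_id="g1",
)
for msg in consumer:
    print("Got:", msg.value.decode())
consumer.close()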
PY
Got: {"order_id":42}

Architecture

Code lives at services/kafka/. The provider class KafkaProvider at provider.py:56 is registered as kafka:default in plux.ini:72 via services/providers.py:751. Six operations are LocalEmu-custom:

Everything else (configurations, configuration revisions, V2 cluster ops, replicators, VPC connections, SCRAM secret associations, the full Update* family, tags, policies, and MSK Connect resources) rides on moto unchanged via MotoFallbackDispatcher.
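The split between LocalEmu-custom handlers and MotoFallbackDispatcher amounts to a lookup with a fallback. A minimal sketch: Dispatcher, Handler, and the handler signatures are hypothetical and do not reflect the actual provider.py interfaces. GetBootstrapBrokers serves as the custom example because this page says it returns the live broker address; ListConfigurations serves as the fallthrough because configurations ride on moto.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]
Fallback = Callable[[str, Dict[str, Any]], Dict[str, Any]]


class Dispatcher:
    """Route an MSK operation to a custom handler if one exists, else to the fallback."""

    def __init__(self, custom_handlers: Dict[str, Handler], fallback: Fallback) -> None:
        self._custom = custom_handlers
        self._fallback = fallback

    def dispatch(self, operation: str, request: Dict[str, Any]) -> Dict[str, Any]:
        handler = self._custom.get(operation)
        if handler is not None:
            return handler(request)
        # Anything not overridden goes to the metadata-only backend unchanged.
        return self._fallback(operation, request)
```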

ClusterManager + Kafka driver

The Docker layer lives at services/kafka/docker/cluster_manager.py. ClusterManager at cluster_manager.py:64-82 is a thread-safe singleton holding _clusters: dict[cluster_arn, list[BrokerInfo]]. The driver runs Kafka in KRaft mode (no Zookeeper) by setting:

The image tag follows the KafkaVersion field from CreateCluster: pass 3.7.1 and you get apache/kafka:3.7.1. Any image tag in the apache/kafka repository works as long as it speaks KRaft and accepts the standard KAFKA_* environment variables.
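The page describes ClusterManager as a thread-safe singleton holding _clusters: dict[cluster_arn, list[BrokerInfo]], with the image tag taken straight from KafkaVersion. A minimal sketch of that shape; BrokerInfo's fields, the method names, and image_for are hypothetical, not the contents of cluster_manager.py.

```python
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class BrokerInfo:
    # Hypothetical fields; the real BrokerInfo may carry more or different data.
    broker_id: int
    container_id: str
    host: str
    port: int


def image_for(kafka_version: str) -> str:
    """KafkaVersion from CreateCluster maps directly onto the apache/kafka tag."""
    return f"apache/kafka:{kafka_version}"


class ClusterManager:
    _instance: Optional["ClusterManager"] = None
    _instance_lock = threading.Lock()

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._clusters: Dict[str, List[BrokerInfo]] = {}

    @classmethod
    def get(cls) -> "ClusterManager":
        # Double-checked locking so concurrent requests share one instance.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def register(self, cluster_arn: str, brokers: List[BrokerInfo]) -> None:
        with self._lock:
            self._clusters[cluster_arn] = list(brokers)

    def brokers(self, cluster_arn: str) -> List[BrokerInfo]:
        with self._lock:
            return list(self._clusters.get(cluster_arn, []))

    def remove(self, cluster_arn: str) -> List[BrokerInfo]:
        with self._lock:
            return self._clusters.pop(cluster_arn, [])

    def bootstrap_string(self, cluster_arn: str) -> str:
        # Empty when the cluster has no running brokers (e.g. moto-only mode).
        return ",".join(f"{b.host}:{b.port}" for b in self.brokers(cluster_arn))
```

Returning copies of the broker lists keeps callers from mutating shared state outside the lock.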

Features supported

Feature | Notes
--- | ---
Cluster lifecycle | CreateCluster (Docker pull + boot + readiness check), DeleteCluster (stop + remove), DescribeCluster, ListClusters, ListNodes.
Bootstrap discovery | GetBootstrapBrokers returns the live PLAINTEXT host address.
Wire protocol | Full Kafka wire protocol via the upstream broker: produce, consume, consumer groups, transactions, idempotent producer, exactly-once semantics, log compaction.
Topic management | Use any Kafka admin client (kafka-python, confluent-kafka, kafka-topics.sh) against the bootstrap broker. The MSK API's own CreateTopic, ListTopics, and UpdateTopic calls are moto metadata only and do not touch the broker.
Configurations | CreateConfiguration, UpdateConfiguration, DescribeConfiguration, DescribeConfigurationRevision, ListConfigurations, ListConfigurationRevisions: metadata only (see Known limitations).
Updates | UpdateBrokerCount, UpdateBrokerStorage, UpdateBrokerType, UpdateClusterConfiguration, UpdateClusterKafkaVersion, UpdateConnectivity, UpdateMonitoring, UpdateRebalancing, UpdateSecurity, UpdateStorage: metadata only.
SCRAM secrets | BatchAssociateScramSecret, BatchDisassociateScramSecret, ListScramSecrets: metadata only.
VPC connections | CreateVpcConnection, DeleteVpcConnection, ListVpcConnections, RejectClientVpcConnection: metadata only.
Replicators | CreateReplicator, DescribeReplicator, ListReplicators, UpdateReplicationInfo: metadata only.
Policies | PutClusterPolicy, GetClusterPolicy, DeleteClusterPolicy: metadata only.
Tags | TagResource, UntagResource, ListTagsForResource.
Persistence | moto cluster metadata is restored when LocalEmu restarts with PERSISTENCE=1. Containers are stamped with labels for future rehydration, but the cluster manager does not currently re-read them (see Known limitations).

Endpoint shape

GetBootstrapBrokers populates only the PLAINTEXT field. The TLS, SASL_SCRAM, and SASL_IAM bootstrap-string fields are left empty.

BootstrapBrokerString                  = "127.0.0.1:<host-port>"
BootstrapBrokerStringTls               = ""
BootstrapBrokerStringSaslScram         = ""
BootstrapBrokerStringSaslIam           = ""
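Client code ported from real MSK often prefers the IAM or TLS bootstrap string. Against LocalEmu only BootstrapBrokerString is populated, so a fallback order helps. A sketch operating on the GetBootstrapBrokers response dict as boto3 returns it; the preference order and the function name pick_bootstrap are assumptions.

```python
from typing import Any, Dict, Iterable, Tuple

# (response field, security protocol a Kafka client would need for it)
_PREFERENCE: Tuple[Tuple[str, str], ...] = (
    ("BootstrapBrokerStringSaslIam", "SASL_SSL"),
    ("BootstrapBrokerStringSaslScram", "SASL_SSL"),
    ("BootstrapBrokerStringTls", "SSL"),
    ("BootstrapBrokerString", "PLAINTEXT"),
)


def pick_bootstrap(response: Dict[str, Any],
                   preference: Iterable[Tuple[str, str]] = _PREFERENCE) -> Tuple[str, str]:
    """Return (bootstrap_servers, security_protocol) for the first non-empty field."""
    for field, protocol in preference:
        # Missing and empty fields are treated the same way.
        value = response.get(field) or ""
        if value:
            return value, protocol
    raise ValueError("GetBootstrapBrokers returned no broker strings "
                     "(is MSK_DOCKER_BACKEND=1 set?)")
```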

Reachability:

# Inside a LocalEmu-managed container, 127.0.0.1 is that container, not the host.
# Two ways to reach the broker from Lambda, ECS, or EC2 Docker workloads:
#   1) the broker container's DNS name <localemu-msk-...>:9093 on the localemu bridge network
#   2) host.docker.internal:<host-port> (Docker Desktop)
$ awsemu lambda create-function --function-name kafka-producer \
    --runtime python3.12 --handler index.handler \
    --role arn:aws:iam::000000000000:role/lambda-role \
    --environment 'Variables={KAFKA_BOOTSTRAP=host.docker.internal:54321}' \
    --zip-file fileb://producer.zip

# With AWS CLI v2, add --cli-binary-format raw-in-base64-out to pass raw JSON.
$ awsemu lambda invoke --function-name kafka-producer \
    --payload '{"order_id":42}' /tmp/out.json
{"StatusCode": 200}

ListNodes

$ awsemu kafka list-nodes --cluster-arn $ARN \
    --query 'NodeInfoList[].{Id:BrokerNodeInfo.BrokerId,Eps:BrokerNodeInfo.Endpoints,Type:InstanceType}'
[{
  "Id":   1,
  "Eps":  ["127.0.0.1:54321"],
  "Type": "kafka.m5.large"
}]

Teardown

$ awsemu kafka delete-cluster --cluster-arn $ARN

# DeleteCluster stops the container (10 s grace period), force-removes it,
# and then drops the moto record.
$ awsemu kafka list-clusters --query 'ClusterInfoList[].ClusterName'
[]
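The teardown order described above (stop with a 10 s grace period, force-remove, then drop the moto record) maps onto the Docker SDK for Python roughly as follows. A sketch, assuming docker-py Container objects; delete_cluster_containers and the drop_record callback are hypothetical names.

```python
from typing import Callable, Iterable, Protocol


class ContainerLike(Protocol):
    # Subset of docker.models.containers.Container used here.
    def stop(self, timeout: int = ...) -> None: ...
    def remove(self, force: bool = ...) -> None: ...


def delete_cluster_containers(containers: Iterable[ContainerLike],
                              drop_record: Callable[[], None]) -> None:
    for container in containers:
        container.stop(timeout=10)    # stop signal, then SIGKILL after 10 s
        container.remove(force=True)  # remove even if stop did not finish cleanly
    drop_record()                     # metadata goes last
```

Dropping the metadata last means a failed container removal leaves the cluster record in place rather than orphaning a running broker.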

Configuration

Variable | Default | Purpose
--- | --- | ---
MSK_DOCKER_BACKEND | (unset) | Set to 1 or true to boot a real apache/kafka container per CreateCluster. When unset, clusters are moto-only and GetBootstrapBrokers returns an empty string. Read at provider.py:53.
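A sketch of the flag check, assuming the value is compared case-insensitively after trimming whitespace; the page only says 1 or true, and provider.py:53 may be stricter. The function name is hypothetical.

```python
import os
from typing import Mapping, Optional


def docker_backend_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """True when MSK_DOCKER_BACKEND is "1" or "true"; unset or anything else means moto-only."""
    value = (os.environ if env is None else env).get("MSK_DOCKER_BACKEND", "")
    return value.strip().lower() in {"1", "true"}
```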

Integration points

Service | How it touches MSK
--- | ---
CloudFormation | The AWS::MSK::Cluster resource provider drives CreateCluster through this provider.
Secrets Manager | Common pattern: store SCRAM passwords or admin credentials as secrets and fetch them at deploy time. (The MSK SCRAM-secret association API is metadata-only; the broker still runs PLAINTEXT.)
CloudTrail | All MSK control-plane calls appear in the CloudTrail event store and the dashboard.

Known limitations