Kinesis
LocalEmu Kinesis Data Streams implements all 39 operations. The bulk of the stream and shard machinery (CreateStream, ListShards, MergeShards, SplitShard, GetShardIterator, GetRecords, sequence numbers, consumer registration, encryption settings) is served by the moto-ext metadata backend, which holds streams, shards, and records in-process. On top of that, six operations have LocalEmu-custom code: PutRecord and PutRecords gate on KINESIS_ERROR_PROBABILITY for fault injection; SubscribeToShard is a real Python event-stream generator that drives Enhanced Fan-Out consumers; and the resource-policy trio (Put, Get, Delete) is fully LocalEmu-side.
Operation-level coverage: see the Kinesis coverage matrix.
Quick start
$ awsemu kinesis create-stream --stream-name events --shard-count 2
$ awsemu kinesis describe-stream --stream-name events \
--query 'StreamDescription.{Status:StreamStatus,Shards:Shards[].ShardId}' --output json
{
"Status": "ACTIVE",
"Shards": ["shardId-000000000000", "shardId-000000000001"]
}
$ awsemu kinesis put-record --stream-name events \
--partition-key user-42 \
--data "$(printf '{\"event\":\"login\"}' | base64)" \
--query '{ShardId:ShardId,SequenceNumber:SequenceNumber}'
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49640..."
}
# Read it back via a shard iterator
$ IT=$(awsemu kinesis get-shard-iterator --stream-name events \
--shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
--query ShardIterator --output text)
$ awsemu kinesis get-records --shard-iterator "$IT" \
--query 'Records[].{Seq:SequenceNumber,Data:Data,PK:PartitionKey}' --output json
[{
"Seq": "49640...",
"Data": "eyJldmVudCI6ImxvZ2luIn0=",
"PK": "user-42"
}]
Records are routed to shards by hashing the PartitionKey (or by the ExplicitHashKey when one is supplied). Sequence numbers are monotonic per shard. Both behaviours are inherited from the moto-ext backend; LocalEmu does not override them.
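The routing rule can be illustrated with a short sketch. AWS documents that Kinesis maps a partition key to a 128-bit integer with MD5 and picks the shard whose hash-key range contains it. The helper names below (even_hash_ranges, hash_key_for, shard_index_for) are hypothetical, and the evenly split ranges are an assumption about a freshly created stream, not a description of the moto-ext code.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # upper bound of the Kinesis hash-key space


def even_hash_ranges(shard_count):
    """Split [0, 2**128 - 1] into contiguous ranges, one per shard (assumed layout)."""
    size = 2**128 // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last shard absorbs any remainder so the ranges cover the whole space.
        end = MAX_HASH_KEY if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def hash_key_for(partition_key, explicit_hash_key=None):
    """Return the 128-bit hash key; an ExplicitHashKey bypasses the MD5 step."""
    if explicit_hash_key is not None:
        return int(explicit_hash_key)
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index_for(partition_key, ranges, explicit_hash_key=None):
    """Find the index of the shard whose range contains the record's hash key."""
    key = hash_key_for(partition_key, explicit_hash_key)
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key outside every shard range")


if __name__ == "__main__":
    ranges = even_hash_ranges(2)
    print(shard_index_for("user-42", ranges))
```

The same partition key always lands on the same shard, which is why per-key ordering holds as long as the shard layout does not change.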
Architecture
Code lives at services/kinesis/. The provider class KinesisProvider at provider.py:102 uses the LocalEmu forwarding-fallback pattern:
- 6 custom methods handle their request inline.
- 33 moto-delegated methods raise NotImplementedError; the ForwardingFallbackDispatcher (aws/forwarder.py:106-123) catches that and routes the call to moto's Kinesis backend.
- Custom state lives in KinesisStore at models.py:18-28: resource policies per stream, registered consumers list, enhanced-metrics flags. Accessed via the global kinesis_stores AccountRegionBundle (models.py:30).
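A minimal sketch of the forwarding-fallback idea follows. It is not the LocalEmu dispatcher: FallbackDispatcher, DemoProvider, and demo_backend are hypothetical names, and the sketch only shows the control flow of "try the provider, forward on NotImplementedError".

```python
class FallbackDispatcher:
    """Hypothetical stand-in: try the provider first, forward on NotImplementedError."""

    def __init__(self, provider, fallback):
        self.provider = provider
        self.fallback = fallback  # callable(operation, request) -> response

    def dispatch(self, operation, request):
        handler = getattr(self.provider, operation, None)
        if handler is not None:
            try:
                return handler(request)
            except NotImplementedError:
                pass  # delegated method: fall through to the backend
        return self.fallback(operation, request)


class DemoProvider:
    """Illustrative provider with one custom method and one delegated method."""

    def __init__(self):
        self.policies = {}

    def put_resource_policy(self, request):
        # Custom method: handled inline against provider-owned state.
        self.policies[request["ResourceARN"]] = request["Policy"]
        return {}

    def create_stream(self, request):
        # Delegated method: signals that the backend should serve the call.
        raise NotImplementedError


def demo_backend(operation, request):
    """Stands in for the metadata backend that serves delegated calls."""
    return {"served_by": "backend", "operation": operation}


if __name__ == "__main__":
    dispatcher = FallbackDispatcher(DemoProvider(), demo_backend)
    print(dispatcher.dispatch("create_stream", {"StreamName": "events"}))
```

One trade-off of this pattern: a NotImplementedError raised by accident deep inside a custom handler would also be forwarded, so custom methods should avoid raising it for anything other than delegation.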
Concurrency and thread safety:
- Fault injection draws from SystemRandom() (provider.py:50-54) rather than the stdlib singleton, so concurrent PutRecord calls under high load see consistent error rates.
- The consumer-ARN to stream-name resolver is cached in a bounded dict (4096 entries, FIFO eviction) guarded by a threading.Lock at provider.py:68-94. Without it, every SubscribeToShard call would scan all streams across all accounts/regions.
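The bounded, lock-guarded cache can be sketched as below. This is an illustration under assumptions, not the provider code: BoundedFifoCache and resolve_stream_name are hypothetical names, and the slow_lookup callable stands in for the scan across streams.

```python
import threading
from collections import OrderedDict


class BoundedFifoCache:
    """Thread-safe mapping that evicts the oldest inserted key once full."""

    def __init__(self, max_entries=4096):
        self._max_entries = max_entries
        self._data = OrderedDict()
        self._lock = threading.Lock()

    def get(self, key):
        with self._lock:
            return self._data.get(key)

    def put(self, key, value):
        with self._lock:
            if key not in self._data and len(self._data) >= self._max_entries:
                self._data.popitem(last=False)  # FIFO: drop the oldest insertion
            # Overwriting an existing key keeps its original position (not LRU).
            self._data[key] = value

    def __len__(self):
        with self._lock:
            return len(self._data)


def resolve_stream_name(consumer_arn, cache, slow_lookup):
    """Return the cached stream name, falling back to the expensive lookup."""
    cached = cache.get(consumer_arn)
    if cached is not None:
        return cached
    name = slow_lookup(consumer_arn)
    if name is not None:
        cache.put(consumer_arn, name)
    return name
```

FIFO eviction keeps the bookkeeping trivial; it gives up the hit-rate advantage of LRU, which matters little when the working set of consumer ARNs is far below the bound.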
Persistence: kinesis_stores and the moto backend are both included in the standard state save/load path. With PERSISTENCE=1, streams, shards, records, sequence numbers, consumer registrations, and resource policies all survive restart. In-flight SubscribeToShard generators do not survive (the consumer is expected to reconnect after the 300-second subscription window expires anyway).
Features supported
| Feature | Notes |
|---|---|
| Stream lifecycle | CreateStream, DeleteStream, DescribeStream, DescribeStreamSummary, ListStreams, UpdateStreamMode, UpdateShardCount. |
| Shard operations | ListShards, MergeShards, SplitShard, GetShardIterator (TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER). |
| Records | PutRecord, PutRecords, GetRecords. Records preserve insertion order within a shard. |
| Fault injection | KINESIS_ERROR_PROBABILITY (0.0-1.0) makes PutRecord raise ProvisionedThroughputExceededException and PutRecords return all-failed with the same error code, useful for validating producer retry logic. |
| Enhanced fan-out consumers | RegisterStreamConsumer, DeregisterStreamConsumer, DescribeStreamConsumer, ListStreamConsumers, SubscribeToShard. |
| Encryption | StartStreamEncryption, StopStreamEncryption. KMS key ID stored and returned; see Known limitations for the at-rest nuance. |
| Retention | IncreaseStreamRetentionPeriod, DecreaseStreamRetentionPeriod. Settings persisted; see Known limitations on eviction. |
| Tags | AddTagsToStream, RemoveTagsFromStream, ListTagsForStream, ListTagsForResource, TagResource, UntagResource. |
| Resource policies | PutResourcePolicy, GetResourcePolicy, DeleteResourcePolicy. Full LocalEmu-side impl with ARN regex validation at provider.py:56-61. Stored in KinesisStore.resource_policies. |
| Account settings | DescribeAccountSettings, UpdateAccountSettings, DescribeLimits, UpdateMaxRecordSize, UpdateStreamWarmThroughput. |
| Monitoring | EnableEnhancedMonitoring, DisableEnhancedMonitoring. Shard-level metric flags stored on the stream. |
| Persistence | Full state restored across PERSISTENCE=1 restart: streams, shards, records, sequence numbers, consumers, policies, tags, encryption settings. |
Batched writes and fault injection
PutRecords accepts up to 500 records per call. Each record is routed independently by the moto backend based on its PartitionKey. The FailedRecordCount response field reports how many records in the batch were rejected and must be retried by the producer. Setting KINESIS_ERROR_PROBABILITY lets you reproduce the failure paths your producer has to handle in production.
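On the producer side, handling a non-zero FailedRecordCount usually means resending only the entries whose result carries an ErrorCode. The sketch below assumes a put_records callable that takes a list of records and returns a dict in the PutRecords response shape; put_records_with_retry and its parameters are hypothetical names, not part of LocalEmu.

```python
import time


def put_records_with_retry(put_records, records, max_attempts=5, base_delay=0.1):
    """Resend failed entries with exponential backoff.

    Returns the records that still failed after max_attempts (empty on success).
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = put_records(pending)
        if response.get("FailedRecordCount", 0) == 0:
            return []
        # Results are positional: entry i of Records describes pending[i].
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if not pending:
            return []
        if attempt < max_attempts - 1:
            time.sleep(base_delay * 2**attempt)  # back off before the next try
    return pending
```

With boto3 the callable could be a small wrapper such as lambda batch: client.put_records(StreamName="events", Records=batch), where client is a Kinesis client pointed at the emulator endpoint.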
$ awsemu kinesis put-records --stream-name events --records '[
{"PartitionKey":"user-1","Data":"YQ=="},
{"PartitionKey":"user-2","Data":"Yg=="},
{"PartitionKey":"user-3","Data":"Yw=="}
]' --query '{Failed:FailedRecordCount,Per:Records[].ShardId}'
{
"Failed": 0,
"Per": ["shardId-000000000001","shardId-000000000000","shardId-000000000001"]
}
# Simulate a flaky stream to validate retry logic in your producer
$ KINESIS_ERROR_PROBABILITY=0.5 localemu restart
$ awsemu kinesis put-records --stream-name events --records '[
{"PartitionKey":"user-1","Data":"YQ=="},
{"PartitionKey":"user-2","Data":"Yg=="}
]' --query FailedRecordCount
2
Enhanced fan-out with SubscribeToShard
SubscribeToShard (provider.py:184-256) yields a real event stream of SubscribeToShardEvent objects for up to MAX_SUBSCRIPTION_SECONDS=300 seconds (provider.py:44). Internally it polls the shard via GetRecords on a ~3-second cadence, batches the records into events, and stamps each event with ContinuationSequenceNumber. Idle waits use a threading.Event, so shutting the gateway down does not hang on in-flight subscriptions.
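The polling loop described above could look roughly like the generator below. It is a sketch under assumptions, not the code at provider.py: subscribe_events, the get_records callable, and the choice to wait only when a poll returns nothing are all illustrative.

```python
import threading
import time

MAX_SUBSCRIPTION_SECONDS = 300  # subscription window from the text above
POLL_INTERVAL_SECONDS = 3       # approximate cadence from the text above


def subscribe_events(get_records, iterator, stop_event,
                     max_seconds=MAX_SUBSCRIPTION_SECONDS,
                     poll_interval=POLL_INTERVAL_SECONDS):
    """Yield SubscribeToShardEvent-shaped dicts until the window closes.

    get_records(iterator) is assumed to return a GetRecords-shaped dict with
    "Records" and "NextShardIterator" keys.
    """
    deadline = time.monotonic() + max_seconds
    while time.monotonic() < deadline and not stop_event.is_set():
        batch = get_records(iterator)
        records = batch["Records"]
        iterator = batch.get("NextShardIterator")
        if records:
            yield {
                "SubscribeToShardEvent": {
                    "Records": records,
                    "ContinuationSequenceNumber": records[-1]["SequenceNumber"],
                    "MillisBehindLatest": 0,
                }
            }
        if iterator is None:
            return  # shard is closed, nothing more to read
        if not records:
            # Event.wait returns early when stop_event is set, so shutdown
            # never blocks for a full poll interval.
            remaining = deadline - time.monotonic()
            stop_event.wait(timeout=max(0.0, min(poll_interval, remaining)))
```

Waiting on a threading.Event instead of calling time.sleep is what lets a shutdown signal interrupt an idle subscription immediately.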
$ STREAM_ARN=$(awsemu kinesis describe-stream --stream-name events \
--query StreamDescription.StreamARN --output text)
$ awsemu kinesis register-stream-consumer \
--stream-arn $STREAM_ARN --consumer-name efo-consumer \
--query Consumer.ConsumerARN --output text
arn:aws:kinesis:us-east-1:000000000000:stream/events/consumer/efo-consumer:1747...
$ awsemu kinesis subscribe-to-shard \
--consumer-arn arn:aws:kinesis:...:consumer/efo-consumer:1747... \
--shard-id shardId-000000000000 \
--starting-position '{"Type":"LATEST"}'
{
"EventStream": [
{"SubscribeToShardEvent": {"Records":[...], "ContinuationSequenceNumber":"49640...", "MillisBehindLatest":0}}
]
}
Lambda Event Source Mapping
Wire a Kinesis stream as a Lambda event source the way you would in AWS. The KinesisPoller at services/lambda_/event_source_mapping/pollers/kinesis_poller.py:22 picks up the registered ESM, calls GetShardIterator + GetRecords against this Kinesis backend, and invokes the function with the standard Records[].kinesis envelope.
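For reference, a function on the receiving end of that envelope has to base64-decode each Records[].kinesis.data field. The handler below is a hypothetical process-events function written for illustration; only the envelope fields shown in this section (data, partitionKey) are assumed.

```python
import base64
import json


def handler(event, context=None):
    """Decode every Kinesis record in the batch and return the payloads."""
    # Log the raw envelope, mirroring the "Got event: ..." line in the logs.
    print("Got event: " + json.dumps(event))
    payloads = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # Record data arrives base64-encoded inside the envelope.
        raw = base64.b64decode(kinesis["data"])
        payloads.append({
            "partitionKey": kinesis["partitionKey"],
            "data": raw.decode("utf-8"),
        })
    return payloads
```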
$ awsemu lambda create-event-source-mapping \
--event-source-arn $STREAM_ARN \
--function-name process-events \
--starting-position TRIM_HORIZON --batch-size 100 \
--query '{State:State,UUID:UUID}'
{
"State": "Enabled",
"UUID": "11111111-2222-3333-4444-555555555555"
}
$ awsemu kinesis put-record --stream-name events \
--partition-key u1 --data "$(printf 'hello' | base64)" >/dev/null
# The KinesisPoller picks up the new record and invokes the Lambda with the
# standard kinesis-event envelope: {"Records":[{"kinesis":{"data":...}}]}
$ awsemu logs filter-log-events \
--log-group-name /aws/lambda/process-events --limit 1 --query 'events[0].message'
"Got event: {\"Records\":[{\"kinesis\":{\"data\":\"aGVsbG8=\",\"partitionKey\":\"u1\",...}}]}"
Resource policies
Stream-level resource policies are stored and returned by PutResourcePolicy / GetResourcePolicy. The stream ARN is validated against ^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[A-Za-z0-9_.\-]+$ (provider.py:56-57). DeleteResourcePolicy re-verifies stream existence before removing the policy so a missing-stream call returns a deterministic ResourceNotFoundException.
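The ARN check can be reproduced with Python's re module using the pattern quoted above. The function name is_valid_stream_arn is hypothetical; only the regular expression comes from the text.

```python
import re

# Pattern copied verbatim from the paragraph above.
STREAM_ARN_PATTERN = re.compile(
    r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[A-Za-z0-9_.\-]+$"
)


def is_valid_stream_arn(arn):
    """Return True when the ARN names a Kinesis stream (not a consumer)."""
    return STREAM_ARN_PATTERN.match(arn) is not None


if __name__ == "__main__":
    print(is_valid_stream_arn("arn:aws:kinesis:us-east-1:000000000000:stream/events"))
```

Note that a consumer ARN fails this check because the stream-name character class excludes "/" and ":", so resource policies in this form attach to streams only.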
$ awsemu kinesis put-resource-policy \
--resource-arn $STREAM_ARN \
--policy '{
"Version": "2012-10-17",
"Statement": [{
"Sid": "AllowAppRoleRead",
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::000000000000:role/AppRole"},
"Action": ["kinesis:GetRecords","kinesis:DescribeStream"],
"Resource": "*"
}]
}'
$ awsemu kinesis get-resource-policy --resource-arn $STREAM_ARN \
--query 'Policy' --output text | python3 -m json.tool
{"Version": "2012-10-17", "Statement": [...]}
Configuration
| Variable | Default | Purpose |
|---|---|---|
| KINESIS_ERROR_PROBABILITY | 0.0 | Probability in [0.0, 1.0] that PutRecord raises ProvisionedThroughputExceededException; PutRecords returns the same error for every record in the batch. Defined at config.py:902. |
Three legacy variables (KINESIS_INITIALIZE_STREAMS, KINESIS_PROVIDER, SYNCHRONOUS_KINESIS_EVENTS) are recognised but deprecated since the LocalStack 1.3/1.4 era; they have no effect.
Integration points
Other LocalEmu services produce or consume from Kinesis.
| Service | How it touches Kinesis |
|---|---|
| Lambda Event Source Mapping | services/lambda_/event_source_mapping/pollers/kinesis_poller.py: polls GetRecords with the configured starting position and invokes the function with the AWS-standard kinesis event envelope. |
| EventBridge Pipes | Same poller, different envelope (no nested kinesis namespace, matching the Pipes contract). |
| CloudFormation | AWS::Kinesis::Stream resource provider at services/kinesis/resource_providers/aws_kinesis_stream.py. |
Known limitations
- MillisBehindLatest is hardcoded to 0 in every SubscribeToShard event (provider.py:251). Consumers cannot detect lag against the head of the shard.
- ChildShards is not populated in SubscribeToShard events (provider.py:252). Consumers using the Kinesis Client Library will not auto-reshard via the EFO path; classic GetRecords with ListShards works.
- Stream retention is metadata only. IncreaseStreamRetentionPeriod and DecreaseStreamRetentionPeriod save the value; DescribeStream reports it; but no background worker evicts records that have aged past the window. Records live until process exit (or are restored on the next start with PERSISTENCE=1).
- Stream encryption is metadata only. StartStreamEncryption stores the KeyId and EncryptionType=KMS; DescribeStream returns them; but record payloads are not actually encrypted under the referenced KMS key.
- SubscribeToShard uses an emulated event stream, not real HTTP/2. The boto3 consumer sees the same EventStream contract, but the wire framing is the LocalEmu generator output and the polling cadence is ~3 s rather than push-driven.
- Resource policies are stored but not enforced. PutResourcePolicy saves the JSON; GetResourcePolicy returns it; but GetRecords does not deny based on the policy principal. Use IAM policies for actual access control.