
Kinesis

LocalEmu Kinesis Data Streams implements all 39 operations. Most of the stream and shard machinery (CreateStream, ListShards, MergeShards, SplitShard, GetShardIterator, GetRecords, sequence numbers, consumer registration, encryption settings) is served by the moto-ext metadata backend, which holds streams, shards, and records in-process. Six operations run LocalEmu-specific code on top of that: PutRecord and PutRecords check KINESIS_ERROR_PROBABILITY for fault injection; SubscribeToShard is a real Python event-stream generator that drives Enhanced Fan-Out consumers; and the three resource-policy operations (PutResourcePolicy, GetResourcePolicy, DeleteResourcePolicy) are implemented entirely in LocalEmu.

Operation-level coverage: see the Kinesis coverage matrix.

Quick start

Terminal
$ awsemu kinesis create-stream --stream-name events --shard-count 2
$ awsemu kinesis describe-stream --stream-name events \
    --query 'StreamDescription.{Status:StreamStatus,Shards:Shards[].ShardId}' --output json
{
  "Status": "ACTIVE",
  "Shards": ["shardId-000000000000", "shardId-000000000001"]
}

$ awsemu kinesis put-record --stream-name events \
    --partition-key user-42 \
    --data "$(printf '%s' '{"event":"login"}' | base64)" \
    --query '{ShardId:ShardId,SequenceNumber:SequenceNumber}'
{
  "ShardId": "shardId-000000000000",
  "SequenceNumber": "49640..."
}

# Read it back via a shard iterator
$ IT=$(awsemu kinesis get-shard-iterator --stream-name events \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --query ShardIterator --output text)

$ awsemu kinesis get-records --shard-iterator $IT \
    --query 'Records[].{Seq:SequenceNumber,Data:Data,PK:PartitionKey}' --output json
[{
  "Seq":  "49640...",
  "Data": "eyJldmVudCI6ImxvZ2luIn0=",
  "PK":   "user-42"
}]
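The same round trip can be scripted from Python. A minimal sketch, assuming a boto3 Kinesis client that is already configured for your LocalEmu endpoint (endpoint setup is not covered on this page, so it is left out); `put_and_read_back` is a hypothetical helper name. boto3 base64-encodes `Data` on the way in and returns raw bytes on the way out, so no manual base64 step is needed.

```python
import json
import time


def put_and_read_back(client, stream_name, partition_key, payload, timeout_s=10.0):
    """Write one JSON record, then read it back from the shard it landed on.

    `client` is assumed to be a boto3 Kinesis client, or anything exposing the
    same put_record / get_shard_iterator / get_records methods.
    """
    put = client.put_record(
        StreamName=stream_name,
        PartitionKey=partition_key,
        Data=json.dumps(payload).encode("utf-8"),  # boto3 does the base64 step
    )
    shard_id, seq = put["ShardId"], put["SequenceNumber"]

    # Start exactly at the record just written (instead of TRIM_HORIZON),
    # so older records on the shard are skipped.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=seq,
    )["ShardIterator"]

    deadline = time.monotonic() + timeout_s
    while iterator and time.monotonic() < deadline:
        resp = client.get_records(ShardIterator=iterator, Limit=100)
        for record in resp["Records"]:
            if record["SequenceNumber"] == seq:
                return json.loads(record["Data"])  # Data comes back as bytes
        iterator = resp.get("NextShardIterator")
        time.sleep(0.2)  # GetRecords may return empty batches; back off briefly
    raise TimeoutError(f"record {seq} not visible on {shard_id}")
```

Using AT_SEQUENCE_NUMBER rather than TRIM_HORIZON keeps the read cheap on a stream that already holds data.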

Records are routed to shards by hashing the PartitionKey (or by ExplicitHashKey, when supplied). Sequence numbers increase monotonically within each shard. Both behaviours come from the moto-ext backend; LocalEmu does not override them.
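AWS documents that Kinesis maps each partition key to a 128-bit integer with MD5 and sends the record to the shard whose hash-key range contains that integer. The sketch below mirrors that documented AWS behaviour for a stream whose shards split the hash space evenly, as a freshly created stream's shards do. It is an illustration, not the moto-ext code; `even_hash_ranges`, `hash_key_for`, and `route` are hypothetical names.

```python
import hashlib

HASH_SPACE = 2 ** 128  # Kinesis hash keys are unsigned 128-bit integers


def even_hash_ranges(shard_count):
    """Split the hash-key space into contiguous, equal-width shard ranges."""
    width = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * width
        # The last shard absorbs any remainder so the whole space is covered.
        end = HASH_SPACE - 1 if i == shard_count - 1 else start + width - 1
        ranges.append((f"shardId-{i:012d}", start, end))
    return ranges


def hash_key_for(partition_key, explicit_hash_key=None):
    """ExplicitHashKey (a decimal string) wins; otherwise MD5 the partition key."""
    if explicit_hash_key is not None:
        return int(explicit_hash_key)
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def route(partition_key, ranges, explicit_hash_key=None):
    """Return the ID of the shard whose range contains the record's hash key."""
    key = hash_key_for(partition_key, explicit_hash_key)
    for shard_id, start, end in ranges:
        if start <= key <= end:
            return shard_id
    raise ValueError(f"hash key {key} is outside every shard range")
```

Because the mapping is a pure function of the key, every record for `user-42` lands on the same shard, which is what gives per-key ordering.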

Architecture

Code lives at services/kinesis/. The provider class KinesisProvider at provider.py:102 uses the LocalEmu forwarding-fallback pattern:
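A minimal sketch of that pattern, assuming it works the way the overview describes: operations with LocalEmu-side code run their own handler, and everything else is forwarded unchanged to the moto-ext backend. `ForwardingProvider`, `custom`, and `handle` are hypothetical names, not the KinesisProvider API.

```python
from typing import Any, Callable, Dict

Params = Dict[str, Any]
Handler = Callable[[Params], Params]


class ForwardingProvider:
    """Hypothetical forwarding-fallback dispatcher.

    Operations with a registered custom handler run LocalEmu-side code;
    every other operation is passed to `forward` (standing in for the
    moto-ext backend) unchanged.
    """

    def __init__(self, forward: Callable[[str, Params], Params]) -> None:
        self._forward = forward
        self._custom: Dict[str, Handler] = {}

    def custom(self, operation: str) -> Callable[[Handler], Handler]:
        """Decorator that registers LocalEmu-side code for one operation."""
        def register(fn: Handler) -> Handler:
            self._custom[operation] = fn
            return fn
        return register

    def handle(self, operation: str, params: Params) -> Params:
        handler = self._custom.get(operation)
        if handler is not None:
            return handler(params)
        return self._forward(operation, params)


# Usage: GetResourcePolicy is handled locally, ListShards falls through.
provider = ForwardingProvider(lambda op, params: {"handled_by": "backend", "operation": op})


@provider.custom("GetResourcePolicy")
def get_resource_policy(params: Params) -> Params:
    return {"handled_by": "localemu", "Policy": "{}"}
```

A custom handler such as a PutRecord gate can still call the backend itself after its own checks pass.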

Concurrency and thread safety:

Persistence: kinesis_stores and the moto backend are both included in the standard state save/load path. With PERSISTENCE=1, streams, shards, records, sequence numbers, consumer registrations, and resource policies all survive restart. In-flight SubscribeToShard generators do not survive (the consumer is expected to reconnect after the 300-second subscription window expires anyway).

Features supported

Feature | Notes
Stream lifecycle | CreateStream, DeleteStream, DescribeStream, DescribeStreamSummary, ListStreams, UpdateStreamMode, UpdateShardCount.
Shard operations | ListShards, MergeShards, SplitShard, GetShardIterator (TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER).
Records | PutRecord, PutRecords, GetRecords. Records preserve insertion order within a shard.
Fault injection | KINESIS_ERROR_PROBABILITY (0.0-1.0) makes PutRecord raise ProvisionedThroughputExceededException and makes PutRecords report every record as failed with that same error code. Useful for validating producer retry logic.
Enhanced fan-out consumers | RegisterStreamConsumer, DeregisterStreamConsumer, DescribeStreamConsumer, ListStreamConsumers, SubscribeToShard.
Encryption | StartStreamEncryption, StopStreamEncryption. The KMS key ID is stored and returned; see Known limitations for the at-rest nuance.
Retention | IncreaseStreamRetentionPeriod, DecreaseStreamRetentionPeriod. Settings are persisted; see Known limitations on eviction.
Tags | AddTagsToStream, RemoveTagsFromStream, ListTagsForStream, ListTagsForResource, TagResource, UntagResource.
Resource policies | PutResourcePolicy, GetResourcePolicy, DeleteResourcePolicy. Implemented entirely LocalEmu-side, with ARN regex validation at provider.py:56-61. Policies are stored in KinesisStore.resource_policies.
Account settings | DescribeAccountSettings, UpdateAccountSettings, DescribeLimits, UpdateMaxRecordSize, UpdateStreamWarmThroughput.
Monitoring | EnableEnhancedMonitoring, DisableEnhancedMonitoring. Shard-level metric flags are stored on the stream.
Persistence | Full state is restored across a PERSISTENCE=1 restart: streams, shards, records, sequence numbers, consumers, policies, tags, encryption settings.

Batched writes and fault injection

PutRecords accepts up to 500 records per call. The moto backend routes each record independently based on its PartitionKey. FailedRecordCount in the response tells you how many records in the batch failed; each failed entry in Records carries an ErrorCode and ErrorMessage instead of a ShardId and SequenceNumber. Setting KINESIS_ERROR_PROBABILITY lets you reproduce the failure paths your producer has to handle in production.

Terminal
$ awsemu kinesis put-records --stream-name events --records '[
      {"PartitionKey":"user-1","Data":"YQ=="},
      {"PartitionKey":"user-2","Data":"Yg=="},
      {"PartitionKey":"user-3","Data":"Yw=="}
  ]' --query '{Failed:FailedRecordCount,Per:Records[].ShardId}'
{
  "Failed": 0,
  "Per":    ["shardId-000000000001","shardId-000000000000","shardId-000000000001"]
}

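# Simulate a flaky stream to validate your producer's retry logic. A failing
# PutRecords call fails every record in the batch, so FailedRecordCount below
# is either 0 or 2 depending on the roll.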
$ KINESIS_ERROR_PROBABILITY=0.5 localemu restart
$ awsemu kinesis put-records --stream-name events --records '[
      {"PartitionKey":"user-1","Data":"YQ=="},
      {"PartitionKey":"user-2","Data":"Yg=="}
  ]' --query FailedRecordCount
2
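On the producer side, the usual answer to a partially failed PutRecords batch is to re-send only the failed entries, with backoff. A minimal sketch, assuming a boto3 Kinesis client (or any object with the same `put_records(StreamName=..., Records=...)` method); `put_records_with_retry` is a hypothetical helper. It relies on the documented PutRecords contract that response entries line up one-to-one with request entries and that failed entries carry an `ErrorCode`.

```python
import random
import time
from typing import Any, Callable, Dict, List, Optional

MAX_BATCH = 500  # PutRecords limit per call


def put_records_with_retry(client, stream_name: str, records: List[Dict[str, Any]],
                           max_attempts: int = 5, base_delay: float = 0.1,
                           sleep: Callable[[float], None] = time.sleep
                           ) -> List[Optional[Dict[str, Any]]]:
    """Send records in batches of 500, re-sending only the entries that failed.

    Returns per-record results in the original order; raises RuntimeError if
    some records still fail after max_attempts.
    """
    results: List[Optional[Dict[str, Any]]] = [None] * len(records)
    for start in range(0, len(records), MAX_BATCH):
        pending = list(range(start, min(start + MAX_BATCH, len(records))))
        for attempt in range(max_attempts):
            if attempt:
                # Exponential backoff with full jitter before each retry.
                sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
            resp = client.put_records(StreamName=stream_name,
                                      Records=[records[i] for i in pending])
            still_failed = []
            # Response entries line up one-to-one with the request entries.
            for i, entry in zip(pending, resp["Records"]):
                if "ErrorCode" in entry:
                    still_failed.append(i)
                else:
                    results[i] = entry
            pending = still_failed
            if not pending:
                break
        if pending:
            raise RuntimeError(f"{len(pending)} records still failing after {max_attempts} attempts")
    return results
```

Pointing this at LocalEmu with KINESIS_ERROR_PROBABILITY set is a quick way to exercise the retry branch.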

Enhanced fan-out with SubscribeToShard

SubscribeToShard (provider.py:184-256) yields a real event stream of SubscribeToShardEvent objects for up to MAX_SUBSCRIPTION_SECONDS=300 seconds (provider.py:44). Internally it polls the shard via GetRecords on a ~3-second cadence, batches the records into events, and stamps each event with ContinuationSequenceNumber. Idle waits use a threading.Event, so shutting the gateway down does not hang on in-flight subscriptions.

Terminal
$ STREAM_ARN=$(awsemu kinesis describe-stream --stream-name events \
    --query StreamDescription.StreamARN --output text)

$ awsemu kinesis register-stream-consumer \
    --stream-arn $STREAM_ARN --consumer-name efo-consumer \
    --query Consumer.ConsumerARN --output text
arn:aws:kinesis:us-east-1:000000000000:stream/events/consumer/efo-consumer:1747...

$ awsemu kinesis subscribe-to-shard \
    --consumer-arn arn:aws:kinesis:...:consumer/efo-consumer:1747... \
    --shard-id shardId-000000000000 \
    --starting-position '{"Type":"LATEST"}'
{
  "EventStream": [
    {"SubscribeToShardEvent": {"Records":[...], "ContinuationSequenceNumber":"49640...", "MillisBehindLatest":0}}
  ]
}
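The polling loop described above can be sketched as a plain Python generator. This is a minimal sketch, not the code at provider.py:184-256: `subscription_events` and the `fetch` callback (standing in for a GetRecords call) are hypothetical, and the event shape follows the CLI output above. Before the first record arrives the sketch reports a ContinuationSequenceNumber of None; deriving it from the starting position is not modelled. The idle wait goes through `threading.Event.wait`, which returns as soon as the event is set, so a shutdown signal ends the subscription without sitting out the poll interval.

```python
import threading
import time
from typing import Any, Callable, Dict, Iterator, List, Optional

MAX_SUBSCRIPTION_SECONDS = 300  # subscription window quoted above
POLL_INTERVAL_SECONDS = 3.0     # approximate GetRecords cadence quoted above

Batch = Dict[str, Any]


def subscription_events(
    fetch: Callable[[Optional[str]], Batch],
    shutdown: threading.Event,
    max_seconds: float = MAX_SUBSCRIPTION_SECONDS,
    poll_interval: float = POLL_INTERVAL_SECONDS,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[Dict[str, Any]]:
    """Yield SubscribeToShardEvent-shaped dicts until the window closes.

    fetch(after_seq) stands in for GetRecords and returns
    {"Records": [...], "MillisBehindLatest": int}; each record has a
    "SequenceNumber". after_seq is None on the first call.
    """
    deadline = clock() + max_seconds
    continuation: Optional[str] = None
    while clock() < deadline and not shutdown.is_set():
        batch = fetch(continuation)
        records: List[Dict[str, Any]] = batch["Records"]
        if records:
            # Resume after the last record delivered in this event.
            continuation = records[-1]["SequenceNumber"]
        yield {
            "SubscribeToShardEvent": {
                "Records": records,
                "ContinuationSequenceNumber": continuation,
                "MillisBehindLatest": batch.get("MillisBehindLatest", 0),
            }
        }
        # Event.wait returns True immediately once shutdown is set,
        # so a stopping gateway never waits out the full poll interval.
        if shutdown.wait(poll_interval):
            return
```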

Lambda Event Source Mapping

Wire a Kinesis stream as a Lambda event source the way you would in AWS. The KinesisPoller at services/lambda_/event_source_mapping/pollers/kinesis_poller.py:22 picks up the registered ESM, calls GetShardIterator + GetRecords against this Kinesis backend, and invokes the function with the standard Records[].kinesis envelope.
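A rough sketch of the envelope the poller hands to the function, built from boto3-style GetRecords output (raw `Data` bytes, `ApproximateArrivalTimestamp` as a timezone-aware datetime). The field names follow the AWS Lambda Kinesis event format, but only a subset is shown, and `to_lambda_event` is a hypothetical helper, not KinesisPoller code.

```python
import base64
from typing import Any, Dict, List


def to_lambda_event(records: List[Dict[str, Any]], shard_id: str,
                    stream_arn: str, region: str) -> Dict[str, Any]:
    """Wrap GetRecords output in the Lambda Kinesis envelope (field subset)."""
    wrapped = []
    for r in records:
        wrapped.append({
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "eventID": f"{shard_id}:{r['SequenceNumber']}",
            "eventSourceARN": stream_arn,
            "awsRegion": region,
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": r["PartitionKey"],
                "sequenceNumber": r["SequenceNumber"],
                # Lambda receives the payload base64-encoded.
                "data": base64.b64encode(r["Data"]).decode("ascii"),
                # Epoch seconds, converted from the boto3 datetime.
                "approximateArrivalTimestamp": r["ApproximateArrivalTimestamp"].timestamp(),
            },
        })
    return {"Records": wrapped}
```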

Terminal
$ awsemu lambda create-event-source-mapping \
    --event-source-arn $STREAM_ARN \
    --function-name process-events \
    --starting-position TRIM_HORIZON --batch-size 100 \
    --query '{State:State,UUID:UUID}'
{
  "State": "Enabled",
  "UUID":  "11111111-2222-3333-4444-555555555555"
}

$ awsemu kinesis put-record --stream-name events \
    --partition-key u1 --data "$(printf 'hello' | base64)" >/dev/null

# The KinesisPoller picks up the new record and invokes the Lambda with the
# standard kinesis-event envelope: {"Records":[{"kinesis":{"data":...}}]}
$ awsemu logs filter-log-events \
    --log-group-name /aws/lambda/process-events --limit 1 --query 'events[0].message'
"Got event: {\"Records\":[{\"kinesis\":{\"data\":\"aGVsbG8=\",\"partitionKey\":\"u1\",...}}]}"
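A handler along these lines would produce a log line shaped like the one above. The `handler` for process-events is hypothetical; the decoding step reflects that Lambda delivers `kinesis.data` base64-encoded.

```python
import base64
import json


def handler(event, context):
    """Hypothetical process-events handler: log the event, decode payloads."""
    # Compact separators give the same style of JSON as the log line above.
    print("Got event: " + json.dumps(event, separators=(",", ":")))
    payloads = []
    for record in event["Records"]:
        data = base64.b64decode(record["kinesis"]["data"])
        payloads.append(data.decode("utf-8"))
    return {"decoded": payloads}
```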

Resource policies

Stream-level resource policies are stored and returned by PutResourcePolicy / GetResourcePolicy. The stream ARN is validated against ^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[A-Za-z0-9_.\-]+$ (provider.py:56-57). DeleteResourcePolicy re-verifies stream existence before removing the policy so a missing-stream call returns a deterministic ResourceNotFoundException.

Terminal
$ awsemu kinesis put-resource-policy \
    --resource-arn $STREAM_ARN \
    --policy '{
      "Version": "2012-10-17",
      "Statement": [{
        "Sid":       "AllowAppRoleRead",
        "Effect":    "Allow",
        "Principal": {"AWS": "arn:aws:iam::000000000000:role/AppRole"},
        "Action":    ["kinesis:GetRecords","kinesis:DescribeStream"],
        "Resource":  "*"
      }]
    }'

$ awsemu kinesis get-resource-policy --resource-arn $STREAM_ARN \
    --query 'Policy' --output text | python3 -m json.tool
{
    "Version": "2012-10-17",
    "Statement": [...]
}
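The validation and existence check described above can be sketched as follows, using the quoted regex verbatim. `validate_stream_arn` and `delete_resource_policy` are hypothetical names; the sketch raises ValueError for a malformed ARN because the error LocalEmu returns in that case is not stated here, and treating the deletion of an absent policy as a no-op is also an assumption.

```python
import re
from typing import Dict, Set

# Stream-ARN regex quoted above.
STREAM_ARN_RE = re.compile(
    r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[A-Za-z0-9_.\-]+$"
)


class ResourceNotFoundException(Exception):
    """Local stand-in for the service error of the same name."""


def validate_stream_arn(arn: str) -> str:
    # fullmatch (not match): Python's `$` also matches before a trailing
    # newline, which fullmatch rejects.
    if not STREAM_ARN_RE.fullmatch(arn):
        raise ValueError(f"invalid stream ARN: {arn!r}")
    return arn


def delete_resource_policy(arn: str, streams: Set[str], policies: Dict[str, str]) -> None:
    """Validate the ARN, re-check that the stream exists, then drop its policy."""
    validate_stream_arn(arn)
    # Checking the stream first makes a missing stream a deterministic
    # ResourceNotFoundException, whether or not a policy was ever stored.
    if arn not in streams:
        raise ResourceNotFoundException(f"stream not found: {arn}")
    policies.pop(arn, None)
```

Consumer ARNs (`.../stream/events/consumer/...`) do not match this pattern, since `/` is not in the final character class.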

Configuration

Variable | Default | Purpose
KINESIS_ERROR_PROBABILITY | 0.0 | Probability in [0.0, 1.0] that PutRecord raises ProvisionedThroughputExceededException; PutRecords returns the same error for every record in the batch. Defined at config.py:902.
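One way the gate could work, sketched under the assumption of a single random roll per call (which is what makes PutRecords fail all-or-nothing). The function names and the `forward` callback are hypothetical; this is not the code at config.py:902 or in provider.py.

```python
import random
from typing import Any, Callable, Dict

THROTTLE_CODE = "ProvisionedThroughputExceededException"
Params = Dict[str, Any]
Forward = Callable[[Params], Params]


class ProvisionedThroughputExceededException(Exception):
    """Local stand-in for the service error of the same name."""


def should_fail(probability: float, rng: Callable[[], float] = random.random) -> bool:
    # random.random() is in [0.0, 1.0), so 0.0 never fails and 1.0 always does.
    return rng() < probability


def put_record(params: Params, forward: Forward, probability: float,
               rng: Callable[[], float] = random.random) -> Params:
    if should_fail(probability, rng):
        raise ProvisionedThroughputExceededException("Rate exceeded (simulated)")
    return forward(params)  # e.g. hand off to the moto-ext backend


def put_records(params: Params, forward: Forward, probability: float,
                rng: Callable[[], float] = random.random) -> Params:
    if should_fail(probability, rng):
        # One roll per call: every entry fails with the same error code.
        failed = [{"ErrorCode": THROTTLE_CODE, "ErrorMessage": "Rate exceeded (simulated)"}
                  for _ in params["Records"]]
        return {"FailedRecordCount": len(failed), "Records": failed}
    return forward(params)
```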

Three legacy variables (KINESIS_INITIALIZE_STREAMS, KINESIS_PROVIDER, SYNCHRONOUS_KINESIS_EVENTS) are recognised but deprecated since the LocalStack 1.3/1.4 era; they have no effect.

Integration points

Other LocalEmu services produce to or consume from Kinesis.

Service | How it touches Kinesis
Lambda Event Source Mapping | services/lambda_/event_source_mapping/pollers/kinesis_poller.py: polls GetRecords with the configured starting position and invokes the function with the AWS-standard kinesis event envelope.
EventBridge Pipes | Same poller, different envelope (no nested kinesis namespace, matching the Pipes contract).
CloudFormation | AWS::Kinesis::Stream resource provider at services/kinesis/resource_providers/aws_kinesis_stream.py.
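The Pipes envelope difference can be pictured as lifting the nested `kinesis` fields to the top level of each record. A sketch under two assumptions: that the Pipes record carries the same fields as a Lambda ESM record minus the nesting, and that Pipes delivers a bare JSON array rather than `{"Records": [...]}`, as in the AWS Pipes Kinesis source samples. The exact field set LocalEmu emits is not stated here, and both helper names are hypothetical.

```python
from typing import Any, Dict, List


def lambda_record_to_pipes(record: Dict[str, Any]) -> Dict[str, Any]:
    """Lift the nested `kinesis` fields to the top level (input is not mutated)."""
    flat = {key: value for key, value in record.items() if key != "kinesis"}
    flat.update(record.get("kinesis", {}))
    return flat


def lambda_event_to_pipes_batch(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Assumed Pipes shape: a bare list of flattened records."""
    return [lambda_record_to_pipes(r) for r in event["Records"]]
```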

Known limitations