Kinesis Data Streams
Stream events locally with real Kinesis APIs. Create streams, put records, consume with shard iterators, manage shards and tags. Pure Python, no external processes, no Docker containers needed.
The example creates a stream and waits for it to become ACTIVE, puts three records via put-record (each with a different partition key so the producer side is exercised, and raw-bytes payloads so the AWS CLI v1 vs v2 base64 semantics are handled correctly), reads the records back by getting a TRIM_HORIZON shard iterator and calling get-records, scales the stream to two shards, tags it, then runs a small Python script (src/kinesis_demo.py) that produces three more payloads with boto3 and asserts every payload round-trips byte-identical via the consumer.
No special LocalEmu flags required: Kinesis runs in-process, no Docker container.
Source: 12-kinesis-demo/ in the examples repo.
Real Streams
Create streams, put records, consume with shard iterators. Full Kinesis Data Streams API.
Shard Management
Split and merge shards, update shard count, track sequence numbers and hash key ranges.
Pure Python
No Node.js, no external process, no Docker. Kinesis runs directly inside LocalEmu's Python runtime.
Lambda Integration
Wire up Event Source Mappings to trigger Lambda functions from Kinesis records automatically.
Step-by-Step Walkthrough
Step 1: Start LocalEmu
$ localemu start
Starting LocalEmu...
LocalEmu version: 1.0.0
Ready.
No special environment variables needed. Kinesis is a pure Python service - it starts automatically with LocalEmu.
Step 2: Create a Kinesis stream
$ awsemu kinesis create-stream --stream-name my-stream --shard-count 1
Creates a stream with 1 shard. The command returns immediately with no output (HTTP 200).
Step 3: Describe the stream
$ awsemu kinesis describe-stream --stream-name my-stream
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211456"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "0"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
"StreamName": "my-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1775864300.265016
}
}
Stream is ACTIVE immediately. One shard with a full hash key range, 24-hour retention, no encryption. Ready for records.
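On AWS, a record is routed to a shard by taking the MD5 hash of its partition key as a 128-bit integer and finding the shard whose HashKeyRange contains that value. The following is a minimal sketch of that lookup, using the shard list from the describe-stream output above. The helper names partition_key_hash and shard_for_key are illustrative and not part of LocalEmu or boto3, and the sketch assumes LocalEmu follows the same hashing rule.

```python
import hashlib


def partition_key_hash(partition_key: str) -> int:
    """MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key: str, shards: list[dict]) -> str:
    """Return the ShardId whose HashKeyRange contains the key's hash."""
    h = partition_key_hash(partition_key)
    for shard in shards:
        rng = shard["HashKeyRange"]
        if int(rng["StartingHashKey"]) <= h <= int(rng["EndingHashKey"]):
            return shard["ShardId"]
    raise LookupError(f"no shard covers hash {h}")


# Shard list copied from the describe-stream output above.
shards = [
    {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
            "StartingHashKey": "0",
            "EndingHashKey": "340282366920938463463374607431768211456",
        },
    }
]

for key in ("user-1", "user-2"):
    print(key, "->", shard_for_key(key, shards))
```

With a single shard every key maps to shardId-000000000000; the lookup only becomes interesting once the stream has more than one shard.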
Step 4: Put 3 records (login, purchase, logout)
$ awsemu kinesis put-record --stream-name my-stream \
--partition-key user-1 \
--data "$(echo -n '{"event":"login","user":"tarek"}' | base64)"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "1"
}
$ awsemu kinesis put-record --stream-name my-stream \
--partition-key user-2 \
--data "$(echo -n '{"event":"purchase","user":"alice","amount":99.99}' | base64)"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "2"
}
$ awsemu kinesis put-record --stream-name my-stream \
--partition-key user-1 \
--data "$(echo -n '{"event":"logout","user":"tarek"}' | base64)"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "3"
}
Each record gets a monotonically increasing sequence number. Kinesis data is opaque bytes on the wire. AWS CLI v2 expects blob parameters such as --data to be base64-encoded by default, which is why the JSON is piped through base64 here; AWS CLI v1 takes the raw bytes and encodes them itself. Partition keys (user-1, user-2) hash into the shard key range, so all records for the same key land on the same shard.
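Whichever CLI mode is in use, get-records prints Data as the base64 encoding of the stored bytes. A payload that was base64-encoded by hand and then encoded again by the CLI therefore needs two decoding passes on the way out. The following is a small sketch of a decoder that tolerates both cases. decode_record_data is an illustrative helper, not part of LocalEmu or the AWS CLI, and it assumes the payloads are JSON.

```python
import base64
import json


def decode_record_data(data_b64: str) -> dict:
    """Decode the Data field printed by get-records into a JSON object.

    Falls back to a second base64 pass when the first pass does not yield
    JSON, which happens when a pre-encoded payload was encoded again.
    """
    payload = base64.b64decode(data_b64)
    try:
        return json.loads(payload)
    except ValueError:
        return json.loads(base64.b64decode(payload))


event = {"event": "login", "user": "tarek"}
raw = json.dumps(event, separators=(",", ":")).encode("utf-8")

# What get-records prints when the stored bytes are the raw JSON.
once = base64.b64encode(raw).decode("ascii")
# What it prints when the stored bytes are already base64 text.
twice = base64.b64encode(once.encode("ascii")).decode("ascii")

print(decode_record_data(once))
print(decode_record_data(twice))
```

Both calls return the original event, so a consumer script built on this helper does not need to know which CLI mode the producer used.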
Step 5: Get shard iterator and read all records
$ SHARD_ID=$(awsemu kinesis describe-stream --stream-name my-stream \
--query 'StreamDescription.Shards[0].ShardId' --output text)
$ echo "Shard: $SHARD_ID"
Shard: shardId-000000000000
$ ITERATOR=$(awsemu kinesis get-shard-iterator --stream-name my-stream \
--shard-id $SHARD_ID --shard-iterator-type TRIM_HORIZON \
--query 'ShardIterator' --output text)
$ awsemu kinesis get-records --shard-iterator $ITERATOR
{
"Records": [
{
"SequenceNumber": "1",
"ApproximateArrivalTimestamp": 1775857151.383648,
"Data": "ZXlKbGRtVnVkQ0k2SW14dloybHVJaXdpZFhObGNpSTZJblJoY21WckluMD0=",
"PartitionKey": "user-1"
},
{
"SequenceNumber": "2",
"ApproximateArrivalTimestamp": 1775857167.835388,
"Data": "ZXlKbGRtVnVkQ0k2SW5CMWNtTm9ZWE5sSWl3aWRYTmxjaUk2SW1Gc2FXTmxJaXdpWVcxdmRXNTBJam81T1M0NU9YMD0=",
"PartitionKey": "user-2"
},
{
"SequenceNumber": "3",
"ApproximateArrivalTimestamp": 1775857180.685132,
"Data": "ZXlKbGRtVnVkQ0k2SW14dloyOTFkQ0lzSW5WelpYSWlPaUowWVhKbGF5Sjk=",
"PartitionKey": "user-1"
}
],
"NextShardIterator": "bXktc3RyZWFtOnNoYXJkSWQtMDAwMDAwMDAwMDAwOjM=\n",
"MillisBehindLatest": 0
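}
TRIM_HORIZON reads from the beginning of the shard. All 3 records are returned with their partition keys, sequence numbers, and timestamps. Data is the base64 encoding of the stored bytes; in this output each value decodes to the base64 text that was passed to --data in Step 4, so a second base64 decode is needed to recover the JSON. MillisBehindLatest: 0 means the consumer is fully caught up.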
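A single get-records call returns one page. A consumer that wants everything in the shard keeps following NextShardIterator until it has caught up. The following is a hedged sketch of that loop. read_shard works with any client exposing boto3's get_shard_iterator and get_records calls, and FakeKinesis is a hypothetical in-memory stand-in so the example runs without a LocalEmu instance.

```python
def read_shard(client, stream_name: str, shard_id: str) -> list[dict]:
    """Read a shard from TRIM_HORIZON until the consumer has caught up."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    records = []
    while iterator:
        page = client.get_records(ShardIterator=iterator, Limit=100)
        records.extend(page["Records"])
        # Stop on an empty page that reports no lag. A long-running
        # consumer would sleep and keep polling instead of breaking.
        if not page["Records"] and page.get("MillisBehindLatest", 0) == 0:
            break
        # A closed shard returns no NextShardIterator, which ends the loop.
        iterator = page.get("NextShardIterator")
    return records


class FakeKinesis:
    """In-memory stand-in for the two client calls used above (illustration only)."""

    def __init__(self, records, page_size=2):
        self._records = records
        self._page_size = page_size

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType):
        return {"ShardIterator": "0"}

    def get_records(self, ShardIterator, Limit=100):
        start = int(ShardIterator)
        end = start + min(self._page_size, Limit)
        page = self._records[start:end]
        remaining = max(len(self._records) - end, 0)
        return {
            "Records": page,
            "NextShardIterator": str(start + len(page)),
            "MillisBehindLatest": remaining,  # 0 once nothing is left to read
        }


sample = [
    {"SequenceNumber": str(i), "PartitionKey": "user-1", "Data": b"{}"}
    for i in (1, 2, 3)
]
records = read_shard(FakeKinesis(sample), "my-stream", "shardId-000000000000")
print([r["SequenceNumber"] for r in records])
```

Against LocalEmu, pass a boto3 Kinesis client in place of FakeKinesis. The stop condition costs one extra empty call but avoids quitting early on a page that happens to be empty.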
Step 6: List streams and describe summary
$ awsemu kinesis list-streams
{
"StreamNames": [
"my-stream"
],
"StreamSummaries": [
{
"StreamName": "my-stream",
"StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
"StreamStatus": "ACTIVE",
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"StreamCreationTimestamp": 1775864300.265016
}
]
}
$ awsemu kinesis describe-stream-summary --stream-name my-stream
{
"StreamDescriptionSummary": {
"StreamName": "my-stream",
"StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
"StreamStatus": "ACTIVE",
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamCreationTimestamp": 1775864300.265016,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"OpenShardCount": 1
}
}
The summary shows the stream is in PROVISIONED mode with 1 open shard and 24-hour retention.
Step 7: Scale up - update shard count from 1 to 2
$ awsemu kinesis update-shard-count --stream-name my-stream \
--target-shard-count 2 --scaling-type UNIFORM_SCALING
{
"StreamName": "my-stream",
"CurrentShardCount": 1,
"TargetShardCount": 2
}
UNIFORM_SCALING splits shards evenly. The response confirms the change from 1 to 2 shards.
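UNIFORM_SCALING aims for shards of equal size in hash key space. The following is a rough sketch of what an even split looks like, assuming the AWS convention of inclusive ranges over 0 to 2^128 - 1. The describe-stream output in Step 3 shows LocalEmu reporting 2^128 as the upper bound, so its exact boundaries may differ. uniform_hash_ranges is an illustrative helper, not a LocalEmu or boto3 function.

```python
HASH_SPACE = 2**128  # partition-key hashes are 128-bit integers


def uniform_hash_ranges(shard_count: int) -> list[tuple[int, int]]:
    """Split [0, 2**128 - 1] into shard_count contiguous, near-equal ranges."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    step = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        # The last range absorbs any remainder so the whole space is covered.
        end = HASH_SPACE - 1 if i == shard_count - 1 else (i + 1) * step - 1
        ranges.append((start, end))
    return ranges


for start, end in uniform_hash_ranges(2):
    print(start, end)
```

For two shards the boundary falls at 2^127: keys whose hash is below it go to the first shard, the rest to the second.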
Step 8: Tag the stream
$ awsemu kinesis add-tags-to-stream --stream-name my-stream \
--tags env=dev,team=platform
$ awsemu kinesis list-tags-for-stream --stream-name my-stream
{
"Tags": [
{
"Key": "env",
"Value": "dev"
},
{
"Key": "team",
"Value": "platform"
}
],
"HasMoreTags": false
}
Tags behave as they do on AWS. Use them for cost allocation, environment tracking, or team ownership.
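In a test it is often handier to compare tags as a plain dictionary than as a list of Key/Value pairs. The following is a small sketch using the response shown above. tags_to_dict is an illustrative helper, not part of boto3.

```python
def tags_to_dict(response: dict) -> dict[str, str]:
    """Flatten a list-tags-for-stream response into {key: value}."""
    return {tag["Key"]: tag["Value"] for tag in response.get("Tags", [])}


# Response copied from the list-tags-for-stream output above.
response = {
    "Tags": [
        {"Key": "env", "Value": "dev"},
        {"Key": "team", "Value": "platform"},
    ],
    "HasMoreTags": False,
}

tags = tags_to_dict(response)
print(tags)
```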
Step 9: Delete the stream and verify
$ awsemu kinesis delete-stream --stream-name my-stream
$ awsemu kinesis list-streams
{
"StreamNames": [],
"StreamSummaries": []
}
(stream deleted - clean state)
The stream is deleted immediately, and list-streams confirms the empty state. Clean teardown, every time.
Server Logs
LocalEmu logs every Kinesis API call with timestamps and HTTP status codes. Here is what the server side looks like during the walkthrough above.
2026-04-10T23:38:20.265 INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.CreateStream => 200
2026-04-10T23:39:11.400 INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:39:27.835 INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:39:40.685 INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:40:29.474 INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.GetRecords => 200
Python Integration (boto3)
A complete producer and consumer in one script. The same boto3 code runs against real AWS Kinesis. Because the script falls back to http://localhost:4566 when AWS_ENDPOINT_URL is unset, target AWS by setting AWS_ENDPOINT_URL to https://kinesis.us-east-1.amazonaws.com or by dropping the endpoint_url argument. Credentials come from the standard boto3 chain, with no hardcoded secrets in source.
import os
import json
import boto3

# Point boto3 at LocalEmu via the standard AWS_ENDPOINT_URL env var.
# Credentials come from the usual boto3 chain (env, shared config, ...).
kinesis = boto3.client(
    "kinesis",
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL", "http://localhost:4566"),
    region_name="us-east-1",
)

kinesis.create_stream(StreamName="events", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="events")

# --- Producer ---
events = [
    {"event": "login", "user": "tarek"},
    {"event": "purchase", "user": "alice", "amount": 99.99},
    {"event": "logout", "user": "tarek"},
]
for evt in events:
    kinesis.put_record(
        StreamName="events",
        # Data is raw bytes; boto3 handles the base64 transport encoding.
        Data=json.dumps(evt).encode("utf-8"),
        PartitionKey=f'user-{evt["user"]}',
    )
    print(f"Sent: {evt}")

# --- Consumer ---
desc = kinesis.describe_stream(StreamName="events")
shard_id = desc["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
response = kinesis.get_records(ShardIterator=iterator)
print(f"\nReceived {len(response['Records'])} records:")
for record in response["Records"]:
    # boto3 returns Data as bytes, already base64-decoded.
    data = json.loads(record["Data"])
    print(f"  PartitionKey={record['PartitionKey']} -> {data}")

kinesis.delete_stream(StreamName="events")
print("\nStream deleted.")
How It Works
Lambda Event Source Mapping
LocalEmu supports wiring Kinesis streams to Lambda functions via Event Source Mappings. When records arrive in a stream, LocalEmu polls the shards and invokes your Lambda function with batches of records - the same behavior as AWS.
Use awsemu lambda create-event-source-mapping with the stream ARN to set up the trigger. Your Lambda receives the standard Kinesis event payload with Records, partition keys, and sequence numbers.
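On AWS, each entry in the event's Records list carries a kinesis object whose data field is the base64-encoded payload, alongside partitionKey and sequenceNumber. The following is a minimal handler sketch, assuming LocalEmu delivers that same event shape, as the text above states. The handler name and the returned structure are illustrative choices, not something LocalEmu requires.

```python
import base64
import json


def handler(event, context):
    """Decode each Kinesis record in the batch and return the parsed payloads."""
    decoded = []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        # Lambda receives the payload base64-encoded; JSON is assumed here.
        payload = json.loads(base64.b64decode(kinesis["data"]))
        decoded.append(
            {
                "partition_key": kinesis["partitionKey"],
                "sequence_number": kinesis["sequenceNumber"],
                "payload": payload,
            }
        )
    return decoded


# Hand-built sample event with only the fields the handler reads.
sample_event = {
    "Records": [
        {
            "eventSource": "aws:kinesis",
            "kinesis": {
                "partitionKey": "user-1",
                "sequenceNumber": "1",
                "data": base64.b64encode(
                    json.dumps({"event": "login", "user": "tarek"}).encode("utf-8")
                ).decode("ascii"),
            },
        }
    ]
}

result = handler(sample_event, None)
print(result)
```

Calling the handler locally with a hand-built event like this is a quick way to check the decoding logic before wiring up the Event Source Mapping.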
Pure Python - no Node.js, no external process
Kinesis runs directly in LocalEmu. Unlike OpenSearch or RDS (which spin up Docker containers), Kinesis is implemented entirely in Python via Moto. This means there is no container to pull or boot, no extra process to manage, and no Docker dependency. Just localemu start and you are ready to stream.
Known gap: Enhanced Fan-Out child-shard info
Enhanced Fan-Out is supported (SubscribeToShard). KCL re-sharding via the ChildShards field on each event is a known gap (TODO at services/kinesis/provider.py:252). Consumers that rely on child-shard info to follow splits/merges will not see them. Plain producer + consumer flows (the walkthrough above) are unaffected.