Kinesis Data Streams

Stream events locally with real Kinesis APIs. Create streams, put records, consume with shard iterators, manage shards and tags. Pure Python, no external processes, no Docker containers needed.

What the demo does. The demo creates a Kinesis stream with one shard and waits for it to reach ACTIVE. It then puts three records with put-record, each with a different partition key so the producer side is exercised, using raw-bytes payloads so that the differing base64 semantics of AWS CLI v1 and v2 are handled correctly. It reads the records back by getting a TRIM_HORIZON shard iterator and calling get-records, scales the stream to two shards, and tags it. Finally, it runs a small Python script (src/kinesis_demo.py) that produces three more payloads with boto3 and asserts that every payload round-trips byte-identical through the consumer. No special LocalEmu flags are required: Kinesis runs in-process, with no Docker container. Source: 12-kinesis-demo/ in the examples repo.

Real Streams

Create streams, put records, consume with shard iterators. Full Kinesis Data Streams API.

Shard Management

Split and merge shards, update shard count, track sequence numbers and hash key ranges.

Pure Python

No Node.js, no external process, no Docker. Kinesis runs directly inside LocalEmu's Python runtime.

Lambda Integration

Wire up Event Source Mappings to trigger Lambda functions from Kinesis records automatically.

Step-by-Step Walkthrough

Step 1: Start LocalEmu

Terminal
$ localemu start

Starting LocalEmu...

LocalEmu version: 1.0.0

Ready.

No special environment variables needed. Kinesis is a pure Python service - it starts automatically with LocalEmu.

Step 2: Create a Kinesis stream

Terminal
$ awsemu kinesis create-stream --stream-name my-stream --shard-count 1

Creates a stream with 1 shard. The command returns immediately with no output (HTTP 200).

Step 3: Describe the stream

Terminal
$ awsemu kinesis describe-stream --stream-name my-stream

{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211456"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "0"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
        "StreamName": "my-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1775864300.265016
    }
}

Stream is ACTIVE immediately. One shard with a full hash key range, 24-hour retention, no encryption. Ready for records.

Step 4: Put 3 records (login, purchase, logout)

Terminal
$ awsemu kinesis put-record --stream-name my-stream \
    --partition-key user-1 \
    --data "$(echo -n '{"event":"login","user":"tarek"}' | base64)"

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "1"
}

$ awsemu kinesis put-record --stream-name my-stream \
    --partition-key user-2 \
    --data "$(echo -n '{"event":"purchase","user":"alice","amount":99.99}' | base64)"

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "2"
}

$ awsemu kinesis put-record --stream-name my-stream \
    --partition-key user-1 \
    --data "$(echo -n '{"event":"logout","user":"tarek"}' | base64)"

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "3"
}

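Each record gets a monotonically increasing sequence number. Kinesis data is opaque bytes on the wire. AWS CLI v2 expects blob parameters such as --data to be base64-encoded on input by default, which is why we pipe the JSON through base64 here. AWS CLI v1, or v2 with cli_binary_format set to raw-in-base64-out, encodes the bytes itself, so there you pass the raw JSON instead; pre-encoding it would store the base64 text rather than the JSON. Partition keys (user-1, user-2) hash into the shard key range, so all records for the same key land on the same shard.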
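
The mapping from partition key to shard can be sketched in a few lines. Kinesis takes the MD5 hash of the partition key as a 128-bit integer and routes the record to the shard whose hash key range contains it. The two-shard layout and the pick_shard helper below are hypothetical illustrations, not LocalEmu's internals.

```python
import hashlib


def partition_key_hash(partition_key: str) -> int:
    """MD5 digest of the key, read as an unsigned 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def pick_shard(partition_key: str, shards: list[tuple[str, int, int]]) -> str:
    """Return the id of the shard whose inclusive hash range contains the key."""
    key_hash = partition_key_hash(partition_key)
    for shard_id, start, end in shards:
        if start <= key_hash <= end:
            return shard_id
    raise ValueError(f"no shard covers hash {key_hash}")


# Hypothetical two-shard layout that splits the key space in half.
TWO_SHARDS = [
    ("shardId-000000000000", 0, 2**127 - 1),
    ("shardId-000000000001", 2**127, 2**128 - 1),
]

if __name__ == "__main__":
    for key in ("user-1", "user-2", "user-1"):
        print(key, "->", pick_shard(key, TWO_SHARDS))
```

Because the hash depends only on the key, both user-1 records always resolve to the same shard, whichever shard that turns out to be.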

Step 5: Get shard iterator and read all records

Terminal
$ SHARD_ID=$(awsemu kinesis describe-stream --stream-name my-stream \
    --query 'StreamDescription.Shards[0].ShardId' --output text)
$ echo "Shard: $SHARD_ID"
Shard: shardId-000000000000

$ ITERATOR=$(awsemu kinesis get-shard-iterator --stream-name my-stream \
    --shard-id "$SHARD_ID" --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' --output text)

$ awsemu kinesis get-records --shard-iterator "$ITERATOR"

{
    "Records": [
        {
            "SequenceNumber": "1",
            "ApproximateArrivalTimestamp": 1775857151.383648,
            "Data": "ZXlKbGRtVnVkQ0k2SW14dloybHVJaXdpZFhObGNpSTZJblJoY21WckluMD0=",
            "PartitionKey": "user-1"
        },
        {
            "SequenceNumber": "2",
            "ApproximateArrivalTimestamp": 1775857167.835388,
            "Data": "ZXlKbGRtVnVkQ0k2SW5CMWNtTm9ZWE5sSWl3aWRYTmxjaUk2SW1Gc2FXTmxJaXdpWVcxdmRXNTBJam81T1M0NU9YMD0=",
            "PartitionKey": "user-2"
        },
        {
            "SequenceNumber": "3",
            "ApproximateArrivalTimestamp": 1775857180.685132,
            "Data": "ZXlKbGRtVnVkQ0k2SW14dloyOTFkQ0lzSW5WelpYSWlPaUowWVhKbGF5Sjk=",
            "PartitionKey": "user-1"
        }
    ],
    "NextShardIterator": "bXktc3RyZWFtOnNoYXJkSWQtMDAwMDAwMDAwMDAwOjM=\n",
    "MillisBehindLatest": 0
}

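TRIM_HORIZON reads from the oldest record still retained in the shard. All 3 records are returned with their partition keys, sequence numbers, and timestamps. The Data field is base64-encoded in the CLI output. The values shown here decode to the base64 strings passed in Step 4 rather than to the JSON itself, which means the CLI encoded the already-encoded input a second time; decode twice to recover the JSON. MillisBehindLatest: 0 means the consumer is fully caught up.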
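
Turning a Data value from the CLI output back into an event takes a base64 decode followed by a JSON parse. The sketch below is a heuristic, not part of LocalEmu: it assumes the payload is a JSON object, and if the first decode does not yield JSON it assumes the payload was base64-encoded twice and peels one more layer. The function name decode_record_data is hypothetical.

```python
import base64
import json


def decode_record_data(data_b64: str) -> dict:
    """Decode a Data field from get-records CLI output into a JSON object.

    If the first decode is not valid JSON, the bytes are treated as a
    second base64 layer (a double-encoded payload) and decoded once more.
    """
    raw = base64.b64decode(data_b64)
    try:
        return json.loads(raw)
    except ValueError:
        # Not JSON: assume the producer pre-encoded the payload with base64.
        return json.loads(base64.b64decode(raw, validate=True))


if __name__ == "__main__":
    payload = json.dumps({"event": "login", "user": "tarek"}).encode("utf-8")
    once = base64.b64encode(payload).decode("ascii")
    twice = base64.b64encode(once.encode("ascii")).decode("ascii")
    print(decode_record_data(once))
    print(decode_record_data(twice))
```

With boto3 this step is unnecessary: the SDK returns Data as raw bytes, so a single json.loads is enough when the producer also sent raw bytes.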

Step 6: List streams and describe summary

Terminal
$ awsemu kinesis list-streams

{
    "StreamNames": [
        "my-stream"
    ],
    "StreamSummaries": [
        {
            "StreamName": "my-stream",
            "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
            "StreamStatus": "ACTIVE",
            "StreamModeDetails": {
                "StreamMode": "PROVISIONED"
            },
            "StreamCreationTimestamp": 1775864300.265016
        }
    ]
}

Terminal
$ awsemu kinesis describe-stream-summary --stream-name my-stream

{
    "StreamDescriptionSummary": {
        "StreamName": "my-stream",
        "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",
        "StreamStatus": "ACTIVE",
        "StreamModeDetails": {
            "StreamMode": "PROVISIONED"
        },
        "RetentionPeriodHours": 24,
        "StreamCreationTimestamp": 1775864300.265016,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 1
    }
}

The summary shows the stream is PROVISIONED mode with 1 open shard and 24-hour retention.

Step 7: Scale up - update shard count from 1 to 2

Terminal
$ awsemu kinesis update-shard-count --stream-name my-stream \
    --target-shard-count 2 --scaling-type UNIFORM_SCALING

{
    "StreamName": "my-stream",
    "CurrentShardCount": 1,
    "TargetShardCount": 2
}

UNIFORM_SCALING splits shards evenly. The response confirms the change from 1 to 2 shards.
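
To make "splits evenly" concrete, here is a minimal sketch of dividing the 128-bit hash key space into equal contiguous ranges. It assumes the AWS convention that hash keys run from 0 to 2**128 - 1. The boundaries LocalEmu actually assigns are not shown in this walkthrough and may differ slightly; the Step 3 output, for instance, reports an ending key of 2**128. The function name uniform_hash_ranges is hypothetical.

```python
MAX_HASH_KEY = 2**128 - 1  # Largest hash key in AWS Kinesis.


def uniform_hash_ranges(shard_count: int) -> list[tuple[int, int]]:
    """Split [0, MAX_HASH_KEY] into shard_count contiguous, near-equal ranges."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    step = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        # The last shard absorbs any remainder so the whole key space is covered.
        end = MAX_HASH_KEY if i == shard_count - 1 else start + step - 1
        ranges.append((start, end))
    return ranges


if __name__ == "__main__":
    for start, end in uniform_hash_ranges(2):
        print(start, end)
```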

Step 8: Tag the stream

Terminal
$ awsemu kinesis add-tags-to-stream --stream-name my-stream \
    --tags env=dev,team=platform

$ awsemu kinesis list-tags-for-stream --stream-name my-stream

{
    "Tags": [
        {
            "Key": "env",
            "Value": "dev"
        },
        {
            "Key": "team",
            "Value": "platform"
        }
    ],
    "HasMoreTags": false
}

Tags work exactly like AWS. Use them for cost allocation, environment tracking, or team ownership.

Step 9: Delete the stream and verify

Terminal
$ awsemu kinesis delete-stream --stream-name my-stream

$ awsemu kinesis list-streams

{
    "StreamNames": [],
    "StreamSummaries": []
}

(stream deleted - clean state)

Stream deleted instantly. List streams confirms empty state. Clean teardown, every time.

Server Logs

LocalEmu logs every Kinesis API call with a timestamp and HTTP status code. Here is an excerpt of the server-side output during the walkthrough above, showing the CreateStream, PutRecord, and GetRecords calls.

LocalEmu Server Output
2026-04-10T23:38:20.265  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.CreateStream => 200
2026-04-10T23:39:11.400  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:39:27.835  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:39:40.685  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.PutRecord => 200
2026-04-10T23:40:29.474  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.GetRecords => 200
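
These lines are regular enough to assert on in a test harness. The following is a minimal sketch that assumes the exact line layout shown above; the regular expression and the ApiCall type are illustrative and are not part of LocalEmu.

```python
import re
from typing import NamedTuple, Optional

# Matches lines such as:
# 2026-04-10T23:38:20.265  INFO --- [et.reactor-0] localemu.request.aws : AWS kinesis.CreateStream => 200
LOG_LINE = re.compile(
    r"^(?P<timestamp>\S+)\s+(?P<level>[A-Z]+)\s+---\s+\[[^\]]*\]\s+\S+\s+:\s+"
    r"AWS\s+(?P<service>[\w-]+)\.(?P<operation>\w+)\s+=>\s+(?P<status>\d{3})"
)


class ApiCall(NamedTuple):
    timestamp: str
    service: str
    operation: str
    status: int


def parse_log_line(line: str) -> Optional[ApiCall]:
    """Return the API call described by one log line, or None if it does not match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    return ApiCall(
        match["timestamp"],
        match["service"],
        match["operation"],
        int(match["status"]),
    )
```

A test can then collect the parsed calls and check, for example, that every kinesis call returned status 200.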

Python Integration (boto3)

A complete producer and consumer in one script. The same boto3 code runs against real AWS Kinesis: unset AWS_ENDPOINT_URL and the call goes to kinesis.us-east-1.amazonaws.com instead. Credentials come from the standard boto3 chain, no hardcoded secrets in source.

kinesis_demo.py
import os
import json
import boto3

# Point boto3 at LocalEmu by exporting AWS_ENDPOINT_URL=http://localhost:4566.
# With the variable unset, endpoint_url is None and boto3 uses the real AWS endpoint.
# Credentials come from the usual boto3 chain (env, shared config, ...).
kinesis = boto3.client(
    "kinesis",
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL"),
    region_name="us-east-1",
)

kinesis.create_stream(StreamName="events", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="events")

# --- Producer ---
events = [
    {"event": "login", "user": "tarek"},
    {"event": "purchase", "user": "alice", "amount": 99.99},
    {"event": "logout", "user": "tarek"},
]
for evt in events:
    kinesis.put_record(
        StreamName="events",
        # Data is raw bytes; boto3 handles the base64 transport encoding.
        Data=json.dumps(evt).encode("utf-8"),
        PartitionKey=f'user-{evt["user"]}',
    )
    print(f"Sent: {evt}")

# --- Consumer ---
desc = kinesis.describe_stream(StreamName="events")
shard_id = desc["StreamDescription"]["Shards"][0]["ShardId"]

iterator = kinesis.get_shard_iterator(
    StreamName="events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

response = kinesis.get_records(ShardIterator=iterator)
print(f"\nReceived {len(response['Records'])} records:")
for record in response["Records"]:
    data = json.loads(record["Data"])
    print(f"  PartitionKey={record['PartitionKey']} -> {data}")

kinesis.delete_stream(StreamName="events")
print("\nStream deleted.")
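
The script above issues a single get_records call, which is enough for three records. For larger shards a consumer follows NextShardIterator until it has caught up. A minimal sketch of that loop is below; the function name, the batch_size and max_calls parameters, and the stop condition are assumptions for illustration, and the client argument is any boto3 Kinesis client.

```python
def read_all_records(client, stream_name: str, shard_id: str,
                     batch_size: int = 100, max_calls: int = 50) -> list:
    """Drain one shard from TRIM_HORIZON until the consumer has caught up."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    records = []
    for _ in range(max_calls):  # Hard cap so a busy stream cannot loop forever.
        response = client.get_records(ShardIterator=iterator, Limit=batch_size)
        records.extend(response["Records"])
        iterator = response.get("NextShardIterator")
        caught_up = (
            not response["Records"]
            and response.get("MillisBehindLatest", 0) == 0
        )
        # Stop when the shard is closed (no next iterator) or nothing is left.
        if iterator is None or caught_up:
            break
    return records
```

In the script it would be called as read_all_records(kinesis, "events", shard_id) in place of the single get_records call.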

How It Works

1. awsemu kinesis create-stream sends a CreateStream request to LocalEmu
2. LocalEmu routes the request to Moto's Kinesis backend, which implements the full Kinesis data model in pure Python
3. Shards, hash key ranges, and sequence numbers are all managed in-memory - no external process needed
4. PutRecord stores data with partition keys and assigns monotonically increasing sequence numbers
5. GetRecords returns records in order, with shard iterators tracking consumer position
6. Lambda Event Source Mappings can poll Kinesis streams and invoke functions automatically, just like AWS
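
Steps 3 to 5 can be pictured with a toy in-memory shard. This is a deliberately simplified illustration and not Moto's implementation: the ToyShard class is hypothetical, and an integer position stands in for the opaque shard iterator.

```python
from dataclasses import dataclass, field


@dataclass
class ToyShard:
    """Toy in-memory shard: an append-only list with increasing sequence numbers."""
    records: list = field(default_factory=list)
    next_sequence: int = 1

    def put_record(self, partition_key: str, data: bytes) -> str:
        """Append a record and return its sequence number as a string."""
        sequence = str(self.next_sequence)
        self.next_sequence += 1
        self.records.append(
            {"SequenceNumber": sequence, "PartitionKey": partition_key, "Data": data}
        )
        return sequence

    def get_records(self, position: int, limit: int = 100):
        """Return (batch, next_position); position plays the role of a shard iterator."""
        batch = self.records[position:position + limit]
        return batch, position + len(batch)
```

Reading from position 0 corresponds to TRIM_HORIZON, and passing the returned position to the next call corresponds to following NextShardIterator.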

Lambda Event Source Mapping

LocalEmu supports wiring Kinesis streams to Lambda functions via Event Source Mappings. When records arrive in a stream, LocalEmu polls the shards and invokes your Lambda function with batches of records - the same behavior as AWS.

Use awsemu lambda create-event-source-mapping with the stream ARN to set up the trigger. Your Lambda receives the standard Kinesis event payload with Records, partition keys, and sequence numbers.
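
On the Lambda side, each record in the standard Kinesis event carries its payload base64-encoded under kinesis.data. A minimal handler sketch is below. It assumes producers send JSON payloads as in the walkthrough; the handler name and the shape of the return value are illustrative choices.

```python
import base64
import json


def handler(event, context):
    """Decode each Kinesis record in the batch and return the parsed payloads."""
    decoded = []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        # Lambda delivers the record bytes base64-encoded.
        payload = base64.b64decode(kinesis["data"])
        decoded.append(
            {
                "partitionKey": kinesis["partitionKey"],
                "sequenceNumber": kinesis["sequenceNumber"],
                "body": json.loads(payload),
            }
        )
    return {"processed": len(decoded), "records": decoded}
```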

Pure Python - no Node.js, no external process

Kinesis runs directly in LocalEmu. Unlike OpenSearch or RDS (which spin up Docker containers), Kinesis is implemented entirely in Python via Moto. This means near-instant startup, no container overhead, and no Docker dependency. Run localemu start and you are ready to stream.

Known gap: Enhanced Fan-Out child-shard info

Enhanced Fan-Out is supported (SubscribeToShard). Following re-sharding through the ChildShards field on each event, as KCL does, is a known gap (TODO at services/kinesis/provider.py:252): consumers that rely on child-shard info to follow splits and merges will not see them. Plain producer and consumer flows, such as the walkthrough above, are unaffected.
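
One possible way for a consumer to discover children without ChildShards is to derive them from a ListShards response, where each shard entry can carry ParentShardId and AdjacentParentShardId. The sketch below only shows that lookup on ListShards-shaped dictionaries. Whether LocalEmu populates these fields after a split or merge is an assumption to verify, and the function name is hypothetical.

```python
def child_shards_of(shards: list[dict], parent_id: str) -> list[str]:
    """Return ids of shards that name parent_id as a parent or adjacent parent."""
    return [
        shard["ShardId"]
        for shard in shards
        if parent_id in (shard.get("ParentShardId"), shard.get("AdjacentParentShardId"))
    ]
```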