
Kinesis Firehose

LocalEmu Kinesis Firehose is implemented entirely inside LocalEmu, with no moto delegation. Ten of twelve operations are custom-implemented; the two not yet implemented are the server-side-encryption operations (StartDeliveryStreamEncryption and StopDeliveryStreamEncryption). The interesting part is the delivery pipeline. PutRecord and PutRecordBatch dispatch records to real destinations: an S3 object is written with the AWS-standard key layout, an HTTP collector receives a real POST with the standard envelope, an Amazon OpenSearch (or Elasticsearch) domain receives real index calls via opensearchpy, and a Redshift cluster receives real INSERT statements via the Redshift Data API. Configured Lambda processors run for real between ingest and delivery.

Operation-level coverage: see the Firehose coverage matrix.

Quick start: S3 destination

Terminal
$ awsemu s3 mb s3://lake

$ awsemu firehose create-delivery-stream \
    --delivery-stream-name events-to-s3 \
    --s3-destination-configuration '{
      "RoleARN":   "arn:aws:iam::000000000000:role/FirehoseRole",
      "BucketARN": "arn:aws:s3:::lake",
      "Prefix":    "events/",
      "FileExtension": ".jsonl"
    }' --query DeliveryStreamARN --output text
arn:aws:firehose:us-east-1:000000000000:deliverystream/events-to-s3

# Wait for the background activator to flip the stream to ACTIVE.
$ while [ "$(awsemu firehose describe-delivery-stream \
    --delivery-stream-name events-to-s3 \
    --query 'DeliveryStreamDescription.DeliveryStreamStatus' --output text)" != "ACTIVE" ]; do sleep 1; done

$ awsemu firehose put-record --delivery-stream-name events-to-s3 \
    --record 'Data="'$(printf '{"event":"login"}' | base64)'"' --query RecordId
"f8b3..."

# A real object was written. The S3 key follows the AWS pattern:
# <Prefix>YYYY/MM/DD/HH/<stream>-<timestamp>-<uuid><FileExtension>
$ awsemu s3 ls s3://lake/events/ --recursive
2026-05-20 19:42:11   18 events/2026/05/20/19/events-to-s3-2026-05-20-19-42-11-1ab2....jsonl

The stream lifecycle is asynchronous: CreateDeliveryStream returns immediately with DeliveryStreamStatus=CREATING, and a background activator flips the stream to ACTIVE after a short delay. Records sent while the stream is still CREATING are rejected, so poll DescribeDeliveryStream until it reports ACTIVE (as the loop above does) before sending, in production code as well.
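A Python equivalent of the shell polling loop, as a minimal sketch. It assumes a boto3 "firehose" client pointed at the emulator on http://localhost:4566 (the port used by the curl example further down); wait_until_active is a hypothetical helper, and the timeout and interval defaults are arbitrary choices.

```python
import time
from typing import Any, Callable


def wait_until_active(
    describe: Callable[..., dict[str, Any]],
    stream_name: str,
    timeout: float = 30.0,
    interval: float = 1.0,
) -> None:
    """Poll DescribeDeliveryStream until the stream reports ACTIVE."""
    deadline = time.monotonic() + timeout
    while True:
        resp = describe(DeliveryStreamName=stream_name)
        status = resp["DeliveryStreamDescription"]["DeliveryStreamStatus"]
        if status == "ACTIVE":
            return
        if status.endswith("FAILED"):
            # e.g. CREATING_FAILED: further polling will not help.
            raise RuntimeError(f"{stream_name} entered {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{stream_name} still {status} after {timeout}s")
        time.sleep(interval)


# Usage (assumed endpoint and region):
#   client = boto3.client("firehose", endpoint_url="http://localhost:4566",
#                         region_name="us-east-1")
#   wait_until_active(client.describe_delivery_stream, "events-to-s3")
```

Passing the bound describe_delivery_stream method keeps the helper independent of boto3, which also makes it easy to exercise with a stub.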

Terminal
$ awsemu firehose put-record-batch \
    --delivery-stream-name events-to-s3 \
    --records '[
      {"Data":"'$(printf '{"e":1}' | base64)'"},
      {"Data":"'$(printf '{"e":2}' | base64)'"},
      {"Data":"'$(printf '{"e":3}' | base64)'"}
    ]' --query FailedPutCount
0

# The batch's records are base64-decoded and concatenated into a single object.
$ awsemu s3 ls s3://lake/events/ --recursive | tail -1
2026-05-20 19:42:18   24 events/2026/05/20/19/events-to-s3-2026-05-20-19-42-18-3cd4....jsonl
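With boto3 instead of the CLI, Data takes raw bytes and the SDK handles the base64 encoding. PutRecordBatch can partially fail even when the call itself succeeds, so a caller typically checks FailedPutCount and resends only the entries whose RequestResponses item carries an ErrorCode. A minimal sketch; records_to_retry is a hypothetical helper, and client is assumed to be a boto3 "firehose" client pointed at the emulator.

```python
from typing import Any


def records_to_retry(
    records: list[dict[str, bytes]], response: dict[str, Any]
) -> list[dict[str, bytes]]:
    """Return the submitted records whose PutRecordBatch result has an ErrorCode."""
    if response.get("FailedPutCount", 0) == 0:
        return []
    # RequestResponses is aligned by position with the submitted Records list.
    return [
        record
        for record, result in zip(records, response["RequestResponses"])
        if "ErrorCode" in result
    ]


# Usage (boto3 base64-encodes Data itself, so pass raw bytes):
#   records = [{"Data": json.dumps({"e": i}).encode()} for i in (1, 2, 3)]
#   resp = client.put_record_batch(DeliveryStreamName="events-to-s3", Records=records)
#   pending = records_to_retry(records, resp)
```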

Architecture

Code lives at services/firehose/. The provider is pure LocalEmu (1039 lines in provider.py), with no moto fallback. Delivery-stream records live in a custom FirehoseStore, held per account and region by the global firehose_stores AccountRegionBundle; accept_state_visitor registers that bundle so streams survive a PERSISTENCE=1 restart.

File | Role
provider.py | CRUD handlers, put-record dispatch, destination-specific deliverers, OpenSearch endpoint resolver.
mappers.py | Config-to-description converters per destination type (S3, ExtendedS3, ES, OpenSearch, Redshift, HTTP).
models.py | FirehoseStore and the global firehose_stores AccountRegionBundle.

Delivery dispatch (_process_records)

When records arrive, the dispatcher works through the stream description in this order:

  1. Run configured ProcessingConfiguration.Processors[] in order. Lambda processors are invoked synchronously and their returned records replace the input batch.
  2. If ElasticsearchDestinationDescription or AmazonopensearchserviceDestinationDescription: index each record via opensearchpy.OpenSearch against the configured domain.
  3. If S3DestinationDescription or ExtendedS3DestinationDescription: base64-decode and concatenate the records, then s3.put_object to the configured bucket using the key pattern below.
  4. If HttpEndpointDestinationDescription: build an envelope {requestId, timestamp, records:[{data}...]} and requests.post(url, json=..., timeout=60).
  5. If RedshiftDestinationDescription: write the batch to S3 first (via the nested S3 destination), then issue real redshift-data.execute_statement('INSERT INTO <DataTableName> VALUES (...)', Parameters=...) calls against the cluster identified in the JDBC URL: one single-row INSERT per row, with that row's values bound as Parameters.
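The ordering above can be sketched as a pure function over one entry of DeliveryStreamDescription.Destinations. This is an illustration, not code from provider.py: plan_delivery and the step labels are hypothetical, and the sketch assumes each destination kind is delivered at most once per batch, even when both S3DestinationDescription and ExtendedS3DestinationDescription are present.

```python
from typing import Any

# Destination-description keys in the documented order. The labels on the
# right are hypothetical names, not identifiers from provider.py.
DELIVERY_ORDER = [
    ("ElasticsearchDestinationDescription", "opensearch"),
    ("AmazonopensearchserviceDestinationDescription", "opensearch"),
    ("S3DestinationDescription", "s3"),
    ("ExtendedS3DestinationDescription", "s3"),
    ("HttpEndpointDestinationDescription", "http"),
    ("RedshiftDestinationDescription", "redshift"),
]


def plan_delivery(destination: dict[str, Any]) -> list[str]:
    """Return the ordered steps for one destination entry."""
    steps: list[str] = []
    # Step 1: processors run before any delivery when processing is enabled.
    for key, _ in DELIVERY_ORDER:
        processing = destination.get(key, {}).get("ProcessingConfiguration", {})
        if processing.get("Enabled"):
            steps.extend(p["Type"] for p in processing.get("Processors", []))
            break
    # Steps 2-5: one delivery per destination kind, in the fixed order.
    for key, kind in DELIVERY_ORDER:
        if key in destination and kind not in steps:
            steps.append(kind)
    return steps
```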

S3 object key layout

<Prefix>YYYY/MM/DD/HH/<DeliveryStreamName>-YYYY-MM-DD-HH-MM-SS-<uuid><FileExtension>

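This follows the AWS documented key pattern, including the FileExtension suffix when configured; note that AWS also inserts the DeliveryStreamVersion between the stream name and the timestamp, which this layout omits. Consumers that understand Firehose's time-based layout (Athena, EMR, Glue crawlers) can point at the prefix directly; keep in mind that YYYY/MM/DD/HH is not Hive-style key=value partitioning.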
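A sketch of how such a key can be assembled, assuming UTC timestamps (as AWS uses) and a random UUID suffix; firehose_s3_key is a hypothetical helper. Called with the quick-start values, it reproduces the shape of the key listed there (events/2026/05/20/19/events-to-s3-...).

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def firehose_s3_key(
    prefix: str,
    stream_name: str,
    when: Optional[datetime] = None,
    file_extension: str = "",
    suffix: Optional[str] = None,
) -> str:
    when = when or datetime.now(timezone.utc)
    suffix = suffix or str(uuid.uuid4())
    # Prefix is concatenated as-is, so "events/" yields "events/2026/...".
    folder = when.strftime("%Y/%m/%d/%H/")
    stamp = when.strftime("%Y-%m-%d-%H-%M-%S")
    return f"{prefix}{folder}{stream_name}-{stamp}-{suffix}{file_extension}"
```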

HTTP endpoint destination

The HTTP collector receives a real POST <Url> with Content-Type: application/json and the AWS-standard payload envelope. The 60 second client-side timeout matches the AWS-configured default.

Terminal
$ awsemu firehose create-delivery-stream \
    --delivery-stream-name events-to-http \
    --http-endpoint-destination-configuration '{
      "EndpointConfiguration": {
        "Url":  "http://host.docker.internal:8080/firehose",
        "Name": "local-collector"
      },
      "RoleARN":         "arn:aws:iam::000000000000:role/FirehoseRole",
      "S3Configuration": {
        "RoleARN":   "arn:aws:iam::000000000000:role/FirehoseRole",
        "BucketARN": "arn:aws:s3:::lake"
      }
    }'

$ awsemu firehose put-record --delivery-stream-name events-to-http \
    --record 'Data="'$(printf '{"hello":"world"}' | base64)'"' >/dev/null

# The collector receives a POST with the AWS-standard envelope:
# { "requestId":..., "timestamp":..., "records":[{"data":"<base64>"},...] }
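A minimal standard-library collector for the example above, listening on port 8080 at /firehose to match the configured Url. It decodes each record's base64 data and answers with the JSON acknowledgement AWS Firehose expects from HTTP endpoints (the same requestId plus a millisecond timestamp); whether LocalEmu inspects that response is an assumption not covered here. handle_envelope, Collector, and serve are hypothetical names.

```python
import base64
import json
import time
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_envelope(body: bytes) -> tuple[list[bytes], dict]:
    """Decode a Firehose HTTP envelope into raw payloads plus the ack body."""
    envelope = json.loads(body)
    payloads = [base64.b64decode(r["data"]) for r in envelope.get("records", [])]
    # Echo the requestId back with a millisecond timestamp.
    ack = {"requestId": envelope["requestId"], "timestamp": int(time.time() * 1000)}
    return payloads, ack


class Collector(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        if self.path != "/firehose":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        payloads, ack = handle_envelope(self.rfile.read(length))
        for payload in payloads:
            print(payload.decode("utf-8", errors="replace"))
        body = json.dumps(ack).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int = 8080) -> None:
    """Blocks forever; run it on the host the stream's Url points at."""
    HTTPServer(("0.0.0.0", port), Collector).serve_forever()


# serve()
```

Keeping the envelope handling in a plain function separates the protocol logic from the HTTP plumbing.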

OpenSearch destination

Firehose resolves the DomainARN to the LocalEmu OpenSearch domain endpoint (the Endpoint field returned by opensearch describe-domain), builds an opensearchpy.OpenSearch client against it, and indexes each record into IndexName. Clients are cached per (account, region, endpoint) under a lock.
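The per-endpoint client cache can be sketched as a lock-guarded dictionary. ClientCache and its factory argument are hypothetical; the commented opensearch-py lines show one way the factory could be wired, assuming the opensearch-py package is installed and endpoint is the domain URL.

```python
import threading
from typing import Any, Callable


class ClientCache:
    """Thread-safe memo of one client per (account, region, endpoint)."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory
        self._clients: dict[tuple[str, str, str], Any] = {}
        self._lock = threading.Lock()

    def get(self, account: str, region: str, endpoint: str) -> Any:
        key = (account, region, endpoint)
        # Holding the lock across creation ensures one client per key.
        with self._lock:
            client = self._clients.get(key)
            if client is None:
                client = self._factory(endpoint)
                self._clients[key] = client
            return client


# Possible factory (assumes opensearch-py is installed):
#   from opensearchpy import OpenSearch
#   cache = ClientCache(lambda endpoint: OpenSearch(hosts=[endpoint]))
#   cache.get("000000000000", "us-east-1", endpoint).index(index="events", body=doc)
```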

Terminal
$ awsemu opensearch create-domain --domain-name logs --query DomainStatus.ARN --output text
arn:aws:es:us-east-1:000000000000:domain/logs

$ awsemu firehose create-delivery-stream \
    --delivery-stream-name events-to-os \
    --amazonopensearchservice-destination-configuration '{
      "RoleARN":     "arn:aws:iam::000000000000:role/FirehoseRole",
      "DomainARN":   "arn:aws:es:us-east-1:000000000000:domain/logs",
      "IndexName":   "events",
      "S3BackupMode":"FailedDocumentsOnly",
      "S3Configuration": {
        "RoleARN":   "arn:aws:iam::000000000000:role/FirehoseRole",
        "BucketARN": "arn:aws:s3:::lake"
      }
    }'

$ awsemu firehose put-record --delivery-stream-name events-to-os \
    --record 'Data="'$(printf '{"level":"INFO","msg":"hi"}' | base64)'"' >/dev/null

$ curl -s 'http://localhost:4566/logs/_search?pretty' | python3 -c \
    'import sys,json;d=json.load(sys.stdin);print(d["hits"]["total"]["value"])'
1

This pairs with the LocalEmu OpenSearch service, which runs a real OpenSearch process. See the OpenSearch reference for how the domain is provisioned.

Redshift destination

Records flow through S3 first (via the mandatory S3Configuration sub-destination); then a real redshift-data.execute_statement('INSERT INTO <DataTableName> VALUES (...)', Parameters=<row>) is issued against the cluster the JDBC URL points at. The cluster identifier is parsed from the URL host, the region from the URL, and the account ID from the RoleARN. Exercising this destination end-to-end requires a real Redshift cluster, which means running with REDSHIFT_DOCKER_BACKEND=1 (see the RDS / Redshift Docker reference).
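A sketch of the target resolution and the per-row statement. It assumes an AWS-style JDBC host of the form <cluster>.<unique-id>.<region>.redshift.amazonaws.com and a row that has already been split into string values; both helpers are hypothetical, and how LocalEmu splits a record into columns is not covered here. The Redshift Data API binds named parameters written as :name in the SQL, which is what build_insert produces.

```python
from urllib.parse import urlparse


def parse_redshift_target(jdbc_url: str, role_arn: str) -> dict[str, str]:
    # Drop the "jdbc:" prefix so urlparse sees redshift://host:port/db.
    parsed = urlparse(jdbc_url.removeprefix("jdbc:"))
    labels = (parsed.hostname or "").split(".")
    return {
        "cluster": labels[0],
        # Assumed AWS-style host: <cluster>.<id>.<region>.redshift.amazonaws.com
        "region": labels[2] if len(labels) > 2 else "",
        "database": parsed.path.lstrip("/"),
        # arn:aws:iam::<account>:role/<name> -> field 4 is the account ID
        "account": role_arn.split(":")[4],
    }


def build_insert(table: str, row: list[str]) -> tuple[str, list[dict[str, str]]]:
    """One single-row INSERT with each value bound as a named parameter."""
    names = [f"c{i}" for i in range(len(row))]
    placeholders = ", ".join(f":{n}" for n in names)
    sql = f"INSERT INTO {table} VALUES ({placeholders})"
    params = [{"name": n, "value": v} for n, v in zip(names, row)]
    return sql, params


# Usage (assumed client and user):
#   data = boto3.client("redshift-data", endpoint_url="http://localhost:4566",
#                       region_name=target["region"])
#   data.execute_statement(ClusterIdentifier=target["cluster"],
#                          Database=target["database"],
#                          DbUser="<Username from the destination config>",
#                          Sql=sql, Parameters=params)
```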

Lambda transform processors

Setting ProcessingConfiguration.Enabled=true with a Lambda processor makes Firehose invoke the configured function before delivery. The function receives the records in the AWS-standard Firehose transform event shape and must return each record with its original recordId, a result of "Ok", "Dropped", or "ProcessingFailed", and base64-encoded data. The returned data replaces the input batch before the destination dispatch above.
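The upper-case.zip used below could contain an index.py like this hypothetical handler. It upper-cases each payload and echoes every recordId, as the transform contract requires.

```python
import base64


def handler(event, context):
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        output.append({
            "recordId": record["recordId"],  # must match the input record
            "result": "Ok",                  # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(payload.upper()).decode("ascii"),
        })
    return {"records": output}
```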

Terminal
$ awsemu lambda create-function --function-name upper-case \
    --runtime python3.12 --handler index.handler \
    --role arn:aws:iam::000000000000:role/lambda-role \
    --zip-file fileb://upper-case.zip >/dev/null

$ awsemu firehose update-destination --delivery-stream-name events-to-s3 \
    --current-delivery-stream-version-id 1 \
    --destination-id destinationId-000000000001 \
    --extended-s3-destination-update '{
      "ProcessingConfiguration": {
        "Enabled": true,
        "Processors": [{
          "Type":"Lambda",
          "Parameters":[
            {"ParameterName":"LambdaArn","ParameterValue":"arn:aws:lambda:us-east-1:000000000000:function:upper-case"}
          ]
        }]
      }
    }'

# The Lambda is really invoked between PutRecord and S3 write; the function's
# returned records (base64) replace the input ones before delivery.

Features supported

Feature | Notes
Stream lifecycle | CreateDeliveryStream (a background activator flips the stream to ACTIVE), DescribeDeliveryStream, ListDeliveryStreams, DeleteDeliveryStream, UpdateDestination.
Ingest | PutRecord, PutRecordBatch. Records are base64-encoded payloads of up to 1,000 KiB each before encoding (the AWS per-record limit).
Destinations with real dispatch | S3, ExtendedS3, HTTP endpoint, Elasticsearch, Amazon OpenSearch Service, Redshift.
Lambda processors | Synchronous invocation of the configured Lambda; returned records replace the input.
Tags | TagDeliveryStream, UntagDeliveryStream, ListTagsForDeliveryStream.
RoleARN assumption | S3 puts use STS-assumed role credentials when RoleARN is set on the destination.
Persistence | firehose_stores is registered as a state root; streams and destination configs survive a PERSISTENCE=1 restart.

Integration points

Service | How it touches Firehose
S3 | Real put_object with the AWS-standard time-based key layout; consumers that read that layout see the data under the configured prefix.
OpenSearch | Real opensearchpy indexing against the LocalEmu OpenSearch process.
Redshift | Real redshift-data execute_statement calls (single-row INSERTs) against a Docker-backed Redshift cluster.
Lambda | Synchronous invocation as a record-transform processor; the function receives the Firehose transform event shape.
CloudWatch Logs (subscription filters) | Subscription filters can send matched log events to a Firehose stream; each delivery arrives as one record whose data is gzip-compressed JSON, base64-encoded on the wire.
SES v1 | SES v1 configuration-set event destinations can route to Firehose.
CloudFormation | The AWS::KinesisFirehose::DeliveryStream resource provider routes through this provider.
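When a CloudWatch Logs subscription filter feeds a stream, each record's data is a gzip-compressed JSON document. A sketch of unpacking it from the base64 data field that a Lambda processor or HTTP collector receives; decode_cwl_record is a hypothetical helper.

```python
import base64
import gzip
import json


def decode_cwl_record(data_b64: str) -> list[str]:
    """Return the log messages carried by one subscription-filter record."""
    payload = json.loads(gzip.decompress(base64.b64decode(data_b64)))
    # CONTROL_MESSAGE records are connectivity checks with no log events.
    if payload.get("messageType") != "DATA_MESSAGE":
        return []
    return [event["message"] for event in payload["logEvents"]]
```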

Known limitations