Kinesis Firehose
LocalEmu Kinesis Firehose runs entirely inside LocalEmu, with no moto delegation. Ten of twelve operations are custom-implemented; the two not yet implemented are the server-side-encryption operations. The interesting part is the delivery pipeline. PutRecord and PutRecordBatch dispatch records to real destinations: an S3 object is written with the AWS-standard key layout; an HTTP collector receives a real POST with the standard envelope; an Amazon OpenSearch domain receives real index calls via opensearchpy; and a Redshift cluster receives real INSERT statements via the Redshift Data API. Configured Lambda processors run for real between ingest and delivery.
Operation-level coverage: see the Firehose coverage matrix.
Quick start: S3 destination
$ awsemu s3 mb s3://lake
$ awsemu firehose create-delivery-stream \
--delivery-stream-name events-to-s3 \
--s3-destination-configuration '{
"RoleARN": "arn:aws:iam::000000000000:role/FirehoseRole",
"BucketARN": "arn:aws:s3:::lake",
"Prefix": "events/",
"FileExtension": ".jsonl"
}' --query DeliveryStreamARN --output text
arn:aws:firehose:us-east-1:000000000000:deliverystream/events-to-s3
# Wait for the background activator to flip the stream to ACTIVE.
$ while [ "$(awsemu firehose describe-delivery-stream \
--delivery-stream-name events-to-s3 \
--query 'DeliveryStreamDescription.DeliveryStreamStatus' --output text)" != "ACTIVE" ]; do sleep 1; done
$ awsemu firehose put-record --delivery-stream-name events-to-s3 \
--record 'Data="'$(printf '{"event":"login"}' | base64)'"' --query RecordId
"f8b3..."
# A real object was written (raw bytes; no Parquet conversion). The S3 key matches the AWS pattern:
# <Prefix>/YYYY/MM/DD/HH/<stream>-<timestamp>-<uuid><FileExtension>
$ awsemu s3 ls s3://lake/events/ --recursive
2026-05-20 19:42:11 18 events/2026/05/20/19/events-to-s3-2026-05-20-19-42-11-1ab2....jsonl
The stream lifecycle is asynchronous. CreateDeliveryStream returns immediately with DeliveryStreamStatus=CREATING, and a background activator flips it to ACTIVE after a short delay. Records sent during CREATING are rejected, so in production code, poll DescribeDeliveryStream before sending.
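Here is a minimal polling sketch in Python. The helper name wait_until_active and its parameters are hypothetical. The status callable, sleep, and clock are injected so the loop can be exercised without a running emulator. CREATING_FAILED and DELETING_FAILED are AWS status values; this page does not say whether LocalEmu ever reports them, so treating them as terminal is an assumption.

```python
import time
from typing import Callable


def wait_until_active(
    get_status: Callable[[], str],
    timeout_s: float = 30.0,
    poll_interval_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until the stream reports ACTIVE.

    Raises RuntimeError on a *_FAILED status and TimeoutError once
    timeout_s has elapsed without reaching ACTIVE.
    """
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status == "ACTIVE":
            return status
        if status.endswith("_FAILED"):
            # CREATING_FAILED / DELETING_FAILED are terminal on AWS.
            raise RuntimeError(f"delivery stream entered {status}")
        if clock() >= deadline:
            raise TimeoutError(f"stream still {status} after {timeout_s}s")
        sleep(poll_interval_s)


# Hypothetical usage against LocalEmu (endpoint URL assumed to be the same
# http://localhost:4566 used elsewhere on this page):
#   import boto3
#   fh = boto3.client("firehose", endpoint_url="http://localhost:4566",
#                     region_name="us-east-1")
#   wait_until_active(lambda: fh.describe_delivery_stream(
#       DeliveryStreamName="events-to-s3"
#   )["DeliveryStreamDescription"]["DeliveryStreamStatus"])
```

Injecting the clock and sleep keeps the timeout logic deterministic in tests, while the defaults give real wall-clock polling.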
$ awsemu firehose put-record-batch \
--delivery-stream-name events-to-s3 \
--records '[
{"Data":"'$(printf '{"e":1}' | base64)'"},
{"Data":"'$(printf '{"e":2}' | base64)'"},
{"Data":"'$(printf '{"e":3}' | base64)'"}
]' --query FailedPutCount
0
# Records in the batch are concatenated (base64-decoded) into a single object.
$ awsemu s3 ls s3://lake/events/ --recursive | tail -1
2026-05-20 19:42:18 24 events/2026/05/20/19/events-to-s3-2026-05-20-19-42-18-3cd4....jsonl
Architecture
Code lives at services/firehose/. The provider is pure LocalEmu (1039 lines in provider.py), with no moto fallback. Delivery-stream records live in a custom FirehoseStore, held in the global firehose_stores AccountRegionBundle. accept_state_visitor registers that bundle, so streams survive a PERSISTENCE=1 restart.
| File | Role |
|---|---|
| provider.py | CRUD handlers, put-record dispatch, destination-specific deliverers, OpenSearch endpoint resolver. |
| mappers.py | Config-to-description converters per destination type (S3, ExtendedS3, ES, OpenSearch, Redshift, HTTP). |
| models.py | FirehoseStore + global firehose_stores AccountRegionBundle. |
Delivery dispatch (_process_records)
When records arrive, the dispatcher handles them in the following order, checking each destination type on the stream description:
- Run the configured ProcessingConfiguration.Processors[] in order. Lambda processors are invoked synchronously, and their returned records replace the input batch.
- If an ElasticsearchDestinationDescription or AmazonopensearchserviceDestinationDescription is present: index each record via opensearchpy.OpenSearch against the configured domain.
- If an S3DestinationDescription or ExtendedS3DestinationDescription is present: base64-decode and concatenate the records, then call s3.put_object on the configured bucket using the key pattern below.
- If an HttpEndpointDestinationDescription is present: build an envelope {requestId, timestamp, records:[{data}...]} and call requests.post(url, json=..., timeout=60).
- If a RedshiftDestinationDescription is present: write the batch to S3 first (via the nested S3 destination), then issue a real redshift-data.execute_statement('INSERT INTO <DataTableName> VALUES (...)', Parameters=...) against the cluster identified in the JDBC URL. One INSERT is issued per row (the Redshift Data API single-row constraint).
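The sketch below illustrates the S3 branch together with the processor step that precedes it. It illustrates the order described above and is not the provider's code. s3_body is a hypothetical name, and processors are plain Python callables over decoded bytes, standing in for Lambda invocations (which exchange base64 data). Records use the wire-level {"Data": "<base64>"} shape. The text says the decoded records are concatenated but does not say whether a delimiter goes between them, so this sketch inserts none.

```python
import base64
from typing import Callable, Iterable, List, Sequence

Processor = Callable[[List[bytes]], List[bytes]]


def s3_body(records: Iterable[dict], processors: Sequence[Processor] = ()) -> bytes:
    """Decode wire-format records, run processors in order, then concatenate."""
    # Each PutRecord/PutRecordBatch entry carries its payload base64-encoded in "Data".
    payloads = [base64.b64decode(r["Data"]) for r in records]
    # Processors run first, in configuration order; each one's output
    # replaces the batch that the next stage sees.
    for process in processors:
        payloads = process(payloads)
    # No delimiter is inserted between records in this sketch.
    return b"".join(payloads)
```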
S3 object key layout
<Prefix>/YYYY/MM/DD/HH/<DeliveryStreamName>-YYYY-MM-DD-HH-MM-SS-<uuid><FileExtension>
This matches the AWS documented key pattern, including the FileExtension suffix when configured. Consumers that expect the standard Firehose date-based layout (Athena, EMR, Glue crawlers) work out of the box when pointed at the prefix.
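Here is the key layout as a Python function. It assumes UTC timestamps (as on AWS) and a random UUID suffix; s3_object_key is a hypothetical helper name. In the quick-start listing, Prefix events/ produces events/2026/..., so the sketch appends the date path directly after the prefix rather than inserting another slash.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def s3_object_key(
    prefix: str,
    stream_name: str,
    file_extension: str = "",
    now: Optional[datetime] = None,
    suffix: Optional[str] = None,
) -> str:
    """Build <Prefix>YYYY/MM/DD/HH/<stream>-YYYY-MM-DD-HH-MM-SS-<uuid><ext>."""
    now = now or datetime.now(timezone.utc)
    suffix = suffix or str(uuid.uuid4())
    # Hourly date path; the prefix is used verbatim (e.g. "events/").
    date_path = now.strftime("%Y/%m/%d/%H/")
    stamp = now.strftime("%Y-%m-%d-%H-%M-%S")
    return f"{prefix}{date_path}{stream_name}-{stamp}-{suffix}{file_extension}"
```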
HTTP endpoint destination
The HTTP collector receives a real POST <Url> with Content-Type: application/json and the AWS-standard payload envelope. The 60-second client-side timeout matches the AWS-configured default.
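The helpers below sketch how to build and decode the envelope, for example in a local collector like the one at host.docker.internal:8080 in the next example. The field names come from the envelope described on this page. The millisecond timestamp and the acknowledgement body (requestId plus timestamp) follow the AWS HTTP endpoint delivery spec. Whether LocalEmu checks the collector's response body is an open assumption. All function names are hypothetical.

```python
import base64
import json
import time
import uuid
from typing import List, Optional


def build_http_envelope(payloads: List[bytes], request_id: Optional[str] = None,
                        timestamp_ms: Optional[int] = None) -> dict:
    """Wrap raw payloads in the {requestId, timestamp, records:[{data}]} envelope."""
    return {
        "requestId": request_id or str(uuid.uuid4()),
        # Milliseconds since the epoch, as in the AWS spec.
        "timestamp": timestamp_ms if timestamp_ms is not None else int(time.time() * 1000),
        "records": [{"data": base64.b64encode(p).decode("ascii")} for p in payloads],
    }


def decode_envelope(body: bytes) -> List[bytes]:
    """Collector side: parse the POST body and return the decoded record payloads."""
    envelope = json.loads(body)
    return [base64.b64decode(r["data"]) for r in envelope["records"]]


def ack_for(envelope: dict, now_ms: Optional[int] = None) -> dict:
    """Response body an AWS-compatible collector returns: echo requestId, add a timestamp."""
    return {
        "requestId": envelope["requestId"],
        "timestamp": now_ms if now_ms is not None else int(time.time() * 1000),
    }
```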
$ awsemu firehose create-delivery-stream \
--delivery-stream-name events-to-http \
--http-endpoint-destination-configuration '{
"EndpointConfiguration": {
"Url": "http://host.docker.internal:8080/firehose",
"Name": "local-collector"
},
"RoleARN": "arn:aws:iam::000000000000:role/FirehoseRole",
"S3Configuration": {
"RoleARN": "arn:aws:iam::000000000000:role/FirehoseRole",
"BucketARN": "arn:aws:s3:::lake"
}
}'
$ awsemu firehose put-record --delivery-stream-name events-to-http \
--record 'Data="'$(printf '{"hello":"world"}' | base64)'"' >/dev/null
# The collector receives a POST with the AWS-standard envelope:
# { "requestId":..., "timestamp":..., "records":[{"data":"<base64>"},...] }
OpenSearch destination
Firehose builds an opensearchpy.OpenSearch client against the endpoint resolved from the domain ARN and indexes each record into IndexName. That endpoint is the LocalEmu OpenSearch domain endpoint, taken from the Endpoint field returned by opensearch describe-domain. Clients are cached per (account, region, endpoint) under a lock.
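The per-endpoint client cache described above reduces to a dictionary guarded by a lock. In this sketch, the ClientCache class and its factory parameter are hypothetical. In real use, the factory might be something like lambda ep: opensearchpy.OpenSearch(hosts=[ep]); it is left out here so the block runs without opensearchpy installed.

```python
import threading
from typing import Any, Callable, Dict, Tuple

CacheKey = Tuple[str, str, str]  # (account, region, endpoint)


class ClientCache:
    """Lock-guarded cache of one client per (account, region, endpoint)."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory          # builds a client for an endpoint URL
        self._lock = threading.Lock()
        self._clients: Dict[CacheKey, Any] = {}

    def get(self, account: str, region: str, endpoint: str) -> Any:
        key = (account, region, endpoint)
        with self._lock:
            client = self._clients.get(key)
            if client is None:
                # Built while holding the lock, so concurrent callers
                # for the same key never create two clients.
                client = self._factory(endpoint)
                self._clients[key] = client
            return client
```

Holding the lock across the factory call trades some concurrency on first use for never building duplicate clients.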
$ awsemu opensearch create-domain --domain-name logs --query DomainStatus.ARN --output text
arn:aws:es:us-east-1:000000000000:domain/logs
$ awsemu firehose create-delivery-stream \
--delivery-stream-name events-to-os \
--amazonopensearchservice-destination-configuration '{
"RoleARN": "arn:aws:iam::000000000000:role/FirehoseRole",
"DomainARN": "arn:aws:es:us-east-1:000000000000:domain/logs",
"IndexName": "events",
"S3BackupMode":"FailedDocumentsOnly",
"S3Configuration": {
"RoleARN": "arn:aws:iam::000000000000:role/FirehoseRole",
"BucketARN": "arn:aws:s3:::lake"
}
}'
$ awsemu firehose put-record --delivery-stream-name events-to-os \
--record 'Data="'$(printf '{"level":"INFO","msg":"hi"}' | base64)'"' >/dev/null
$ curl -s 'http://localhost:4566/logs/_search?pretty' | python3 -c \
'import sys,json;d=json.load(sys.stdin);print(d["hits"]["total"]["value"])'
1
This pairs with the LocalEmu OpenSearch service, which runs a real OpenSearch process. See the OpenSearch reference for how the domain is provisioned.
Redshift destination
Records flow through S3 first, via the mandatory S3Configuration sub-destination. A real redshift-data.execute_statement('INSERT INTO <DataTableName> VALUES (...)', Parameters=<row>) is then issued against the cluster the JDBC URL points at. The cluster identifier is parsed from the URL host, the region from the URL, and the account from the RoleARN. Exercising this destination end-to-end requires a real Redshift cluster, which means starting LocalEmu with REDSHIFT_DOCKER_BACKEND=1 (see the RDS / Redshift Docker reference).
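The JDBC parsing and per-row statements can be sketched as follows. The sketch assumes the AWS host shape <cluster-id>.<unique-id>.<region>.redshift.amazonaws.com. It uses Redshift Data API named parameters: :name placeholders with {"name", "value"} string pairs. parse_redshift_target and insert_statements are hypothetical helpers, since this page does not document how LocalEmu builds its VALUES list. Because the statement omits a column list (as in the form shown above), the column order must match the table.

```python
from typing import Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlparse


def parse_redshift_target(jdbc_url: str, role_arn: str) -> Dict[str, Optional[str]]:
    """Extract cluster id, region, database and account from a JDBC URL and RoleARN."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError(f"not a JDBC URL: {jdbc_url}")
    # Drop the "jdbc:" prefix so urlparse sees redshift://host:port/db.
    parsed = urlparse(jdbc_url[len("jdbc:"):])
    labels = (parsed.hostname or "").split(".")
    # Assumed host shape: <cluster-id>.<unique-id>.<region>.redshift.amazonaws.com
    region = None
    if "redshift" in labels[2:]:
        region = labels[labels.index("redshift", 2) - 1]
    return {
        "cluster_id": labels[0] or None,
        "region": region,
        "database": parsed.path.lstrip("/") or None,
        # arn:aws:iam::<account>:role/<name>; the account is the fifth field.
        "account": role_arn.split(":")[4],
    }


def insert_statements(table: str, columns: Sequence[str],
                      rows: Sequence[Dict[str, object]]) -> List[Tuple[str, List[Dict[str, str]]]]:
    """One (sql, parameters) pair per row, using Data API :name placeholders."""
    placeholders = ", ".join(f":{c}" for c in columns)
    sql = f"INSERT INTO {table} VALUES ({placeholders})"
    # Data API parameter values are passed as strings.
    return [(sql, [{"name": c, "value": str(row[c])} for c in columns]) for row in rows]


# Each (sql, params) pair would then go to boto3's redshift-data client, e.g.
#   client.execute_statement(ClusterIdentifier=..., Database=..., DbUser=...,
#                            Sql=sql, Parameters=params)
```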
Lambda transform processors
Setting ProcessingConfiguration.Enabled=true with a Lambda processor makes Firehose invoke the configured Lambda before delivery. The Lambda receives the records in the AWS-standard Firehose transform shape. It must return each record with its original recordId, a result of "Ok", "Dropped", or "ProcessingFailed", and base64-encoded data. The returned data replaces the input batch before the destination dispatch described above.
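Here is a minimal handler that could sit in upper-case.zip as index.py. The file's contents are not given on this page, so this is an assumed example. It follows the AWS transform contract: decode each base64 data field, transform it, and return the same recordId with a result and re-encoded data. Marking blank payloads Dropped and undecodable ones ProcessingFailed is an illustrative choice, not a requirement.

```python
import base64


def handler(event, context):
    """Firehose transform: upper-case each record's UTF-8 payload."""
    output = []
    for record in event["records"]:
        record_id = record["recordId"]  # must be echoed back unchanged
        try:
            text = base64.b64decode(record["data"]).decode("utf-8")
        except ValueError:
            # binascii.Error and UnicodeDecodeError both subclass ValueError.
            output.append({"recordId": record_id, "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        if not text.strip():
            # Illustrative choice: blank payloads are dropped from delivery.
            output.append({"recordId": record_id, "result": "Dropped",
                           "data": record["data"]})
            continue
        upper = text.upper().encode("utf-8")
        output.append({"recordId": record_id, "result": "Ok",
                       "data": base64.b64encode(upper).decode("ascii")})
    return {"records": output}
```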
$ awsemu lambda create-function --function-name upper-case \
--runtime python3.12 --handler index.handler \
--role arn:aws:iam::000000000000:role/lambda-role \
--zip-file fileb://upper-case.zip >/dev/null
$ awsemu firehose update-destination --delivery-stream-name events-to-s3 \
--current-delivery-stream-version-id 1 \
--destination-id destinationId-000000000001 \
--extended-s3-destination-update '{
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [{
"Type":"Lambda",
"Parameters":[
{"ParameterName":"LambdaArn","ParameterValue":"arn:aws:lambda:us-east-1:000000000000:function:upper-case"}
]
}]
}
}'
# The Lambda is really invoked between PutRecord and the S3 write; the function's
# returned records (base64) replace the input ones before delivery.
Features supported
| Feature | Notes |
|---|---|
| Stream lifecycle | CreateDeliveryStream (background activator flips to ACTIVE), DescribeDeliveryStream, ListDeliveryStreams, DeleteDeliveryStream, UpdateDestination. |
| Ingest | PutRecord, PutRecordBatch. Records are base64-encoded payloads up to 1 MB each. |
| Destinations with real dispatch | S3, ExtendedS3, HTTP endpoint, Elasticsearch, Amazon OpenSearch Service, Redshift. |
| Lambda processors | Synchronous invocation of the configured Lambda; returned records replace input. |
| Tags | TagDeliveryStream, UntagDeliveryStream, ListTagsForDeliveryStream. |
| RoleARN assumption | S3 puts go through STS-assumed role credentials when RoleARN is set on the destination. |
| Persistence | firehose_stores registered as a state root; streams and destination configs survive PERSISTENCE=1 restart. |
Integration points
| Service | How it touches Firehose |
|---|---|
| S3 | Real put_object with the AWS-standard key layout. Consumers that expect the standard Firehose date-based layout see the data correctly. |
| OpenSearch | Real opensearchpy indexing against the LocalEmu OpenSearch process. |
| Redshift | Real redshift-data.execute_statement against a Docker-backed Redshift cluster (single-row INSERTs). |
| Lambda | Synchronous invocation as a record-transform processor; receives Firehose-transform shape. |
| CloudWatch Logs (subscription filters) | Logs subscription filters can fan out matched events to a Firehose stream; the gzip+base64 envelope is delivered as a record. |
| SES v1 | SES v1 configuration-set event destinations can route to Firehose. |
| CloudFormation | AWS::KinesisFirehose::DeliveryStream resource provider routes through this provider. |
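Here is how a consumer that sees these records one at a time can unpack the subscription payload. Examples are a transform Lambda or an HTTP collector, where each record's data field is base64. The messageType/logEvents structure is the standard CloudWatch Logs subscription format; log_messages is a hypothetical helper name.

```python
import base64
import gzip
import json
from typing import List


def decode_cwl_record(data_b64: str) -> dict:
    """base64 -> gunzip -> JSON, the CloudWatch Logs subscription payload."""
    return json.loads(gzip.decompress(base64.b64decode(data_b64)))


def log_messages(data_b64: str) -> List[str]:
    """Return the log lines carried by one record; control messages yield nothing."""
    payload = decode_cwl_record(data_b64)
    if payload.get("messageType") != "DATA_MESSAGE":
        # CONTROL_MESSAGE records are CloudWatch Logs connectivity checks.
        return []
    return [event["message"] for event in payload.get("logEvents", [])]
```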
Known limitations
- No buffering enforcement. BufferingHints.SizeInMBs and IntervalInSeconds are accepted at Create/Update but ignored at runtime. Every PutRecord/PutRecordBatch call delivers immediately. Tests that depend on AWS-style aggregation (one object per buffer interval, not per call) will see different object counts on LocalEmu.
- No data-format conversion. DataFormatConversionConfiguration (JSON-to-Parquet / JSON-to-ORC via a Glue schema) is accepted in the API but ignored. Records land at the destination as the raw bytes they arrived as. If you need Parquet, build it yourself in a Lambda processor.
- No dynamic partitioning. DynamicPartitioningConfiguration, MetadataExtractionQuery, and the !{partitionKeyFromQuery:...} prefix tokens are metadata-only. The S3 prefix is written literally.
- No SSE-at-rest. StartDeliveryStreamEncryption and StopDeliveryStreamEncryption return NotImplemented. The KMS-encryption metadata on the stream description is not honored.
- No source-record backup. S3BackupConfiguration / S3BackupMode=Enabled are metadata-only. The backup bucket is never written to (including FailedDocumentsOnly for OpenSearch destinations).
- No Kinesis stream source poller. KinesisStreamSourceConfiguration is accepted on CreateDeliveryStream, but no background worker pulls records from the source stream to drive delivery. Streams created with this source spec are inert.
- No Splunk destination dispatch. SplunkDestinationConfiguration is accepted at the API, but the put-record path does not deliver to a Splunk HEC endpoint.
- No Snowflake or Iceberg destinations, and no MSK source. These are recent AWS additions and are not wired in LocalEmu.
- No CloudWatch Logs error logging. CloudWatchLoggingOptions on each destination is metadata-only. Failed deliveries surface as exceptions in the LocalEmu logs but are not written to the configured log group.
- Redshift destination requires the Redshift Docker backend. The INSERT goes through redshift-data.execute_statement, which only works when a real cluster is running. Start LocalEmu with REDSHIFT_DOCKER_BACKEND=1.
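Because buffering is not enforced, tests that expect AWS-style aggregation can batch on the client side instead. The sketch below models the flush rule that BufferingHints describes on AWS: whichever threshold is reached first triggers delivery. The 5 MB / 300 s defaults are the AWS S3-destination defaults, and the class name is hypothetical. On LocalEmu, a batch is concatenated into a single object. Sending each flushed list as one PutRecordBatch call should therefore yield one object per flush. AWS caps a PutRecordBatch call at 500 records, so large flushes would need splitting.

```python
import time
from typing import Callable, List, Optional


class FirehoseStyleBuffer:
    """Client-side stand-in for AWS BufferingHints: flush when either the size
    or the interval threshold is reached, whichever comes first."""

    def __init__(self, size_mb: float = 5, interval_s: float = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_bytes = int(size_mb * 1024 * 1024)  # SizeInMBs, treated as MiB here
        self.interval_s = interval_s                 # IntervalInSeconds
        self.clock = clock
        self.pending: List[bytes] = []
        self.pending_bytes = 0
        self.opened_at: Optional[float] = None       # when the current buffer started

    def add(self, payload: bytes) -> Optional[List[bytes]]:
        """Buffer one payload; return the flushed batch if a threshold was hit."""
        if self.opened_at is None:
            self.opened_at = self.clock()
        self.pending.append(payload)
        self.pending_bytes += len(payload)
        return self.poll()

    def poll(self) -> Optional[List[bytes]]:
        """Call periodically so the interval threshold fires without new records."""
        if self.opened_at is None:
            return None
        size_hit = self.pending_bytes >= self.max_bytes
        time_hit = self.clock() - self.opened_at >= self.interval_s
        return self.flush() if (size_hit or time_hit) else None

    def flush(self) -> Optional[List[bytes]]:
        """Hand back everything buffered so far and start a fresh buffer."""
        if not self.pending:
            return None
        batch = self.pending
        self.pending, self.pending_bytes, self.opened_at = [], 0, None
        return batch
```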