Build a Step Functions batch ETL and iterate it on your laptop
You are a data engineer at a small company. An upstream partner drops a CSV of orders into your S3 bucket once a night. Each row needs to be parsed and loaded into DynamoDB so downstream queries can use it. Some rows are malformed (missing fields, negative amounts, the occasional unicode bomb) and you do not want one bad row to kill the entire nightly batch. Bad rows should be skipped, logged, and reported, not crash the workflow.
The standard AWS pattern for this is a Step Functions Standard workflow with a Map state and a per-row Catch. One Lambda parses the CSV once, and the Map fans out over the rows. A second Lambda loads each row into DynamoDB and raises a typed exception on bad data. The Map's Catch turns that exception into a {"status": "rejected"} entry in the aggregated output instead of aborting the execution. Three states, two Lambdas, real fault isolation.
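To make the fault-isolation idea concrete, here is a minimal in-process analogy of what the Map state and its Catch do. It is a sketch, not how Step Functions actually runs a Map. run_inline_map and toy_load are hypothetical names. A thread pool stands in for the Map's MaxConcurrency, and catching every Exception stands in for ErrorEquals = ["States.ALL"].

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_inline_map(items: list[dict], task: Callable[[dict], dict],
                   max_concurrency: int = 5) -> list[dict]:
    """Run task over items with bounded concurrency. A failing item becomes
    {"status": "rejected"} instead of aborting the batch, and the returned
    list keeps input order, like the Map state's aggregated output."""

    def one_iteration(item: dict) -> dict:
        try:
            # Success path: wrap the result, like ResultSelector {"result.$": "$.Payload"}.
            return {"result": task(item)}
        except Exception:
            # Failure path: stand-in for a Catch on States.ALL routing to a Pass state.
            return {"status": "rejected"}

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map yields results in submission order, not completion order.
        return list(pool.map(one_iteration, items))


def toy_load(row: dict) -> dict:
    # Hypothetical stand-in for load_row: reject rows without an order_id.
    if not row.get("order_id"):
        raise ValueError("missing order_id")
    return {"order_id": row["order_id"], "status": "loaded"}


if __name__ == "__main__":
    rows = [{"order_id": "ord-001"}, {"order_id": ""}, {"order_id": "ord-003"}]
    print(run_inline_map(rows, toy_load))
    # [{'result': {'order_id': 'ord-001', 'status': 'loaded'}}, {'status': 'rejected'},
    #  {'result': {'order_id': 'ord-003', 'status': 'loaded'}}]
```

The middle row fails, but its neighbours still load, and the output stays in input order however the threads interleave.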
Iterating that on real AWS is slow: every change to the state machine definition is a Terraform apply, and watching what one execution did means clicking through the Step Functions console per step. This tutorial builds the whole stack on LocalEmu in about 25 seconds, runs a real CSV through it end to end, and lets you inspect every state's input and output via the AWS CLI without leaving the terminal. Nine Terraform resources, two integration tests, no AWS account.
What you will have working at the end
Three things, all with the real terminal output captured later on this page:
- A. A six-row CSV becomes one SUCCEEDED execution. The Step Functions output's results list has six entries in the same order as the CSV: four with status: loaded and two with status: rejected. The execution did not fail. That is the Catch doing its job.
- B. Only the four valid rows land in DynamoDB. A scan returns exactly ord-001, ord-002, ord-003, ord-004. The row with the missing order_id and the row with the negative amount never reach put_item.
- C. The two pytest tests pass in under seven seconds. They assert both the data plane (the DynamoDB rows) and the control plane (the Map's aggregated output shape) on the same execution.
Architecture
caller ──StartExecution──▶ Step Functions: le-etl-etl
                                   │
                                   ▼
                      Lambda: parse ── reads CSV from S3, returns rows
                                   │
                                   ▼
                      Map state ── fans out over the rows
                                   │
                                   ▼
                      Lambda: load_row ── per-row PutItem, raises RowRejected on bad data
                          │          ▲
                          │          │  Catch routes the error to a Pass
                          ▼          │  that emits {"status": "rejected"}
                DynamoDB: le-etl-orders
1. The input
The example ships with a six-row sample CSV deliberately built to exercise both code paths: four well-formed orders, one with an empty order_id, one with a negative amount. Drop your own file in S3 to drive the same workflow against a bigger or weirder dataset.
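Because the sample is built to hit both code paths, you can confirm that before deploying anything. The sketch below runs the same six data rows listed below through csv.DictReader, as parse.py does. It then applies the three checks that load_row.py applies (missing order_id, unparseable amount, negative amount), minus the DynamoDB write. classify_row is a hypothetical helper, not part of the example repository.

```python
import csv
import io
from decimal import Decimal, InvalidOperation

# The same six data rows as sample/orders.csv.
SAMPLE = """order_id,amount,customer
ord-001,42.50,alice
ord-002,199.99,bob
ord-003,17.25,carol
ord-004,1000.00,dave
,50.00,eve
ord-006,-10.00,frank
"""


def classify_row(row: dict) -> str:
    """Return "loaded" or "rejected" using load_row.py's checks, minus the write."""
    order_id = (row.get("order_id") or "").strip()
    if not order_id:
        return "rejected"  # missing order_id
    try:
        amount = Decimal(row.get("amount") or "0")
    except InvalidOperation:
        return "rejected"  # unparseable amount
    return "rejected" if amount < 0 else "loaded"  # negative amount


rows = list(csv.DictReader(io.StringIO(SAMPLE)))
print([classify_row(r) for r in rows])
# ['loaded', 'loaded', 'loaded', 'loaded', 'rejected', 'rejected']
```

An empty amount falls back to 0 and counts as valid here, exactly as it does in load_row.py. If a missing amount should be a rejection, tighten both together.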
# sample/orders.csv: six rows, four valid, two intentionally bad (row 5 has no
# order_id, row 6 has a negative amount). These comment lines are annotations
# for this page; the file itself starts at the header row.
order_id,amount,customer
ord-001,42.50,alice
ord-002,199.99,bob
ord-003,17.25,carol
ord-004,1000.00,dave
,50.00,eve
ord-006,-10.00,frank

2. The two Lambdas
parse.py runs once per execution. It downloads the CSV from S3 and returns the rows as a JSON list. The state machine's Map state then iterates over that list.
# src/parse.py: first Lambda. Runs once per execution.
# Reads the CSV from S3, returns the rows as a JSON list for the Map state.
import csv
import io

import boto3

_s3 = boto3.client("s3")


def handle(event, _ctx):
    bucket = event["bucket"]
    key = event["key"]
    obj = _s3.get_object(Bucket=bucket, Key=key)
    # Use "utf-8-sig" instead if partner files may start with a byte-order mark.
    text = obj["Body"].read().decode("utf-8")
    # DictReader keys every row by the header line: order_id, amount, customer.
    reader = csv.DictReader(io.StringIO(text))
    rows = [dict(r) for r in reader]
    return {"rows": rows, "count": len(rows),
            "source": f"s3://{bucket}/{key}"}

load_row.py runs once per row. It validates the row and writes it to DynamoDB. Both failure modes (a missing order_id, or an unparseable or negative amount) raise RowRejected. The Map's Catch turns that exception into a {"status": "rejected"} entry in the output. Crucially, the Lambda raises BEFORE the put_item call, so bad rows never reach DynamoDB:
# src/load_row.py: second Lambda. Invoked ONCE PER ROW by the Map state.
# RowRejected is the signal to the Catch clause; one bad row never aborts
# the whole batch.
import os
from decimal import Decimal

import boto3

_table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])


class RowRejected(Exception):
    pass


def handle(event, _ctx):
    row = dict(event)
    order_id = (row.get("order_id") or "").strip()
    if not order_id:
        raise RowRejected(f"missing order_id: {event!r}")
    try:
        # Decimal, not float: boto3's DynamoDB resource rejects float values.
        amount = Decimal(row.get("amount") or "0")
    except Exception as exc:
        raise RowRejected(f"bad amount for {order_id}: {row.get('amount')!r}") from exc
    if amount < 0:
        raise RowRejected(f"negative amount for {order_id}: {amount}")
    # Only rows that passed every check above reach this write.
    _table.put_item(Item={"order_id": order_id, "amount": amount,
                          "customer": row.get("customer", "")})
    return {"order_id": order_id, "status": "loaded"}

3. The state machine
There are two top-level states: Parse (Task) and LoadRows (Map). Inside the Map, each row runs through a LoadRow Task, with a Rejected Pass state for the Catch to land in. MaxConcurrency = 5 caps how many rows can be in flight at once. Raise it to spread load over more Lambda containers, or drop it to 1 to process rows strictly one at a time. Either way, the Map's aggregated output keeps the input order. The Map's per-iteration Catch is what makes the whole thing fault-tolerant:
# terraform/main.tf: the definition of aws_sfn_state_machine.etl.
# Two states: Parse runs once, then a Map fans out over the rows.
# MaxConcurrency caps how many rows are in flight at once.
definition = jsonencode({
  Comment = "Parse a CSV from S3, fan-out via Map, load each valid row into DynamoDB"
  StartAt = "Parse"
  States = {
    Parse = {
      Type           = "Task"
      Resource       = "arn:aws:states:::lambda:invoke"
      Parameters     = { FunctionName = aws_lambda_function.parse.arn, "Payload.$" = "$" }
      ResultSelector = { "rows.$" = "$.Payload.rows" }
      ResultPath     = "$.parsed"
      Next           = "LoadRows"
    }
    LoadRows = {
      Type           = "Map"
      ItemsPath      = "$.parsed.rows"
      MaxConcurrency = 5
      ResultPath     = "$.results"
      ItemProcessor = {
        ProcessorConfig = { Mode = "INLINE" }
        StartAt         = "LoadRow"
        States = {
          LoadRow = {
            Type           = "Task"
            Resource       = "arn:aws:states:::lambda:invoke"
            Parameters     = { FunctionName = aws_lambda_function.load_row.arn, "Payload.$" = "$" }
            ResultSelector = { "result.$" = "$.Payload" }
            End            = true
            # The Catch is the load-bearing detail. Without it, one bad row
            # would FAIL the whole execution. With it, the bad row becomes
            # a {"status": "rejected"} entry in the results list and the
            # Map keeps going.
            Catch = [{
              ErrorEquals = ["States.ALL"]
              ResultPath  = "$.error"
              Next        = "Rejected"
            }]
          }
          # The Pass state replaces its whole input with Result, so the
          # error stored at $.error does not reach the final results list.
          Rejected = { Type = "Pass", Result = { status = "rejected" }, End = true }
        }
      }
    }
  }
})

4. Run the scenario on your laptop
Clone the project, start LocalEmu in another terminal, then deploy:
$ cd localemu-examples/05-batch-etl
$ localemu start # in a separate terminal
$ ./scripts/deploy.sh local
Nine resources apply in about 25 seconds. Most of the wall time is two Lambda containers cold-starting:
$ ./scripts/deploy.sh local
aws_s3_bucket.input: Creation complete after 0s
aws_dynamodb_table.orders: Creation complete after 0s
aws_iam_role.lambda_role: Creation complete after 0s
aws_iam_role.sfn_role: Creation complete after 0s
aws_iam_role_policy.lambda_policy: Creation complete after 0s
aws_iam_role_policy.sfn_policy: Creation complete after 0s
aws_lambda_function.parse: Creation complete after 5s
aws_lambda_function.load_row: Creation complete after 10s
aws_sfn_state_machine.etl: Creation complete after 0s
Apply complete! Resources: 9 added, 0 changed, 0 destroyed.
→ deployed to local. outputs:
input_bucket = "le-etl-input"
parse_function_name = "le-etl-parse"
load_row_function_name = "le-etl-load-row"
state_machine_arn = "arn:aws:states:us-east-1:000000000000:stateMachine:le-etl-etl"
table_name = "le-etl-orders"
real 0m24.347s

Now drive the scenario from the top of this page by hand. Upload the sample CSV, kick off an execution, look at what came out of the Map, then scan the destination table:
$ # Upload the sample CSV (6 rows: 4 valid + 2 bad).
$ aws --endpoint-url http://localhost:4566 \
s3 cp sample/orders.csv s3://le-etl-input/orders.csv
upload: sample/orders.csv to s3://le-etl-input/orders.csv
$ # Kick off the Step Functions execution.
$ aws --endpoint-url http://localhost:4566 stepfunctions start-execution \
--state-machine-arn arn:aws:states:us-east-1:000000000000:stateMachine:le-etl-etl \
--input '{"bucket":"le-etl-input","key":"orders.csv"}'
{
"executionArn": "arn:aws:states:us-east-1:000000000000:execution:le-etl-etl:72805726-...",
"startDate": "2026-05-23T16:42:09.123000+00:00"
}
$ # Wait a few seconds, then look at the final execution output.
$ aws --endpoint-url http://localhost:4566 stepfunctions describe-execution \
--execution-arn arn:aws:states:us-east-1:000000000000:execution:le-etl-etl:72805726-... \
--query 'output' --output text | jq .results
[
{ "result": { "order_id": "ord-001", "status": "loaded" } },
{ "result": { "order_id": "ord-002", "status": "loaded" } },
{ "result": { "order_id": "ord-003", "status": "loaded" } },
{ "result": { "order_id": "ord-004", "status": "loaded" } },
{ "status": "rejected" },
{ "status": "rejected" }
]
# Six entries in the same order as the input CSV.
# Indices 4 and 5 are the rejected rows (missing order_id, negative amount).
# The Map kept going past both rejections; the execution as a whole SUCCEEDED.
$ # Scan the destination table to confirm only the valid rows landed.
$ aws --endpoint-url http://localhost:4566 dynamodb scan \
--table-name le-etl-orders --output table \
--query 'Items[*].[order_id.S,amount.N,customer.S]'
+---------+-----------+---------+
| ord-001| 42.50 | alice |
| ord-002| 199.99 | bob |
| ord-003| 17.25 | carol |
| ord-004| 1000.00 | dave |
+---------+-----------+---------+
# Exactly four rows, matching the four "loaded" entries above. The two bad
# rows never wrote to DynamoDB because the load_row Lambda raised before
# the put_item call.
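The Rejected Pass state replaces each failing iteration's input with a fixed {"status": "rejected"}. That makes the results list itself the report: position i lines up with data row i of the CSV. The RowRejected message survives only in the Lambda's logs and the execution history. The sketch below uses a hypothetical helper, summarize, which is not part of tests/test_etl.py. It turns the describe-execution output string into a row count, the loaded order IDs, and the positions of the rejected rows. It assumes the output shape shown above.

```python
import json


def summarize(execution_output: str) -> dict:
    """Summarize the Map's aggregated results from a describe-execution output.

    Assumes loaded rows look like {"result": {"order_id": ..., "status": "loaded"}}
    and rejected rows look like {"status": "rejected"}, as in the output above.
    """
    results = json.loads(execution_output)["results"]
    loaded = [r["result"]["order_id"] for r in results
              if r.get("result", {}).get("status") == "loaded"]
    rejected_at = [i for i, r in enumerate(results) if r.get("status") == "rejected"]
    return {"total": len(results), "loaded": loaded, "rejected_at": rejected_at}


if __name__ == "__main__":
    output = json.dumps({"results": [
        {"result": {"order_id": "ord-001", "status": "loaded"}},
        {"result": {"order_id": "ord-002", "status": "loaded"}},
        {"result": {"order_id": "ord-003", "status": "loaded"}},
        {"result": {"order_id": "ord-004", "status": "loaded"}},
        {"status": "rejected"},
        {"status": "rejected"},
    ]})
    print(summarize(output))
    # {'total': 6, 'loaded': ['ord-001', 'ord-002', 'ord-003', 'ord-004'], 'rejected_at': [4, 5]}
```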
The same scenario is asserted twice by tests/test_etl.py: once on the data plane (the DynamoDB scan) and once on the control plane (the Map's aggregated output shape). Both tests finish in under seven seconds:
$ ./scripts/test.sh local
============================= test session starts ==============================
platform darwin -- Python 3.13.12, pytest-9.0.3
collected 2 items
tests/test_etl.py::test_etl_happy_path PASSED
tests/test_etl.py::test_run_output_shape PASSED
============================== 2 passed in 6.05s ===============================

Tear it back down:
$ ./scripts/teardown.sh local
Destroy complete! Resources: 9 destroyed.
→ verifying teardown for prefix 'le-etl' on local
clean: nothing left behind
real 0m10.994s

Deploy takes 24 seconds, the tests 6 seconds and teardown 11 seconds. That is about forty seconds round trip, repeatable for every change to the state machine or either Lambda.
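Once you are iterating, the by-hand upload, start and describe steps are easy to script. The following is a minimal boto3 sketch of the same loop, with polling in place of "wait a few seconds". It makes several assumptions. boto3 must be installed, and credentials must be configured the same way your aws CLI already is. You run it from 05-batch-etl/. The endpoint, bucket, key and state machine ARN are the ones from the deploy outputs above. wait_for_execution and run_once are hypothetical names.

```python
import json
import time

ENDPOINT = "http://localhost:4566"
# Values copied from the deploy outputs above.
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:000000000000:stateMachine:le-etl-etl"
BUCKET, KEY = "le-etl-input", "orders.csv"


def wait_for_execution(sfn, execution_arn: str, poll_seconds: float = 1.0,
                       timeout_seconds: float = 60.0) -> dict:
    """Poll DescribeExecution until the execution is no longer RUNNING."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = sfn.describe_execution(executionArn=execution_arn)
        if desc["status"] != "RUNNING":
            return desc
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{execution_arn} still RUNNING after {timeout_seconds}s")
        time.sleep(poll_seconds)


def run_once(csv_path: str = "sample/orders.csv") -> list:
    """Upload the CSV, start one execution, wait, and return its results list."""
    import boto3  # imported here so wait_for_execution is usable without boto3

    s3 = boto3.client("s3", endpoint_url=ENDPOINT, region_name="us-east-1")
    sfn = boto3.client("stepfunctions", endpoint_url=ENDPOINT, region_name="us-east-1")
    s3.upload_file(csv_path, BUCKET, KEY)
    started = sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps({"bucket": BUCKET, "key": KEY}),
    )
    desc = wait_for_execution(sfn, started["executionArn"])
    if desc["status"] != "SUCCEEDED":
        raise RuntimeError(f"execution ended with status {desc['status']}")
    return json.loads(desc["output"])["results"]
```

Call run_once() from a Python shell after deploying. The polling helper takes the client as a parameter, so it can be unit-tested with a fake object instead of a live endpoint.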
5. The same code on real AWS
Same scripts, with aws instead of local:
$ ./scripts/deploy.sh aws
$ ./scripts/test.sh aws
$ ./scripts/teardown.sh aws
Three places where the same Terraform behaves a bit differently on real AWS, all worth knowing about up front:
- Map iteration history. Real Step Functions records each Map iteration's events (MapIterationStarted, MapIterationSucceeded and so on) in the execution history, which makes the console's per-iteration view useful for debugging. Every state transition inside an iteration also counts toward the Standard-workflow bill. LocalEmu collapses the iterations for simplicity. The aggregated output is identical, but if you depend on GetExecutionHistory for an iteration-by-iteration trace, you only get it on real AWS.
- Payload size. Step Functions caps every state's input and output, including a Lambda task's result, at 256 KiB. A million-row CSV will not fit in a single Parse response. For larger inputs, return S3 pointers from Parse and let LoadRow read its rows from S3, or split the input upstream. The constraint is the same on LocalEmu and on real AWS.
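- DynamoDB throughput. On real AWS, DynamoDB throttles writes that exceed what a table can absorb. MaxConcurrency=5 is unlikely to get there on its own. A much wider Map bursting against a PAY_PER_REQUEST table well above its previous traffic peak, however, can see short throttling spikes. Use PROVISIONED with capacity sized for the burst, or let the SDK's built-in retry backoff absorb it. Note one side effect of the Catch matching States.ALL: a valid row whose write still fails after those retries is reported as rejected, exactly like a malformed one. Narrowing the Catch to ErrorEquals = ["RowRejected"] and adding a Retry for transient errors keeps the two apart. LocalEmu does not enforce DynamoDB throughput at all.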
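For the payload-size limit, a rough pre-flight check is to measure the serialized size of what Parse would return before it ever runs. The sketch below assumes the 256 KiB cap applies to the UTF-8 encoded JSON. It leaves headroom for the other fields the state carries (bucket, key, the Lambda integration's metadata). fits_in_state_payload and the 0.8 headroom factor are illustrative choices, not values from the example repository.

```python
import json

# Step Functions limit for a state's input or output: 256 KiB of UTF-8 JSON.
STATE_PAYLOAD_LIMIT_BYTES = 256 * 1024


def payload_bytes(value) -> int:
    # Compact separators give the smallest JSON encoding. json.dumps escapes
    # non-ASCII by default, which only overestimates the size (conservative).
    return len(json.dumps(value, separators=(",", ":")).encode("utf-8"))


def fits_in_state_payload(rows: list[dict], headroom: float = 0.8) -> bool:
    """True if {"rows": rows} uses at most `headroom` of the limit, leaving
    room for the other fields the state carries alongside the rows."""
    return payload_bytes({"rows": rows}) <= STATE_PAYLOAD_LIMIT_BYTES * headroom


if __name__ == "__main__":
    row = {"order_id": "ord-001", "amount": "42.50", "customer": "alice"}
    print(payload_bytes(row))                       # 58
    print(fits_in_state_payload([row] * 1_000))     # True
    print(fits_in_state_payload([row] * 100_000))   # False
```

At roughly 60 bytes per row of this shape, a few thousand rows already approach the limit. That is why the S3-pointer approach matters well before a million rows.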
Broader comparison in LocalEmu vs Real AWS and Known Limitations.
Get the full project
$ git clone https://github.com/localemu/localemu-examples

The batch-ETL tutorial lives in 05-batch-etl/. It includes the Terraform, the two Lambdas, the sample CSV, the two integration tests, and the deploy / test / teardown scripts that produced every terminal output on this page.