
Build a Step Functions batch ETL and iterate on it from your laptop

You are a data engineer at a small company. An upstream partner drops a CSV of orders into your S3 bucket once a night. Each row needs to be parsed and loaded into DynamoDB so downstream queries can use it. Some rows are malformed (missing fields, negative amounts, the occasional unicode bomb) and you do not want one bad row to kill the entire nightly batch. Bad rows should be skipped, logged, and reported rather than crashing the workflow.

The standard AWS pattern for this is Step Functions Standard with a Map state and a per-row Catch. One Lambda parses the CSV once; the Map fans out over the rows; a second Lambda loads each row into DynamoDB and raises a typed exception on bad data; the Map's Catch turns that exception into a {"status": "rejected"} entry in the aggregated output instead of aborting the execution. Three states, two Lambdas, real fault isolation.

Iterating on that against real AWS is slow: every change to the state machine definition is a Terraform apply, and seeing what one execution did means clicking through the Step Functions console step by step. This tutorial builds the whole stack on LocalEmu in about 25 seconds, runs a real CSV through it end to end, and lets you inspect every state's input and output via the AWS CLI without leaving the terminal. Nine Terraform resources, two integration tests, no AWS account.

What you will have working at the end

Three things, all with the real terminal output captured later on this page:

Architecture

caller ──StartExecution──▶ Step Functions: le-etl-etl
                                       │
                                       ▼
                              Lambda: parse    ── reads CSV from S3, returns rows
                                       │
                                       ▼
                              Map state       ── fans out over the rows
                                       │
                                       ▼
                              Lambda: load_row ── per-row PutItem, raises RowRejected on bad data
                                  │           ▲
                                  │           │  Catch routes the error to a Pass
                                  ▼           │  that emits {"status": "rejected"}
                              DynamoDB: le-etl-orders

1. The input

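The example ships with a six-row sample CSV deliberately built to exercise both code paths: four well-formed orders, one with an empty order_id, one with a negative amount. The # annotations in the listing below are for the reader only; csv.DictReader does not skip comments, so the file that gets uploaded holds just the header and the six data rows. Drop your own file in S3 to drive the same workflow against a bigger or weirder dataset.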

sample/orders.csv
# sample/orders.csv: six rows, four valid, two intentionally bad.
order_id,amount,customer
ord-001,42.50,alice
ord-002,199.99,bob
ord-003,17.25,carol
ord-004,1000.00,dave
,50.00,eve              # bad: missing order_id
ord-006,-10.00,frank    # bad: negative amount
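
If you want a bigger dataset to push through the same workflow, a small generator along these lines may help. It is only a sketch: the function name make_orders_csv, the bad_every parameter and the customer-N names are illustrative assumptions, not part of the example project. It writes the same three columns as the sample and makes every Nth row bad, alternating between the two failure modes described above.

```python
import csv
import io


def make_orders_csv(n_rows, bad_every=5):
    """Return CSV text with n_rows orders; every bad_every-th row is malformed.

    Hypothetical helper for local experiments. Bad rows alternate between
    an empty order_id and a negative amount, the two cases in the sample.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["order_id", "amount", "customer"])
    bad_seen = 0
    for i in range(1, n_rows + 1):
        order_id = f"ord-{i:03d}"
        amount = f"{10 + i:.2f}"
        if bad_every and i % bad_every == 0:
            if bad_seen % 2 == 0:
                order_id = ""            # failure mode 1: missing order_id
            else:
                amount = f"-{amount}"    # failure mode 2: negative amount
            bad_seen += 1
        writer.writerow([order_id, amount, f"customer-{i}"])
    return buf.getvalue()


if __name__ == "__main__":
    # Print ten rows; redirect to a file to get something you can upload.
    print(make_orders_csv(10), end="")
```

Redirect the output to a file and upload it with the same aws s3 cp command used for sample/orders.csv.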

2. The two Lambdas

parse.py runs once per execution. It downloads the CSV from S3 and returns the rows as a JSON list. The state machine's Map state will iterate over that list.

src/parse.py
# src/parse.py: first Lambda. Runs once per execution.
# Reads the CSV from S3, returns the rows as a JSON list for the Map state.
import csv
import io

import boto3

_s3 = boto3.client("s3")


def handle(event, _ctx):
    bucket = event["bucket"]
    key    = event["key"]
    obj    = _s3.get_object(Bucket=bucket, Key=key)
    text   = obj["Body"].read().decode("utf-8")
    reader = csv.DictReader(io.StringIO(text))
    rows   = [dict(r) for r in reader]
    return {"rows": rows, "count": len(rows),
            "source": f"s3://{bucket}/{key}"}
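
The parsing step can be tried without S3 at all. The sketch below applies the same DictReader logic to an inline string; parse_rows and the SAMPLE constant are names made up for this illustration, and the three data rows are taken from the sample file.

```python
import csv
import io

# Three rows from the sample file: one valid, two bad.
SAMPLE = """\
order_id,amount,customer
ord-001,42.50,alice
,50.00,eve
ord-006,-10.00,frank
"""


def parse_rows(text):
    """Same DictReader logic as parse.py, minus the S3 download."""
    return [dict(r) for r in csv.DictReader(io.StringIO(text))]


rows = parse_rows(SAMPLE)
for row in rows:
    print(row)
```

Two things are worth noticing in the printed rows. Every value is a string, so the amount is not a number until load_row converts it. And a missing order_id arrives as an empty string rather than a missing key, which is why load_row tests the stripped value for emptiness.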

load_row.py runs once per row. It validates the row and writes to DynamoDB. The two failure modes (missing order_id, bad amount) both raise RowRejected; the Map's Catch turns that exception into a {"status": "rejected"} entry in the output. Crucially, the Lambda raises BEFORE the put_item call, so bad rows never reach DynamoDB:

src/load_row.py
# src/load_row.py: second Lambda. Invoked ONCE PER ROW by the Map state.
# RowRejected is the signal to the Catch clause; one bad row never aborts
# the whole batch.
import os
from decimal import Decimal

import boto3

_table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])


class RowRejected(Exception): pass


def handle(event, _ctx):
    row      = dict(event)
    order_id = (row.get("order_id") or "").strip()
    if not order_id:
        raise RowRejected(f"missing order_id: {event!r}")

    try:
        amount = Decimal(row.get("amount") or "0")
    except Exception as exc:
        raise RowRejected(f"bad amount for {order_id}: {row.get('amount')!r}") from exc
    if amount < 0:
        raise RowRejected(f"negative amount for {order_id}: {amount}")

    _table.put_item(Item={"order_id": order_id, "amount": amount,
                          "customer": row.get("customer", "")})
    return {"order_id": order_id, "status": "loaded"}
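
Because the checks run before any DynamoDB call, they can be exercised on their own. The sketch below pulls them into a hypothetical validate function (the project keeps them inline in handle) and runs it over a few rows. It catches decimal.InvalidOperation specifically, where the Lambda catches Exception; the ord-007 row is an invented example of an unparseable amount.

```python
from decimal import Decimal, InvalidOperation


class RowRejected(Exception):
    """Raised for rows that must not be written to the table."""


def validate(row):
    """Return the item load_row.py would write, or raise RowRejected."""
    order_id = (row.get("order_id") or "").strip()
    if not order_id:
        raise RowRejected(f"missing order_id: {row!r}")
    try:
        amount = Decimal(row.get("amount") or "0")
    except InvalidOperation as exc:
        raise RowRejected(
            f"bad amount for {order_id}: {row.get('amount')!r}") from exc
    if amount < 0:
        raise RowRejected(f"negative amount for {order_id}: {amount}")
    return {"order_id": order_id, "amount": amount,
            "customer": row.get("customer", "")}


for row in [
    {"order_id": "ord-001", "amount": "42.50", "customer": "alice"},
    {"order_id": "", "amount": "50.00", "customer": "eve"},
    {"order_id": "ord-006", "amount": "-10.00", "customer": "frank"},
    {"order_id": "ord-007", "amount": "abc", "customer": "gina"},  # invented row
]:
    try:
        print("ok      ", validate(row))
    except RowRejected as exc:
        print("rejected", exc)
```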

3. The state machine

Two states plus an inner Pass: Parse (Task), LoadRows (Map), and a Rejected branch inside the Map for the Catch to land in. MaxConcurrency = 5 caps how many rows can be in flight at once; bump it to spread load over more Lambda containers, drop it to one for strict ordering. The Map's per-iteration Catch is what makes the whole thing fault-tolerant:

terraform/main.tf (state-machine definition)
# terraform/main.tf: the Step Functions state machine.
# Two states: Parse runs once, then a Map fans out over the rows.
# MaxConcurrency caps how many rows are in flight at once.

definition = jsonencode({
  Comment = "Parse a CSV from S3, fan-out via Map, load each valid row into DynamoDB"
  StartAt = "Parse"
  States  = {
    Parse = {
      Type        = "Task"
      Resource    = "arn:aws:states:::lambda:invoke"
      Parameters  = { FunctionName = aws_lambda_function.parse.arn, "Payload.$" = "$" }
      ResultSelector = { "rows.$" = "$.Payload.rows" }
      ResultPath  = "$.parsed"
      Next        = "LoadRows"
    }
    LoadRows = {
      Type           = "Map"
      ItemsPath      = "$.parsed.rows"
      MaxConcurrency = 5
      ResultPath     = "$.results"
      ItemProcessor = {
        ProcessorConfig = { Mode = "INLINE" }
        StartAt = "LoadRow"
        States  = {
          LoadRow = {
            Type     = "Task"
            Resource = "arn:aws:states:::lambda:invoke"
            Parameters = { FunctionName = aws_lambda_function.load_row.arn, "Payload.$" = "$" }
            ResultSelector = { "result.$" = "$.Payload" }
            End      = true
            # The Catch is the load-bearing detail. Without it, one bad row
            # would FAIL the whole execution. With it, the bad row becomes
            # a {"status": "rejected"} entry in the results list and the
            # Map keeps going.
            Catch = [{
              ErrorEquals = ["States.ALL"]
              ResultPath  = "$.error"
              Next        = "Rejected"
            }]
          }
          Rejected = { Type = "Pass", Result = { status = "rejected" }, End = true }
        }
      }
    }
  }
})
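
To see what the Map and its Catch do to the data, it can help to model them in plain Python. This is a rough analogy, not how Step Functions is implemented: run_map, run_iteration and the stubbed load_row are hypothetical names, a thread pool stands in for MaxConcurrency, and the try/except stands in for the Catch on States.ALL.

```python
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal


class RowRejected(Exception):
    pass


def load_row(row):
    """Stand-in for the load_row Lambda: validation only, no DynamoDB."""
    if not (row.get("order_id") or "").strip():
        raise RowRejected("missing order_id")
    if Decimal(row["amount"]) < 0:
        raise RowRejected("negative amount")
    return {"order_id": row["order_id"], "status": "loaded"}


def run_iteration(row):
    """One Map iteration: the LoadRow task with a catch-all to Rejected."""
    try:
        return {"result": load_row(row)}   # ResultSelector wraps the payload
    except Exception:                      # Catch with ErrorEquals States.ALL
        return {"status": "rejected"}      # output of the Rejected Pass state


def run_map(rows, max_concurrency=5):
    """Process rows with bounded parallelism; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(run_iteration, rows))


rows = [
    {"order_id": "ord-001", "amount": "42.50"},
    {"order_id": "", "amount": "50.00"},
    {"order_id": "ord-006", "amount": "-10.00"},
    {"order_id": "ord-004", "amount": "1000.00"},
]
results = run_map(rows)
print(results)
```

One consequence of the definition as written: the Rejected state's Result replaces the iteration's whole state, so neither the error stored at $.error nor the original row appears in the aggregated output. The position in the results list is what ties a rejection back to its input row.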

4. Run the scenario on your laptop

Clone the project, start LocalEmu in another terminal, then deploy:

$ git clone https://github.com/localemu/localemu-examples
$ cd localemu-examples/05-batch-etl
$ localemu start   # in a separate terminal
$ ./scripts/deploy.sh local

Nine resources apply in about 25 seconds. Most of the wall time is two Lambda containers cold-starting:

Terminal: deploy
$ ./scripts/deploy.sh local
aws_s3_bucket.input:                  Creation complete after 0s
aws_dynamodb_table.orders:            Creation complete after 0s
aws_iam_role.lambda_role:             Creation complete after 0s
aws_iam_role.sfn_role:                Creation complete after 0s
aws_iam_role_policy.lambda_policy:    Creation complete after 0s
aws_iam_role_policy.sfn_policy:       Creation complete after 0s
aws_lambda_function.parse:            Creation complete after 5s
aws_lambda_function.load_row:         Creation complete after 10s
aws_sfn_state_machine.etl:            Creation complete after 0s

Apply complete! Resources: 9 added, 0 changed, 0 destroyed.

 deployed to local. outputs:
input_bucket            = "le-etl-input"
parse_function_name     = "le-etl-parse"
load_row_function_name  = "le-etl-load-row"
state_machine_arn       = "arn:aws:states:us-east-1:000000000000:stateMachine:le-etl-etl"
table_name              = "le-etl-orders"

real    0m24.347s

Now drive the scenario from the top of this page by hand: upload the sample CSV, kick off an execution, look at what came out of the Map, then scan the destination table:

Terminal: full scenario
$ # Upload the sample CSV (6 rows: 4 valid + 2 bad).
$ aws --endpoint-url http://localhost:4566 \
       s3 cp sample/orders.csv s3://le-etl-input/orders.csv
upload: sample/orders.csv to s3://le-etl-input/orders.csv


$ # Kick off the Step Functions execution.
$ aws --endpoint-url http://localhost:4566 stepfunctions start-execution \
       --state-machine-arn arn:aws:states:us-east-1:000000000000:stateMachine:le-etl-etl \
       --input '{"bucket":"le-etl-input","key":"orders.csv"}'
{
  "executionArn": "arn:aws:states:us-east-1:000000000000:execution:le-etl-etl:72805726-...",
  "startDate":    "2026-05-23T16:42:09.123000+00:00"
}


$ # Wait a few seconds, then look at the final execution output.
$ aws --endpoint-url http://localhost:4566 stepfunctions describe-execution \
       --execution-arn arn:aws:states:us-east-1:000000000000:execution:le-etl-etl:72805726-... \
       --query 'output' --output text | jq .results
[
  { "result": { "order_id": "ord-001", "status": "loaded"   } },
  { "result": { "order_id": "ord-002", "status": "loaded"   } },
  { "result": { "order_id": "ord-003", "status": "loaded"   } },
  { "result": { "order_id": "ord-004", "status": "loaded"   } },
  {           "status": "rejected" },
  {           "status": "rejected" }
]

# Six entries in the same order as the input CSV.
# Indices 4 and 5 are the rejected rows (missing order_id, negative amount).
# The Map kept going past both rejections; the execution as a whole SUCCEEDED.


$ # Scan the destination table to confirm only the valid rows landed.
$ aws --endpoint-url http://localhost:4566 dynamodb scan \
       --table-name le-etl-orders --output table \
       --query 'Items[*].[order_id.S,amount.N,customer.S]'
+---------+-----------+---------+
|  ord-001|  42.50    |  alice  |
|  ord-002|  199.99   |  bob    |
|  ord-003|  17.25    |  carol  |
|  ord-004|  1000.00  |  dave   |
+---------+-----------+---------+

# Exactly four rows, matching the four "loaded" entries above. The two bad
# rows never wrote to DynamoDB because the load_row Lambda raised before
# the put_item call.
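
The "wait a few seconds" step and the jq call can also be scripted. The sketch below is one possible shape, with assumptions called out: wait_for_execution and summarize are hypothetical helpers, and the polling function takes a describe callable so it can run here against a fake. Against the emulator you would pass boto3.client("stepfunctions", endpoint_url="http://localhost:4566").describe_execution instead. summarize relies on parsed.rows being present in the final output, which follows from ResultPath = "$.parsed" on the Parse state.

```python
import json
import time


def wait_for_execution(describe, execution_arn, timeout=30.0, interval=0.5):
    """Poll describe(executionArn=...) until the execution leaves RUNNING."""
    deadline = time.monotonic() + timeout
    while True:
        resp = describe(executionArn=execution_arn)
        if resp["status"] != "RUNNING":
            return resp
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{execution_arn} still RUNNING after {timeout}s")
        time.sleep(interval)


def summarize(output_json):
    """Split the Map output into loaded order ids and rejected input rows."""
    output = json.loads(output_json)
    rows = output["parsed"]["rows"]
    loaded, rejected = [], []
    for index, entry in enumerate(output["results"]):
        if "result" in entry:
            loaded.append(entry["result"]["order_id"])
        else:
            # Rejected entries carry no row data; recover it by position.
            rejected.append((index, rows[index]))
    return loaded, rejected


# Fake describe function so the sketch runs without an emulator.
_fake_output = json.dumps({
    "parsed": {"rows": [{"order_id": "ord-001"}, {"order_id": ""}]},
    "results": [{"result": {"order_id": "ord-001", "status": "loaded"}},
                {"status": "rejected"}],
})
_calls = iter([{"status": "RUNNING"},
               {"status": "SUCCEEDED", "output": _fake_output}])

resp = wait_for_execution(lambda executionArn: next(_calls),
                          "arn:hypothetical", interval=0.01)
loaded, rejected = summarize(resp["output"])
print(loaded, rejected)
```

Pairing each rejected index with its input row gives you the "reported" part of the requirement without changing the state machine.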

The same scenario is asserted twice by tests/test_etl.py: once on the data plane (the DynamoDB scan), once on the control plane (the Map's aggregated output shape). Both finish in under seven seconds:

Terminal: pytest
$ ./scripts/test.sh local

============================= test session starts ==============================
platform darwin -- Python 3.13.12, pytest-9.0.3
collected 2 items

tests/test_etl.py::test_etl_happy_path        PASSED
tests/test_etl.py::test_run_output_shape      PASSED

============================== 2 passed in 6.05s ===============================

Tear it back down:

Terminal: teardown
$ ./scripts/teardown.sh local
Destroy complete! Resources: 9 destroyed.

 verifying teardown for prefix 'le-etl' on local
  clean: nothing left behind

real    0m10.994s

Deploy 24 seconds, tests 6 seconds, teardown 11 seconds. About forty seconds round trip, repeatable per change to the state machine or either Lambda.

5. The same code on real AWS

Same scripts, aws instead of local:

$ ./scripts/deploy.sh aws
$ ./scripts/test.sh     aws
$ ./scripts/teardown.sh aws

Three places where the same Terraform behaves a bit differently on real AWS, all worth knowing about up front:

Broader comparison in LocalEmu vs Real AWS and Known Limitations.

Get the full project

git clone https://github.com/localemu/localemu-examples

The batch-ETL tutorial lives in 05-batch-etl/ with the Terraform, the two Lambdas, the sample CSV, the two integration tests, and the deploy / test / teardown scripts that produced every terminal output on this page.

Where to go next