Step Functions
LocalEmu ships a full Amazon States Language (ASL) interpreter in pure Python. Of the 37 documented Step Functions operations, 36 are implemented. Coverage includes state machines (Standard and Express), executions, activity tasks, eight ASL state types, four task integration patterns, eighteen intrinsic functions, retry and catch blocks, JSONPath-based input and output filtering, and CloudWatch logging of execution history. The ASL parser is generated from the ANTLR grammar into Python at build time, so the runtime needs no JVM and no external interpreter, only the LocalEmu Python process.
A runnable saga-pattern walkthrough lives at Step Functions Saga.
Operation-level coverage: see the Step Functions coverage matrix.
Quick start
Create a state machine, start an execution, read the result.
$ cat > sm.json <<'EOF'
{
  "Comment": "Hello from LocalEmu",
  "StartAt": "Greet",
  "States": {
    "Greet": {
      "Type": "Pass",
      "Result": "hello",
      "End": true
    }
  }
}
EOF
$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name hello-sm \
    --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)
$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" \
    --input '{}' \
    --query 'executionArn' --output text)
$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" \
    --query '{Status:status,Output:output}'
{
  "Status": "SUCCEEDED",
  "Output": "\"hello\""
}

Architecture
Step Functions lives at services/stepfunctions/. A CreateStateMachine call goes through the ANTLR-generated parser, builds an evaluable program tree, and stores it in the SFN store. A StartExecution call kicks off an asynchronous worker that walks the program tree state by state.
- Provider at services/stepfunctions/provider.py:186: StepFunctionsProvider wires every API operation. StartExecution at :853, StartSyncExecution at :944.
- ASL parser at asl/parse/asl_parser.py:49: the AmazonStateLanguageParser.parse facade invokes the ANTLR-generated lexer + parser (asl/antlr/runtime/ASLLexer.py, ASLParser.py) and walks the parse tree with visitors to build the executable component tree.
- Intrinsic-function parser at asl/antlr/runtime/ASLIntrinsicLexer.py + ASLIntrinsicParser.py: a separate ANTLR grammar parses States.Format(...), States.Array(...), and the rest of the intrinsic syntax.
- Program tree at asl/component/program/program.py:46: Program holds the top-level States registry. Each state type is its own evaluator class (StatePass, StateTask, StateChoice, etc.).
- Execution at backend/execution.py:96: Execution owns the worker thread, the execution context (input, output, history), and the history recorder consumed by GetExecutionHistory (backend/execution.py:258).
- Service-task dispatch at asl/component/state/state_execution/state_task/service/state_task_service_factory.py: extracts the AWS service from the task's Resource ARN and routes to the matching service-task class. The .sync, .sync:2, and .waitForTaskToken patterns share StateTaskServiceCallback (state_task_service_callback.py:52).
State is held in SFNStore (backend/models.py:12): state machines, executions, activities, aliases, versions. When PERSISTENCE=1 is set, the store is serialised and restored across LocalEmu restarts.
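To make the state-by-state walk described above concrete, here is a minimal sketch of an interpreter loop that handles Pass states only. It is not LocalEmu's implementation: the function name `run_pass_only` is hypothetical, and the sketch ignores paths, retries, history recording, and every other state type.

```python
import json


def run_pass_only(definition: dict, payload):
    """Walk a state machine made only of Pass states (hypothetical helper)."""
    states = definition["States"]
    name = definition["StartAt"]
    while True:
        state = states[name]
        if state["Type"] != "Pass":
            raise NotImplementedError(f"sketch handles Pass only, got {state['Type']}")
        # A Pass state forwards its input unless a fixed Result is declared.
        if "Result" in state:
            payload = state["Result"]
        if state.get("End"):
            return payload
        name = state["Next"]


hello = {
    "StartAt": "Greet",
    "States": {"Greet": {"Type": "Pass", "Result": "hello", "End": True}},
}

# DescribeExecution reports the output as a JSON-encoded string,
# which is why the quick start shows "\"hello\"".
print(json.dumps(run_pass_only(hello, {})))
```

The real program tree replaces the `if` on `Type` with one evaluator class per state type, as listed above.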
State types
| State | Purpose | Source |
|---|---|---|
| Pass | Move payload through or inject a fixed Result. | state_pass.py:12 |
| Task | Invoke an external action (Lambda, SQS, DynamoDB, any AWS SDK op). | state_task.py:32 |
| Choice | Branch on payload values using rule operators. | state_choice.py:15 |
| Wait | Pause for Seconds, until a Timestamp, or for a value extracted via SecondsPath / TimestampPath. | state_wait.py:12 |
| Parallel | Run independent branches concurrently, return an array of outputs. | state_parallel.py:21 |
| Map | Iterate over a payload list or an S3 dataset (ItemReader), running the inline workflow per item with MaxConcurrency. | state_map.py:100 |
| Succeed | Terminal success state. | state_succeed.py:12 |
| Fail | Terminal failure state with Error and Cause. | state_fail.py:20 |
Task integration patterns
A Task state's Resource ARN selects one of four patterns. All four are honoured by LocalEmu (state_task_service_callback.py):
| Pattern | When to use | Line |
|---|---|---|
| Request / Response | Default. Synchronous invoke, return the response as the task output. Used for lambda:invoke, sqs:sendMessage, DynamoDB, SNS, EventBridge. | state_task_service.py:57 |
| .sync | Start a long-running job, then poll until it completes. Used for ecs:runTask.sync, batch:submitJob.sync, glue:startJobRun.sync. | state_task_service_callback.py:73 |
| .sync:2 | Newer .sync variant for integrations that need a different completion-detection protocol (used by ECS, Batch, Step Functions). | state_task_service_callback.py:84 |
| .waitForTaskToken | Hand a task token to an external worker and pause. The execution resumes when the worker calls SendTaskSuccess or SendTaskFailure with that token. | state_task_service_callback.py:157 |
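The pattern is encoded as a suffix on the Resource ARN, so it can be recovered with plain string handling. The sketch below assumes the `arn:aws:states:::service:action[.pattern]` shape used in the examples on this page; `split_task_resource` and the `"requestResponse"` label are hypothetical names chosen for illustration, not LocalEmu's factory API.

```python
def split_task_resource(resource: str):
    """Return (service, action, pattern) for a service-integration Resource ARN.

    Hypothetical helper. Assumes the "aws" partition and the
    arn:aws:states:::service:action[.pattern] layout.
    """
    prefix = "arn:aws:states:::"
    if not resource.startswith(prefix):
        raise ValueError(f"not a service-integration ARN: {resource}")
    # First segment after the prefix is the service, e.g. "sqs" or "aws-sdk".
    service, _, api = resource[len(prefix):].partition(":")
    # Anything after the first "." is the integration pattern suffix.
    action, dot, suffix = api.partition(".")
    pattern = suffix if dot else "requestResponse"
    return service, action, pattern


print(split_task_resource("arn:aws:states:::sqs:sendMessage.waitForTaskToken"))
print(split_task_resource("arn:aws:states:::states:startExecution.sync:2"))
print(split_task_resource("arn:aws:states:::lambda:invoke"))
```

A Task whose Resource is a plain Lambda function ARN or an Activity ARN does not match this prefix and would need separate handling.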
Service integrations
Ten first-class integrations plus the generic AWS SDK integration, which lets any boto3 operation be called as a Task. All live in services/stepfunctions/asl/component/state/state_execution/state_task/service/.
| Integration | Sub-patterns | File |
|---|---|---|
| Lambda | Request/Response and .sync | state_task_service_lambda.py:49 |
| DynamoDB | getItem, putItem, updateItem, deleteItem, batchGetItem, batchWriteItem | state_task_service_dynamodb.py |
| SQS | sendMessage (also with .waitForTaskToken) | state_task_service_sqs.py:44 |
| SNS | publish | state_task_service_sns.py:45 |
| EventBridge | PutEvents | state_task_service_events.py:43 |
| ECS | RunTask with .sync | state_task_service_ecs.py:42 |
| Step Functions (nested) | startExecution with .sync and .sync:2 | state_task_service_sfn.py:52 |
| Batch | SubmitJob, TerminateJob, DescribeJobs with .sync | state_task_service_batch.py:62 |
| Glue | StartJobRun with .sync | state_task_service_glue.py:81 |
| API Gateway | invoke via the HTTP Task extension | state_task_service_api_gateway.py:108 |
| AWS SDK integration | Generic arn:aws:states:::aws-sdk:service:operation. Any boto3 op on any LocalEmu service. | state_task_service_aws_sdk.py:36 |
Intrinsic functions
Eighteen functions are available inside the dollar-string and Parameters substitution expressions. Implementations live under asl/component/intrinsic/function/statesfunction/.
| Family | Functions |
|---|---|
| Array | States.Array, States.ArrayPartition, States.ArrayContains, States.ArrayRange, States.ArrayGetItem, States.ArrayLength, States.ArrayUnique |
| Encoding | States.Base64Encode, States.Base64Decode |
| Hashing | States.Hash |
| JSON | States.JsonMerge, States.JsonToString, States.StringToJson |
| Math | States.MathRandom, States.MathAdd |
| String | States.StringSplit, States.Format |
| Unique IDs | States.UUID |
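For readers unfamiliar with the intrinsics, the following Python sketch mirrors the documented AWS semantics of four of them. The function names are hypothetical stand-ins, not LocalEmu's classes, and `states_format` handles only plain `{}` placeholders (no escaping, and non-string arguments are rendered with `str`, which may differ from the real output).

```python
def array_partition(items, size):
    """States.ArrayPartition: split into chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def array_range(first, last, step):
    """States.ArrayRange: the end value is inclusive."""
    stop = last + (1 if step > 0 else -1)
    return list(range(first, stop, step))


def array_unique(items):
    """States.ArrayUnique: drop duplicates, keep first occurrences in order."""
    seen = []
    for item in items:
        if item not in seen:
            seen.append(item)
    return seen


def states_format(template, *args):
    """States.Format: fill each {} placeholder with the next argument."""
    parts = template.split("{}")
    if len(parts) - 1 != len(args):
        raise ValueError("placeholder count does not match argument count")
    out = [parts[0]]
    for arg, tail in zip(args, parts[1:]):
        out.append(str(arg))
        out.append(tail)
    return "".join(out)


print(array_partition([1, 2, 3, 4, 5, 6, 7, 8, 9], 4))
print(array_range(1, 9, 2))
print(array_unique([1, 2, 3, 3, 3, 4]))
print(states_format("Hello, my name is {}.", "Arnav"))
```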
Error handling
- Retry blocks per state (common/retry/retry_decl.py:13). Each Retrier matches on ErrorEquals and applies IntervalSeconds, MaxAttempts, and BackoffRate. Backoff applied at execute_state.py:146.
- Catch blocks per state (common/catch/catch_decl.py:15). When all retries are exhausted, each Catcher matches on ErrorEquals, routes to Next, and writes the error into the payload at ResultPath (execute_state.py:68).
- Built-in error names: States.ALL, States.Timeout, States.TaskFailed, States.Permissions, States.NoChoiceMatched, plus the user-defined errors raised from a Fail state. Defined in state_fail/error_decl.py.
- JSONPath input and output filtering: InputPath, OutputPath, ResultPath, ItemsPath, and the dollar-suffix syntax (key.$) are evaluated by the LocalEmu JSONata adapter at asl/jsonata/jsonata.py.
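The Retry fields above combine into a simple geometric schedule. The sketch below uses the formula and defaults documented for ASL (IntervalSeconds 1, MaxAttempts 3, BackoffRate 2.0); that LocalEmu computes exactly the same delays is an assumption, and both function names are hypothetical.

```python
def retry_delays(interval_seconds=1, max_attempts=3, backoff_rate=2.0):
    """Seconds to wait before each retry: interval * rate ** (attempt - 1)."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


def retrier_matches(error_name, error_equals):
    """A Retrier applies if it names the error or uses the States.ALL wildcard."""
    return "States.ALL" in error_equals or error_name in error_equals


# IntervalSeconds=3, MaxAttempts=2, BackoffRate=1.5
print(retry_delays(3, 2, 1.5))
# ASL defaults
print(retry_delays())
print(retrier_matches("States.Timeout", ["States.TaskFailed", "States.Timeout"]))
```

The sketch leaves out optional refinements such as a maximum delay cap or jitter.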
Workflow types
| Type | Invocation | Source |
|---|---|---|
| Standard (default) | StartExecution returns immediately; the execution runs asynchronously. Poll with DescribeExecution or stream events with GetExecutionHistory. | backend/state_machine.py:69 |
| Express | StartSyncExecution blocks until the execution finishes and returns the final output in the same response. Created with --type EXPRESS. | provider.py:944 |
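Because a Standard StartExecution returns before the work is done, test code usually needs a small polling loop around DescribeExecution. The helper below is a sketch: `wait_for_execution` is a hypothetical name, and `describe` is any callable with the shape of boto3's `describe_execution` (keyword `executionArn`, response key `status`). The demo uses a fake so the block runs without a LocalEmu instance.

```python
import time

# Statuses after which an execution no longer changes.
TERMINAL = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe, execution_arn, poll_seconds=0.5, timeout_seconds=30.0):
    """Poll describe(executionArn=...) until the execution reaches a terminal status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = describe(executionArn=execution_arn)
        if result["status"] in TERMINAL:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{execution_arn} still {result['status']}")
        time.sleep(poll_seconds)


# Fake DescribeExecution that reports RUNNING twice, then SUCCEEDED.
_statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])


def fake_describe(executionArn):
    return {"executionArn": executionArn, "status": next(_statuses), "output": '"hello"'}


final = wait_for_execution(fake_describe, "arn:aws:states:us-east-1:000000000000:execution:hello-sm:demo", poll_seconds=0)
print(final["status"], final["output"])
```

With boto3, `describe` would be the `describe_execution` method of a Step Functions client pointed at the LocalEmu endpoint. For Express workflows, StartSyncExecution makes the loop unnecessary.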
Activity tasks (human-in-the-loop)
Activities are worker pools that pull tasks off an SFN-managed queue. The state machine sends a task through an Activity ARN; an external worker calls GetActivityTask to pick it up, then reports back via SendTaskSuccess, SendTaskFailure, or SendTaskHeartbeat.
- CreateActivity / DescribeActivity / ListActivities / DeleteActivity at backend/activity.py:23.
- GetActivityTask polls the activity queue and returns a task token plus the input payload.
- SendTaskSuccess (provider.py:781), SendTaskFailure (provider.py:803), SendTaskHeartbeat (provider.py:764): the same trio works for .waitForTaskToken service tasks.
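The worker side of that protocol can be sketched in a few lines. The function below is illustrative only: `run_activity_worker_once` and `handler` are hypothetical names, and `client` is expected to expose the boto3 Step Functions methods `get_activity_task`, `send_task_success`, and `send_task_failure`.

```python
import json


def run_activity_worker_once(client, activity_arn, handler, worker_name="worker-1"):
    """Fetch one activity task, run handler on its input, report the outcome.

    Returns False when the poll came back without a task, True otherwise.
    """
    task = client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return False  # no work was available during this poll
    try:
        result = handler(json.loads(task["input"]))
    except Exception as exc:
        # Surface the failure so Retry and Catch blocks on the Task state can react.
        client.send_task_failure(taskToken=token, error=type(exc).__name__, cause=str(exc))
    else:
        client.send_task_success(taskToken=token, output=json.dumps(result))
    return True
```

A long-running handler would additionally call `send_task_heartbeat` with the same token; that is omitted here for brevity.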
Pause execution until an external approval (.waitForTaskToken)
The state machine drops the task token onto an SQS queue and waits. An external worker picks the token up, makes a decision, and calls SendTaskSuccess. The execution then resumes.
# State machine with a .waitForTaskToken task. The task pauses until
# the worker calls SendTaskSuccess with the matching token.
$ cat > sm.json <<'EOF'
{
  "StartAt": "WaitForHuman",
  "States": {
    "WaitForHuman": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "http://sqs.us-east-1.localhost:4566/000000000000/approvals",
        "MessageBody": {
          "approval_id": "a-42",
          "token.$": "$$.Task.Token"
        }
      },
      "End": true
    }
  }
}
EOF
$ awsemu sqs create-queue --queue-name approvals >/dev/null
$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name approval-flow --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)
$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" --input '{}' \
    --query 'executionArn' --output text)
# The execution is RUNNING. Pull the token off the queue and approve it.
$ TOKEN=$(awsemu sqs receive-message \
    --queue-url http://sqs.us-east-1.localhost:4566/000000000000/approvals \
    --query 'Messages[0].Body' --output text \
    | python3 -c 'import sys, json; print(json.loads(sys.stdin.read())["token"])')
$ awsemu stepfunctions send-task-success \
    --task-token "$TOKEN" \
    --output '{"approved": true}'
$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" --query 'status' --output text
SUCCEEDED

Map state fan-out
Map iterates over the array at $.orders, runs the inline ItemProcessor workflow per item with at most three in parallel, and collects the per-item results into an array.
# Map state iterates a list, runs the inline workflow for each item.
$ cat > sm.json <<'EOF'
{
  "StartAt": "FanOut",
  "States": {
    "FanOut": {
      "Type": "Map",
      "ItemsPath": "$.orders",
      "MaxConcurrency": 3,
      "ItemProcessor": {
        "StartAt": "Charge",
        "States": {
          "Charge": {
            "Type": "Pass",
            "Result": "charged",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}
EOF
$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name fan-out --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)
$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" \
    --input '{"orders": [{"id":"o1"},{"id":"o2"},{"id":"o3"}]}' \
    --query 'executionArn' --output text)
$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" --query 'output' --output text
["charged","charged","charged"]

Configuration
One Step Functions-specific environment variable:
| Variable | Default | Purpose |
|---|---|---|
| SFN_MOCK_CONFIG | (unset) | Path to a MockConfigFile.json for TestState. The file declares per-state mock responses so a state machine can be tested without invoking real downstream services. |
Limits and defaults
| Limit | LocalEmu | AWS | Source |
|---|---|---|---|
| Max state machine name length | 80 chars | 80 chars | provider.py:262,274 |
| Max activity name length | 80 chars | 80 chars | provider.py:262 |
| Max state machine definition size | not enforced | 1 MiB | LocalEmu accepts larger. |
| Max execution history events | not enforced | 25 000 | LocalEmu does not truncate. |
| Max execution duration (Standard) | not enforced | 1 year | LocalEmu does not time out. |
| Max execution duration (Express) | not enforced | 5 min | LocalEmu does not time out. |
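Since LocalEmu does not enforce the definition-size limit, a definition that works locally can still be rejected by AWS. One way to catch that early is a client-side check in the test suite. The sketch below is an assumption-laden illustration: `preflight` is a hypothetical helper, the constants are copied from the AWS column of the table above, and the size is measured on the JSON text as this script serialises it, which may differ slightly from the bytes actually uploaded.

```python
import json

MAX_NAME_CHARS = 80                 # state machine and activity names
MAX_DEFINITION_BYTES = 1024 * 1024  # 1 MiB, per the table above


def preflight(name, definition):
    """Return a list of limit violations that AWS would reject (hypothetical helper)."""
    problems = []
    if len(name) > MAX_NAME_CHARS:
        problems.append(f"name is {len(name)} chars, limit is {MAX_NAME_CHARS}")
    size = len(json.dumps(definition).encode("utf-8"))
    if size > MAX_DEFINITION_BYTES:
        problems.append(f"definition is {size} bytes, limit is {MAX_DEFINITION_BYTES}")
    return problems


hello = {
    "StartAt": "Greet",
    "States": {"Greet": {"Type": "Pass", "Result": "hello", "End": True}},
}
print(preflight("hello-sm", hello))
print(preflight("x" * 81, hello))
```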
Known limitations
- RedriveExecution is not implemented. The operation, which on AWS resumes a failed execution from its failed step, returns NotImplementedException. Restart the execution from the start instead.
- ListMapRuns does not paginate. All map runs come back in one response; NextToken is ignored (provider.py:1074).
- Activity tagging is partial. TagResource / UntagResource / ListTagsForResource on activity ARNs succeed but tags do not round-trip (provider.py:1465,1477,1489).
- ValidateStateMachineDefinition is partial. The static analysers catch the common rule violations but the per-issue severity field is not yet populated the way AWS returns it (provider.py:1757-1759).
- InvalidArn errors are generic. AWS tells you which part of the ARN was wrong (partition, region, account, resource); LocalEmu returns a single InvalidArn code (provider.py:216,226,232).
- Execution-name format validation is permissive. Names that AWS would reject for length or characters may still be accepted (provider.py:891,982).