Step Functions

LocalEmu ships a full Amazon States Language (ASL) interpreter written in pure Python. It implements 36 of the 37 documented Step Functions operations, covering state machines (Standard and Express), executions, activity tasks, eight ASL state types, four task integration patterns, eighteen intrinsic functions, retry and catch blocks, JSONPath-based input and output filtering, and CloudWatch logging of execution history. The ASL parser is generated from an ANTLR grammar as Python code at build time, so the runtime needs neither a JVM nor an external interpreter, only the LocalEmu Python process.

A runnable saga-pattern walkthrough lives at Step Functions Saga.

For operation-level coverage, see the Step Functions coverage matrix.

Quick start

Create a state machine, start an execution, read the result.

Terminal
$ cat > sm.json <<'EOF'
{
  "Comment":  "Hello from LocalEmu",
  "StartAt":  "Greet",
  "States": {
    "Greet": {
      "Type":   "Pass",
      "Result": "hello",
      "End":    true
    }
  }
}
EOF

$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name hello-sm \
    --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)

$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" \
    --input '{}' \
    --query 'executionArn' --output text)

$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" \
    --query '{Status:status,Output:output}'
{
  "Status": "SUCCEEDED",
  "Output": "\"hello\""
}

Architecture

Step Functions lives at services/stepfunctions/. A CreateStateMachine call goes through the ANTLR-generated parser, builds an evaluable program tree, and stores it in the SFN store. A StartExecution call kicks off an asynchronous worker that walks the program tree state by state.

State is held in SFNStore (backend/models.py:12): state machines, executions, activities, aliases, versions. When PERSISTENCE=1 is set, the store is serialised and restored across LocalEmu restarts.
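
To make the "walks the program tree state by state" step concrete, here is a minimal sketch of such a loop. It is not LocalEmu's interpreter: the run function is a hypothetical name, it reads the raw JSON definition instead of an ANTLR-built program tree, it runs synchronously, and it handles only Pass, Succeed and Fail.

```python
import copy


def run(definition, payload):
    """Walk a tiny ASL definition state by state (Pass, Succeed, Fail only)."""
    name = definition["StartAt"]
    while True:
        state = definition["States"][name]
        kind = state["Type"]
        if kind == "Pass":
            # A Pass state injects its fixed Result, or forwards the input unchanged.
            if "Result" in state:
                payload = copy.deepcopy(state["Result"])
        elif kind == "Succeed":
            return "SUCCEEDED", payload
        elif kind == "Fail":
            return "FAILED", {"Error": state.get("Error"), "Cause": state.get("Cause")}
        else:
            raise NotImplementedError(f"state type {kind!r} is outside this sketch")
        if state.get("End"):
            return "SUCCEEDED", payload
        name = state["Next"]


# Same definition as the quick start, written as a Python dict.
hello = {
    "StartAt": "Greet",
    "States": {"Greet": {"Type": "Pass", "Result": "hello", "End": True}},
}
status, output = run(hello, {})
print(status, output)
```

The loop only ever needs the current state name and the current payload, which is why an execution can be modelled as a simple walk from StartAt until a terminal state or End is reached.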

State types

| State | Purpose | Source |
|---|---|---|
| Pass | Move payload through or inject a fixed Result. | state_pass.py:12 |
| Task | Invoke an external action (Lambda, SQS, DynamoDB, any AWS SDK op). | state_task.py:32 |
| Choice | Branch on payload values using rule operators. | state_choice.py:15 |
| Wait | Pause for Seconds, until a Timestamp, or for a value extracted via SecondsPath / TimestampPath. | state_wait.py:12 |
| Parallel | Run independent branches concurrently, return an array of outputs. | state_parallel.py:21 |
| Map | Iterate over a payload list or an S3 dataset (ItemReader), running the inline workflow per item with MaxConcurrency. | state_map.py:100 |
| Succeed | Terminal success state. | state_succeed.py:12 |
| Fail | Terminal failure state with Error and Cause. | state_fail.py:20 |
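
As an illustration of how a Choice state branches on payload values, the sketch below evaluates a handful of ASL comparison operators against a payload. The helper names (get_path, choose) are hypothetical, only simple dotted paths such as $.order.total are supported, and the real operator set is much larger than the four shown here.

```python
def get_path(payload, path):
    """Resolve a simple dotted path like $.order.total (no filters or indexes)."""
    value = payload
    for key in path.removeprefix("$").strip(".").split("."):
        if key:
            value = value[key]
    return value


# A small subset of the ASL comparison operators.
OPERATORS = {
    "StringEquals": lambda actual, expected: actual == expected,
    "BooleanEquals": lambda actual, expected: actual is expected,
    "NumericGreaterThan": lambda actual, expected: actual > expected,
    "NumericLessThan": lambda actual, expected: actual < expected,
}


def choose(state, payload):
    """Return the name of the next state selected by a Choice state."""
    for rule in state["Choices"]:
        actual = get_path(payload, rule["Variable"])
        for operator, matches in OPERATORS.items():
            if operator in rule and matches(actual, rule[operator]):
                return rule["Next"]
    # In this sketch a missing Default surfaces as a KeyError.
    return state["Default"]


route = {
    "Type": "Choice",
    "Choices": [
        {"Variable": "$.order.total", "NumericGreaterThan": 100, "Next": "Review"}
    ],
    "Default": "AutoApprove",
}
print(choose(route, {"order": {"total": 250}}))
print(choose(route, {"order": {"total": 20}}))
```

Rules are tried in order and the first match wins, so more specific rules belong before more general ones.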

Task integration patterns

A Task state's Resource ARN selects one of four patterns. All four are honoured by LocalEmu (state_task_service_callback.py):

| Pattern | When to use | Source |
|---|---|---|
| Request / Response | Default. Synchronous invoke, return the response as the task output. Used for lambda:invoke, sqs:sendMessage, DynamoDB, SNS, EventBridge. | state_task_service.py:57 |
| .sync | Start a long-running job, then poll until it completes. Used for ecs:runTask.sync, batch:submitJob.sync, glue:startJobRun.sync. | state_task_service_callback.py:73 |
| .sync:2 | Newer .sync variant for integrations that need a different completion-detection protocol (used by ECS, Batch, Step Functions). | state_task_service_callback.py:84 |
| .waitForTaskToken | Hand a task token to an external worker and pause. The execution resumes when the worker calls SendTaskSuccess or SendTaskFailure with that token. | state_task_service_callback.py:157 |
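
The pattern is encoded as a suffix on the Resource ARN, so it can be recovered with plain string handling. The following is a sketch under that assumption only: parse_resource is a hypothetical helper, the "requestResponse" label is this example's own name for the default pattern, and direct Lambda function ARNs are deliberately out of scope.

```python
PREFIX = "arn:aws:states:::"
# Longest suffix first, so ".sync:2" is not mistaken for ".sync".
SUFFIXES = (".sync:2", ".sync", ".waitForTaskToken")


def parse_resource(resource):
    """Split an optimised-integration ARN into (service, action, pattern)."""
    if not resource.startswith(PREFIX):
        raise ValueError(f"not a service integration ARN: {resource}")
    service, _, action = resource[len(PREFIX):].partition(":")
    pattern = "requestResponse"  # label chosen for this sketch
    for suffix in SUFFIXES:
        if action.endswith(suffix):
            action = action[: -len(suffix)]
            pattern = suffix[1:]
            break
    return service, action, pattern


for arn in (
    "arn:aws:states:::lambda:invoke",
    "arn:aws:states:::ecs:runTask.sync",
    "arn:aws:states:::states:startExecution.sync:2",
    "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
):
    print(parse_resource(arn))
```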

Service integrations

The table lists eleven integrations: ten first-class service integrations plus the generic AWS SDK integration, which lets any boto3 operation be called as a Task. All of them live in services/stepfunctions/asl/component/state/state_execution/state_task/service/.

| Integration | Sub-patterns | File |
|---|---|---|
| Lambda | Request/Response and .sync | state_task_service_lambda.py:49 |
| DynamoDB | getItem, putItem, updateItem, deleteItem, batchGetItem, batchWriteItem | state_task_service_dynamodb.py |
| SQS | sendMessage (also with .waitForTaskToken) | state_task_service_sqs.py:44 |
| SNS | publish | state_task_service_sns.py:45 |
| EventBridge | PutEvents | state_task_service_events.py:43 |
| ECS | RunTask with .sync | state_task_service_ecs.py:42 |
| Step Functions (nested) | startExecution with .sync and .sync:2 | state_task_service_sfn.py:52 |
| Batch | SubmitJob, TerminateJob, DescribeJobs with .sync | state_task_service_batch.py:62 |
| Glue | StartJobRun with .sync | state_task_service_glue.py:81 |
| API Gateway | invoke via the HTTP Task extension | state_task_service_api_gateway.py:108 |
| AWS SDK integration | Generic arn:aws:states:::aws-sdk:service:operation. Any boto3 op on any LocalEmu service. | state_task_service_aws_sdk.py:36 |
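
For the generic AWS SDK integration, the Resource ARN is built from the service and operation names. The sketch below generates such a Task state; sdk_task is a hypothetical helper written for this example, and s3:listBuckets is used only because it needs no Parameters.

```python
import json


def sdk_task(service, operation, parameters=None, next_state=None):
    """Build a Task state that targets the generic aws-sdk integration."""
    state = {
        "Type": "Task",
        "Resource": f"arn:aws:states:::aws-sdk:{service}:{operation}",
    }
    if parameters:
        state["Parameters"] = parameters
    # A state must either name its successor or end the workflow.
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


definition = {
    "StartAt": "ListBuckets",
    "States": {"ListBuckets": sdk_task("s3", "listBuckets")},
}
print(json.dumps(definition, indent=2))
```

The printed JSON can be saved to a file and passed to create-state-machine with --definition file://, in the same way as the quick start definition.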

Intrinsic functions

Eighteen intrinsic functions are available inside dollar-string and Parameters substitution expressions. Their implementations live under asl/component/intrinsic/function/statesfunction/.

| Family | Functions |
|---|---|
| Array | States.Array, States.ArrayPartition, States.ArrayContains, States.ArrayRange, States.ArrayGetItem, States.ArrayLength, States.ArrayUnique |
| Encoding | States.Base64Encode, States.Base64Decode |
| Hashing | States.Hash |
| JSON | States.JsonMerge, States.JsonToString, States.StringToJson |
| Math | States.MathRandom, States.MathAdd |
| String | States.StringSplit, States.Format |
| Unique IDs | States.UUID |
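
To give a feel for what a few of these functions compute, here are rough Python equivalents of States.ArrayPartition, States.ArrayRange and States.Format. They are illustrative stand-ins with hypothetical names, not the code under statesfunction/; the range helper assumes a positive step, and the format helper ignores escaped braces.

```python
def array_partition(items, chunk_size):
    """States.ArrayPartition: split a list into chunks of at most chunk_size."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


def array_range(first, last, step):
    """States.ArrayRange: numbers from first to last inclusive (positive step)."""
    return list(range(first, last + 1, step))


def format_template(template, *args):
    """States.Format: fill each {} placeholder with the next argument."""
    parts = template.split("{}")
    if len(parts) - 1 != len(args):
        raise ValueError("placeholder count does not match argument count")
    result = parts[0]
    for arg, tail in zip(args, parts[1:]):
        result += str(arg) + tail
    return result


print(array_partition([1, 2, 3, 4, 5, 6, 7, 8, 9], 4))
print(array_range(1, 9, 2))
print(format_template("Order {} has {} items", "o1", 3))
```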

Error handling

Workflow types

| Type | Invocation | Source |
|---|---|---|
| Standard (default) | StartExecution returns immediately; the execution runs asynchronously. Poll with DescribeExecution or stream events with GetExecutionHistory. | backend/state_machine.py:69 |
| Express | StartSyncExecution blocks until the execution finishes and returns the final output in the same response. Created with --type EXPRESS. | provider.py:944 |
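
Because a Standard execution runs asynchronously, callers usually poll DescribeExecution until the status is terminal. The sketch below shows one way to do that; wait_for_execution is a hypothetical helper, and client is assumed to be a boto3 Step Functions client (or any object with the same describe_execution method) already pointed at LocalEmu.

```python
import time

# Statuses after which a Standard execution no longer changes.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(client, execution_arn, interval=0.5, timeout=30.0):
    """Poll describe_execution until the execution reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        description = client.describe_execution(executionArn=execution_arn)
        if description["status"] in TERMINAL_STATUSES:
            return description
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{execution_arn} still {description['status']}")
        time.sleep(interval)
```

The timeout is enforced on the client side here; as the limits table notes, LocalEmu itself does not time executions out.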

Activity tasks (human-in-the-loop)

Activities are worker pools that pull tasks off an SFN-managed queue. The state machine sends a task through an Activity ARN; an external worker calls GetActivityTask to pick it up, then reports the outcome via SendTaskSuccess or SendTaskFailure. While the work is in progress, the worker can call SendTaskHeartbeat to signal that it is still alive.
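
The worker side of that exchange can be sketched as a single poll-and-report step. handle_one_task and its handler argument are hypothetical names; client is assumed to be a boto3 Step Functions client (or a compatible object) configured for LocalEmu, and heartbeats are left out to keep the example short.

```python
import json


def handle_one_task(client, activity_arn, handler, worker_name="worker-1"):
    """Fetch one activity task, run handler on its input, report the outcome.

    Returns False when the poll came back without a task.
    """
    task = client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return False  # the poll ended with nothing to do
    try:
        result = handler(json.loads(task["input"]))
    except Exception as exc:
        # Report the failure so the state machine can apply Retry / Catch.
        client.send_task_failure(
            taskToken=token, error=type(exc).__name__, cause=str(exc)
        )
    else:
        client.send_task_success(taskToken=token, output=json.dumps(result))
    return True
```

A long-running worker would call this function in a loop, typically from several processes that share the same Activity ARN.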

Pause execution until an external approval (.waitForTaskToken)

The state machine drops the task token onto an SQS queue and waits. An external worker picks the token up, makes a decision, and calls SendTaskSuccess. The execution then resumes.

Terminal
# State machine with a .waitForTaskToken task. The task pauses until
# the worker calls SendTaskSuccess with the matching token.
$ cat > sm.json <<'EOF'
{
  "StartAt": "WaitForHuman",
  "States": {
    "WaitForHuman": {
      "Type":     "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl":    "http://sqs.us-east-1.localhost:4566/000000000000/approvals",
        "MessageBody": {
          "approval_id": "a-42",
          "token.$":     "$$.Task.Token"
        }
      },
      "End": true
    }
  }
}
EOF

$ awsemu sqs create-queue --queue-name approvals >/dev/null
$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name approval-flow --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)

$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" --input '{}' \
    --query 'executionArn' --output text)

# The execution is RUNNING. Pull the token off the queue and approve it.
$ TOKEN=$(awsemu sqs receive-message \
    --queue-url http://sqs.us-east-1.localhost:4566/000000000000/approvals \
    --query 'Messages[0].Body' --output text \
    | python3 -c 'import sys, json; print(json.load(sys.stdin)["token"])')

$ awsemu stepfunctions send-task-success \
    --task-token "$TOKEN" \
    --output '{"approved": true}'

$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" --query 'status' --output text
SUCCEEDED

Map state fan-out

Map iterates over the array at $.orders, runs the inline ItemProcessor workflow per item with at most three in parallel, and collects the per-item results into an array.

Terminal
# Map state iterates a list, runs the inline workflow for each item.
$ cat > sm.json <<'EOF'
{
  "StartAt": "FanOut",
  "States": {
    "FanOut": {
      "Type":         "Map",
      "ItemsPath":    "$.orders",
      "MaxConcurrency": 3,
      "ItemProcessor": {
        "StartAt": "Charge",
        "States": {
          "Charge": {
            "Type":   "Pass",
            "Result": "charged",
            "End":    true
          }
        }
      },
      "End": true
    }
  }
}
EOF

$ SM_ARN=$(awsemu stepfunctions create-state-machine \
    --name fan-out --definition file://sm.json \
    --role-arn arn:aws:iam::000000000000:role/sm-role \
    --query 'stateMachineArn' --output text)

$ EXEC=$(awsemu stepfunctions start-execution \
    --state-machine-arn "$SM_ARN" \
    --input '{"orders": [{"id":"o1"},{"id":"o2"},{"id":"o3"}]}' \
    --query 'executionArn' --output text)

$ awsemu stepfunctions describe-execution \
    --execution-arn "$EXEC" --query 'output' --output text
["charged","charged","charged"]
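
The fan-out semantics can be approximated in a few lines of Python: run a per-item function with a bounded number of workers and return the results in input order. This is a sketch of the behaviour, not of state_map.py; run_map and charge are hypothetical names, and threads stand in for whatever concurrency mechanism LocalEmu actually uses.

```python
from concurrent.futures import ThreadPoolExecutor


def run_map(items, process_item, max_concurrency=0):
    """Apply process_item to every item with bounded parallelism."""
    # In ASL a MaxConcurrency of 0 means "no limit"; here that becomes
    # one worker per item.
    workers = max_concurrency or max(len(items), 1)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in input order, whatever the finish order.
        return list(pool.map(process_item, items))


def charge(order):
    """Stand-in for the inline Charge state, which returns a fixed Result."""
    return "charged"


orders = [{"id": "o1"}, {"id": "o2"}, {"id": "o3"}]
print(run_map(orders, charge, max_concurrency=3))
```

Keeping results in input order matters because the Map output array is positional: element i of the output corresponds to element i of the array selected by ItemsPath.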

Configuration

One Step Functions-specific environment variable:

| Variable | Default | Purpose |
|---|---|---|
| SFN_MOCK_CONFIG | (unset) | Path to a MockConfigFile.json for TestState. The file declares per-state mock responses so a state machine can be tested without invoking real downstream services. |

Limits and defaults

| Limit | LocalEmu | AWS | Source / notes |
|---|---|---|---|
| Max state machine name length | 80 chars | 80 chars | provider.py:262,274 |
| Max activity name length | 80 chars | 80 chars | provider.py:262 |
| Max state machine definition size | not enforced | 1 MiB | LocalEmu accepts larger. |
| Max execution history events | not enforced | 25 000 | LocalEmu does not truncate. |
| Max execution duration (Standard) | not enforced | 1 year | LocalEmu does not time out. |
| Max execution duration (Express) | not enforced | 5 min | LocalEmu does not time out. |

Known limitations