EventBridge Pipes

LocalEmu EventBridge Pipes is a greenfield implementation of all 10 operations, with no moto delegation. CreatePipe with DesiredState=RUNNING immediately spawns a per-pipe poller thread. The thread reads records from the source, applies the configured EventBridge event-pattern filter, runs the JSONPath input template, and dispatches to the target via the EventBridge TargetSenderFactory. The MVP wires the SQS source plus six target services (Lambda, SQS, SNS, Step Functions, EventBridge bus, Kinesis), which is enough to test the canonical SQS-to-Lambda fan-out pattern end-to-end. Other sources and the enrichment step are not yet wired.

Operation-level coverage: see the EventBridge Pipes coverage matrix.

Quick start (SQS to Lambda)

Terminal
# Source queue, sink queue, and a Lambda that forwards messages to the sink.
$ SRC=$(awsemu sqs create-queue --queue-name pipe-src --query QueueUrl --output text)
$ SINK=$(awsemu sqs create-queue --queue-name pipe-sink --query QueueUrl --output text)
$ SRC_ARN=$(awsemu sqs get-queue-attributes --queue-url $SRC \
    --attribute-names QueueArn --query Attributes.QueueArn --output text)

$ awsemu lambda create-function --function-name forward \
    --runtime python3.12 --handler index.handler \
    --role arn:aws:iam::000000000000:role/lambda-role \
    --environment "Variables={SINK_URL=$SINK}" \
    --zip-file fileb://forward.zip >/dev/null

# Create the pipe in RUNNING state. The SQS poller starts immediately.
$ awsemu pipes create-pipe --name sqs-to-lambda \
    --role-arn arn:aws:iam::000000000000:role/PipesRole \
    --desired-state RUNNING \
    --source $SRC_ARN \
    --target arn:aws:lambda:us-east-1:000000000000:function:forward \
    --query '{Name:Name,State:CurrentState}'
{"Name": "sqs-to-lambda", "State": "RUNNING"}

$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":42}'

# The Lambda was invoked with the SQS-event envelope; the sink got the message.
$ awsemu sqs receive-message --queue-url $SINK --query 'Messages[0].Body'
"{\"order_id\":42}"

DesiredState=RUNNING at create time starts the poller immediately. To create a paused pipe instead, pass --desired-state STOPPED and call StartPipe later.

Lifecycle

The state machine in pipe_worker.py:50-171 moves through these states:

CREATING → STOPPED → STARTING → RUNNING → STOPPING → STOPPED → DELETING

The DesiredState on CreatePipe / UpdatePipe is translated to desired_running: True → worker.start(), False → the worker stays STOPPED until StartPipe flips it.

Terminal
$ awsemu pipes create-pipe --name idle \
    --role-arn arn:aws:iam::000000000000:role/PipesRole \
    --desired-state STOPPED \
    --source $SRC_ARN \
    --target arn:aws:lambda:us-east-1:000000000000:function:forward \
    --query CurrentState --output text
STOPPED

$ awsemu pipes start-pipe --name idle
$ awsemu pipes describe-pipe --name idle --query CurrentState --output text
RUNNING

$ awsemu pipes stop-pipe --name idle
$ awsemu pipes describe-pipe --name idle --query CurrentState --output text
STOPPED
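
The chain above can be read as a small table-driven state machine. The sketch below is illustrative only: SketchWorker, ALLOWED, and the history list are hypothetical names, not the contents of pipe_worker.py, and the real worker also owns the polling thread. It assumes DELETING is reachable only from STOPPED, as the chain suggests.

```python
import threading
from enum import Enum


class PipeState(str, Enum):
    CREATING = "CREATING"
    STOPPED = "STOPPED"
    STARTING = "STARTING"
    RUNNING = "RUNNING"
    STOPPING = "STOPPING"
    DELETING = "DELETING"


# Allowed transitions, read off the chain in this section (an assumption).
ALLOWED = {
    PipeState.CREATING: {PipeState.STOPPED},
    PipeState.STOPPED: {PipeState.STARTING, PipeState.DELETING},
    PipeState.STARTING: {PipeState.RUNNING},
    PipeState.RUNNING: {PipeState.STOPPING},
    PipeState.STOPPING: {PipeState.STOPPED},
    PipeState.DELETING: set(),
}


class SketchWorker:
    """Hypothetical stand-in for the real worker; it has no polling thread."""

    def __init__(self, desired_state="STOPPED"):
        self._lock = threading.Lock()
        self.state = PipeState.CREATING
        self.history = [self.state]
        self._move(PipeState.STOPPED)
        # DesiredState=RUNNING maps to an immediate start().
        if desired_state == "RUNNING":
            self.start()

    def _move(self, new_state):
        if new_state not in ALLOWED[self.state]:
            raise ValueError(
                f"illegal transition {self.state.value} -> {new_state.value}"
            )
        self.state = new_state
        self.history.append(new_state)

    def start(self):
        with self._lock:
            if self.state is PipeState.RUNNING:
                return  # double-start is a no-op
            self._move(PipeState.STARTING)
            self._move(PipeState.RUNNING)

    def stop(self):
        with self._lock:
            if self.state is PipeState.STOPPED:
                return
            self._move(PipeState.STOPPING)
            self._move(PipeState.STOPPED)
```

Creating SketchWorker("RUNNING") walks CREATING, STOPPED, STARTING, RUNNING in one call, which mirrors what the CLI example shows from the outside.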

Architecture

Code lives at services/pipes/. The entry point is PipesProvider at provider.py:77, registered as pipes:default in plux.ini:93 via services/providers.py:488. Module layout:

File | Role
provider.py | API verbs, lifecycle hooks, persistence restart.
pipe_manager.py | PipeManager singleton: dict[pipe_arn, PipeWorker] guarded by RLock. register() auto-stops a worker if the same ARN is re-registered.
pipe_worker.py | Per-pipe state machine, the FuncThread that polls the source, shutdown event for graceful stop.
pipe_worker_factory.py | Builds a PipeWorker from a moto Pipe record: detects the source service from the ARN, instantiates the matching poller, wires the processor and dispatcher.
event_processor.py | Batch orchestration: filter, input template, target dispatch, partial-batch-failure handling.
target.py | PipeTargetDispatcher: a thin wrapper that synthesizes an EventBridge-shaped Target dict and hands the event to the EventBridge TargetSenderFactory with ServicePrincipal="pipes".
transform.py | InputTemplate evaluator: JSONPath <$.path> substitution plus the Pipes-only aliases <aws.pipes.event> and <aws.pipes.event.json>.

Threading model: one FuncThread per pipe (pipe_worker.py:98-101). The polling loop reuses the Lambda Event Source Mapping machinery in services/lambda_/event_source_mapping/pollers/.
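
To make the threading model concrete, here is a minimal sketch of a registry keyed by pipe ARN plus one stoppable thread per pipe. It uses the standard library threading.Thread rather than FuncThread, and SketchPollerThread, SketchManager, and poll_once are hypothetical names chosen for illustration, not the real classes.

```python
import threading


class SketchPollerThread:
    """Hypothetical worker: one thread per pipe, stopped via an Event."""

    def __init__(self, pipe_arn, poll_once, interval_sec=0.01):
        self.pipe_arn = pipe_arn
        self._poll_once = poll_once
        self._interval = interval_sec
        self._shutdown = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self._shutdown.is_set():
            self._poll_once()
            # wait() returns early as soon as stop() sets the event.
            self._shutdown.wait(self._interval)

    def start(self):
        self._thread.start()

    def stop(self, timeout=1.0):
        self._shutdown.set()
        if self._thread.is_alive():
            self._thread.join(timeout)

    @property
    def alive(self):
        return self._thread.is_alive()


class SketchManager:
    """Registry keyed by pipe ARN, guarded by a re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self._workers = {}

    def register(self, worker):
        with self._lock:
            old = self._workers.get(worker.pipe_arn)
            if old is not None:
                old.stop()  # re-registering an ARN stops the previous worker
            self._workers[worker.pipe_arn] = worker

    def get(self, pipe_arn):
        with self._lock:
            return self._workers.get(pipe_arn)
```

Waiting on the shutdown event instead of sleeping is what lets a stop request interrupt the poll interval promptly.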

Sources

Source | Status
SQS | Real. Reuses the Lambda ESM SqsPoller (services/lambda_/event_source_mapping/pollers/sqs_poller.py). Polling cadence and backoff come from LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC and the two backoff knobs.
Kinesis Stream | NotImplementedError at CreatePipe (pipe_worker_factory.py:85-87).
DynamoDB Streams | NotImplementedError.
Amazon MQ | NotImplementedError.
Amazon MSK + self-managed Kafka | NotImplementedError.
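
Source detection from the ARN can be sketched as follows. This is an assumption about the general shape, not the code in pipe_worker_factory.py: detect_source_service, check_source, and SUPPORTED_SOURCES are hypothetical names. It relies only on the standard ARN layout arn:partition:service:region:account:resource.

```python
SUPPORTED_SOURCES = {"sqs"}  # the only source this page lists as wired


def detect_source_service(source_arn):
    """Return the service segment of an ARN, e.g. 'sqs' or 'kinesis'."""
    # maxsplit=5 keeps colons inside the resource part intact.
    parts = source_arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {source_arn!r}")
    return parts[2]


def check_source(source_arn):
    """Reject unsupported sources up front, as CreatePipe is described to do."""
    service = detect_source_service(source_arn)
    if service not in SUPPORTED_SOURCES:
        raise NotImplementedError(f"Pipes source '{service}' is not wired yet")
    return service
```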

Targets

Every target dispatch goes through the EventBridge TargetSenderFactory with ServicePrincipal="pipes", so role assumption and request-metadata propagation work the same way as EventBridge rule targets.

Target | Dispatch shape
Lambda | Invoke with the rendered InputTemplate (or the raw source event if no template).
SQS | SendMessage with the rendered payload as MessageBody.
SNS | Publish with the payload as Message.
Step Functions | StartExecution with the payload as Input.
EventBridge bus | PutEvents with source=aws.pipes and the payload as Detail.
Kinesis | PutRecord with the payload as Data and the source ARN as PartitionKey.

Other targets (ECS RunTask, API destination, custom HTTP, CloudWatch Logs, Firehose, Pipe-as-target, Redshift Data, Inspector, ...) raise NotImplementedError from the target sender factory.
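
The dispatch shapes listed for the six wired targets can be summarized as a lookup from the target ARN's service segment to an operation and payload field. The sketch below is a hypothetical planner, not the TargetSenderFactory: plan_dispatch and DISPATCH are illustrative names, the Payload field for Lambda is an assumption (the page names no field), and states and events are the ARN service segments for Step Functions and EventBridge.

```python
import json

# ARN service segment -> (operation, payload field), per the targets listed.
DISPATCH = {
    "lambda": ("Invoke", "Payload"),  # field name assumed
    "sqs": ("SendMessage", "MessageBody"),
    "sns": ("Publish", "Message"),
    "states": ("StartExecution", "Input"),
    "events": ("PutEvents", "Detail"),
    "kinesis": ("PutRecord", "Data"),
}


def plan_dispatch(target_arn, source_arn, payload):
    """Return (operation, params) describing how a payload would be sent."""
    service = target_arn.split(":", 5)[2]
    if service not in DISPATCH:
        raise NotImplementedError(f"Pipes target '{service}' is not wired yet")
    operation, field = DISPATCH[service]
    body = payload if isinstance(payload, str) else json.dumps(payload)
    params = {field: body}
    if service == "events":
        params["Source"] = "aws.pipes"
    if service == "kinesis":
        params["PartitionKey"] = source_arn
    return operation, params
```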

Filter criteria

FilterCriteria.Filters[].Pattern uses the standard EventBridge event-pattern syntax. Filtering happens in Poller.filter_events() (pollers/poller.py:88-99) right after the source poll and before the input template. Records that fail the filter are dropped silently and the source's natural acknowledgement path handles them (SQS deletes the message; the next poll won't see it).

Terminal
$ awsemu pipes update-pipe --name sqs-to-lambda \
    --role-arn arn:aws:iam::000000000000:role/PipesRole \
    --source-parameters '{
      "FilterCriteria": {
        "Filters": [{
          "Pattern": "{\"body\":{\"order_id\":[{\"numeric\":[\">\",100]}]}}"
        }]
      }
    }'

# Now only messages whose body has order_id > 100 reach the target.
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":42}'
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":999}'

$ awsemu sqs receive-message --queue-url $SINK --max-number-of-messages 5 \
    --query 'Messages[].Body'
["{\"order_id\":999}"]
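
For intuition about how the pattern above selects records, here is a deliberately small matcher. It covers only exact-value lists and the numeric matcher, whereas real EventBridge patterns support more operators, and it is not the code in Poller.filter_events(). The function names are hypothetical, and parsing a string body as JSON before descending into it is an assumption about how the nested body pattern is applied.

```python
import json
import operator

_NUMERIC_OPS = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "=": operator.eq,
}


def _match_leaf(value, candidates):
    """True if value satisfies any entry of a pattern's leaf list."""
    for cand in candidates:
        if isinstance(cand, dict) and "numeric" in cand:
            spec = cand["numeric"]  # e.g. [">", 100] or [">", 0, "<=", 5]
            is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
            pairs = zip(spec[0::2], spec[1::2])
            if is_number and all(_NUMERIC_OPS[op](value, bound) for op, bound in pairs):
                return True
        elif not isinstance(cand, dict) and value == cand:
            return True
    return False


def matches(pattern, event):
    """Every key in the pattern must match; extra event keys are ignored."""
    for key, expected in pattern.items():
        if not isinstance(event, dict) or key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if isinstance(actual, str):
                try:
                    actual = json.loads(actual)  # e.g. an SQS body string
                except ValueError:
                    return False
            if not matches(expected, actual):
                return False
        elif not _match_leaf(actual, expected):
            return False
    return True


pattern = json.loads('{"body":{"order_id":[{"numeric":[">",100]}]}}')
print(matches(pattern, {"body": '{"order_id":999}'}))  # True
print(matches(pattern, {"body": '{"order_id":42}'}))   # False
```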

Input template

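The evaluator at transform.py:23-82 supports JSONPath <$.path> substitution, the Pipes-only aliases <aws.pipes.event> and <aws.pipes.event.json>, and JSON re-parsing of object-shaped templates.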

Terminal
$ awsemu pipes update-pipe --name sqs-to-lambda \
    --role-arn arn:aws:iam::000000000000:role/PipesRole \
    --target-parameters '{
      "InputTemplate": "{\"order\": <$.body.order_id>, \"queue\": \"<aws.pipes.event.json>\"}"
    }'

# The Lambda now receives the rendered InputTemplate, not the raw SQS event.
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":999}'

$ awsemu logs filter-log-events \
    --log-group-name /aws/lambda/forward --limit 1 --query 'events[0].message'
"Got: {\"order\": 999, \"queue\": \"{...full sqs event json...}\"}"

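The substitution step can be approximated in a few lines. The sketch below is a simplification, not the evaluator in transform.py: render and _lookup are hypothetical names, only dotted paths are handled, both aliases are treated as the whole event serialized as JSON (an assumption), and placeholders wrapped in quotes inside the template are not given special treatment.

```python
import json
import re

# Matches <$.some.path>, <aws.pipes.event> and <aws.pipes.event.json>.
_PLACEHOLDER = re.compile(r"<(\$\.[^<>]+|aws\.pipes\.event(?:\.json)?)>")


def _lookup(event, path):
    """Resolve a dotted path such as '$.body.order_id' against the event."""
    node = event
    for key in path[2:].split("."):
        if isinstance(node, str):
            node = json.loads(node)  # descend into a JSON string body
        node = node[key]
    return node


def render(template, event):
    def substitute(match):
        token = match.group(1)
        value = event if token.startswith("aws.pipes.") else _lookup(event, token)
        return json.dumps(value)

    rendered = _PLACEHOLDER.sub(substitute, template)
    # Object-shaped templates are re-parsed so the target receives JSON.
    if rendered.lstrip().startswith("{"):
        try:
            return json.loads(rendered)
        except ValueError:
            pass
    return rendered


event = {"messageId": "m-1", "body": '{"order_id": 999}'}
print(render('{"order": <$.body.order_id>}', event))  # {'order': 999}
```
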
Features supported

Feature | Notes
CRUD | CreatePipe, UpdatePipe, DescribePipe, ListPipes, DeletePipe.
Lifecycle | StartPipe, StopPipe. State transitions enforced; double-start is idempotent.
Tags | TagResource, UntagResource, ListTagsForResource.
Filter criteria | Standard EventBridge event-pattern syntax (include + exclude). Evaluated in the source poller.
Input template | JSONPath substitution + Pipes aliases + JSON re-parse for object-shaped templates.
Partial-batch failures | Target failures yield a PartialBatchFailureError; failed items are not acknowledged so the source redelivers them.
Persistence | Moto pipe records survive PERSISTENCE=1 restart. _restart_persisted_pipes (provider.py:97-124) rebuilds a worker for every pipe whose DesiredState=RUNNING. Failed rebuilds set currentState=START_FAILED.
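
The partial-batch behaviour can be pictured as acknowledging only the records whose dispatch succeeded. The following is a sketch under assumptions: the PartialBatchFailureError defined here is a local stand-in (the real class's fields are not shown on this page), and process_batch, poll_once, send, and ack are hypothetical names.

```python
class PartialBatchFailureError(Exception):
    """Local stand-in carrying the IDs of records that failed dispatch."""

    def __init__(self, failed_ids):
        super().__init__(f"{len(failed_ids)} record(s) failed")
        self.failed_ids = failed_ids


def process_batch(records, send):
    """Dispatch each record; raise if any of them failed."""
    failed = []
    for record in records:
        try:
            send(record)
        except Exception:
            failed.append(record["messageId"])
    if failed:
        raise PartialBatchFailureError(failed)


def poll_once(records, send, ack):
    """Acknowledge successes only, so the source redelivers the failures."""
    try:
        process_batch(records, send)
        failed = set()
    except PartialBatchFailureError as err:
        failed = set(err.failed_ids)
    for record in records:
        if record["messageId"] not in failed:
            ack(record)
```

With SQS as the source, an unacknowledged message becomes visible again once its visibility timeout expires, which is what drives the retry.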

Configuration

Pipes does not introduce its own environment variables. Polling cadence and backoff come from the shared Lambda Event Source Mapping knobs in config.py:

Variable | Default | Purpose
LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC | 1.0 | Base poll interval for the SQS source. Lower it in tight tests.
LAMBDA_EVENT_SOURCE_MAPPING_INITIAL_POLLER_BACKOFF_SEC | 2.0 | Initial backoff after an empty receive.
LAMBDA_EVENT_SOURCE_MAPPING_MAX_POLLER_BACKOFF_SEC | 30.0 | Cap on exponential backoff.
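
To see how the three knobs interact, the sketch below reads them with the defaults listed and computes the delay after each consecutive empty receive. The doubling factor is an assumption (the page says only that the backoff is exponential and capped), and poller_settings and backoff_delays are hypothetical helpers, not functions from config.py.

```python
import os


def poller_settings(env=os.environ):
    """Return (poll_interval, initial_backoff, max_backoff) in seconds."""
    return (
        float(env.get("LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC", "1.0")),
        float(env.get("LAMBDA_EVENT_SOURCE_MAPPING_INITIAL_POLLER_BACKOFF_SEC", "2.0")),
        float(env.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_POLLER_BACKOFF_SEC", "30.0")),
    )


def backoff_delays(initial, cap, empty_polls, factor=2.0):
    """Delay after each consecutive empty receive, capped at `cap`."""
    delays = []
    delay = initial
    for _ in range(empty_polls):
        delays.append(min(delay, cap))
        delay *= factor  # assumed growth factor
    return delays


print(backoff_delays(2.0, 30.0, 6))  # [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under these assumptions an idle queue reaches the 30-second cap after four empty receives, so tests that send a message after a long idle period may want a lower cap.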

Integration points

Service | How it touches Pipes
SQS | Source poller reuses the Lambda ESM SqsPoller; visibility timeout drives retry semantics.
EventBridge | Targets dispatch via the EventBridge TargetSenderFactory, so target wiring is shared with EventBridge rules.
Lambda | Target invocation; the function receives the rendered InputTemplate or the raw source event.
CloudFormation | AWS::Pipes::Pipe resource provider routes through this provider.

End-to-end test scenarios

Known limitations