EventBridge Pipes
LocalEmu EventBridge Pipes is a greenfield implementation of all 10 operations: no moto delegation. CreatePipe with DesiredState=RUNNING immediately spawns a per-pipe poller thread; the thread reads records from the source, applies the configured EventBridge event-pattern filter, runs the JSONPath input template, and dispatches to the target via the EventBridge TargetSenderFactory. The MVP wires the SQS source plus six target services (Lambda, SQS, SNS, Step Functions, EventBridge bus, Kinesis), which is enough to test the canonical SQS-to-Lambda fan-out pattern end-to-end. Other sources and the enrichment step are not yet wired.
Operation-level coverage: see the EventBridge Pipes coverage matrix.
Quick start (SQS to Lambda)
# Source queue, sink queue, and a Lambda that forwards messages to the sink.
$ SRC=$(awsemu sqs create-queue --queue-name pipe-src --query QueueUrl --output text)
$ SINK=$(awsemu sqs create-queue --queue-name pipe-sink --query QueueUrl --output text)
$ SRC_ARN=$(awsemu sqs get-queue-attributes --queue-url $SRC \
--attribute-names QueueArn --query Attributes.QueueArn --output text)
$ awsemu lambda create-function --function-name forward \
--runtime python3.12 --handler index.handler \
--role arn:aws:iam::000000000000:role/lambda-role \
--environment Variables='{SINK_URL='$SINK'}' \
--zip-file fileb://forward.zip >/dev/null
# Create the pipe in RUNNING state. The SQS poller starts immediately.
$ awsemu pipes create-pipe --name sqs-to-lambda \
--role-arn arn:aws:iam::000000000000:role/PipesRole \
--desired-state RUNNING \
--source $SRC_ARN \
--target arn:aws:lambda:us-east-1:000000000000:function:forward \
--query '{Name:Name,State:CurrentState}'
{"Name": "sqs-to-lambda", "State": "RUNNING"}
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":42}'
# The Lambda was invoked with the SQS-event envelope; the sink got the message.
$ awsemu sqs receive-message --queue-url $SINK --query 'Messages[0].Body'
"{\"order_id\":42}"
DesiredState=RUNNING at create time starts the poller immediately. To create a paused pipe instead, pass --desired-state STOPPED and call StartPipe later.
Lifecycle
The state machine is implemented in pipe_worker.py:50-171:
CREATING → STOPPED → STARTING → RUNNING → STOPPING → STOPPED → DELETING
The DesiredState on CreatePipe / UpdatePipe is translated to a desired_running flag: True calls worker.start(), and False leaves the worker STOPPED until StartPipe flips it.
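The transitions above can be sketched as a small table-driven state machine. This is an illustration only, not the code in pipe_worker.py: the names PipeState, ALLOWED, and SketchWorker are hypothetical, and treating a repeated stop as a no-op is an assumption (the page only states that double-start is idempotent).

```python
import threading
from enum import Enum


class PipeState(str, Enum):
    CREATING = "CREATING"
    STOPPED = "STOPPED"
    STARTING = "STARTING"
    RUNNING = "RUNNING"
    STOPPING = "STOPPING"
    DELETING = "DELETING"


# Legal next states, read off the lifecycle line above.
ALLOWED = {
    PipeState.CREATING: {PipeState.STOPPED},
    PipeState.STOPPED: {PipeState.STARTING, PipeState.DELETING},
    PipeState.STARTING: {PipeState.RUNNING},
    PipeState.RUNNING: {PipeState.STOPPING},
    PipeState.STOPPING: {PipeState.STOPPED},
    PipeState.DELETING: set(),
}


class SketchWorker:
    """Hypothetical stand-in for the per-pipe worker; it tracks state only."""

    def __init__(self, desired_running):
        self.state = PipeState.CREATING
        self._lock = threading.Lock()
        self._move(PipeState.STOPPED)
        if desired_running:
            self.start()

    def _move(self, new_state):
        with self._lock:
            if new_state not in ALLOWED[self.state]:
                raise ValueError(
                    f"illegal transition {self.state.value} -> {new_state.value}"
                )
            self.state = new_state

    def start(self):
        if self.state is PipeState.RUNNING:
            return  # double-start is idempotent
        self._move(PipeState.STARTING)
        self._move(PipeState.RUNNING)

    def stop(self):
        if self.state is PipeState.STOPPED:
            return  # assumption: a repeated stop is also a no-op
        self._move(PipeState.STOPPING)
        self._move(PipeState.STOPPED)
```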
$ awsemu pipes create-pipe --name idle \
--role-arn arn:aws:iam::000000000000:role/PipesRole \
--desired-state STOPPED \
--source $SRC_ARN \
--target arn:aws:lambda:us-east-1:000000000000:function:forward \
--query CurrentState --output text
STOPPED
$ awsemu pipes start-pipe --name idle
$ awsemu pipes describe-pipe --name idle --query CurrentState --output text
RUNNING
$ awsemu pipes stop-pipe --name idle
$ awsemu pipes describe-pipe --name idle --query CurrentState --output text
STOPPED
Architecture
Code lives at services/pipes/. Entry point PipesProvider at provider.py:77, registered as pipes:default in plux.ini:93 via services/providers.py:488. Module layout:
| File | Role |
|---|---|
| provider.py | API verbs, lifecycle hooks, persistence restart. |
| pipe_manager.py | PipeManager singleton: dict[pipe_arn, PipeWorker] guarded by RLock. register() auto-stops a worker if the same ARN is re-registered. |
| pipe_worker.py | Per-pipe state machine, the FuncThread that polls the source, shutdown event for graceful stop. |
| pipe_worker_factory.py | Builds a PipeWorker from a moto Pipe record: detects the source service from the ARN, instantiates the matching poller, wires the processor and dispatcher. |
| event_processor.py | Batch orchestration: filter, input template, target dispatch, partial-batch-failure handling. |
| target.py | PipeTargetDispatcher: a thin wrapper that synthesizes an EventBridge-shaped Target dict and hands the event to the EventBridge TargetSenderFactory with ServicePrincipal="pipes". |
| transform.py | InputTemplate evaluator: JSONPath <$.path> substitution plus the Pipes-only aliases <aws.pipes.event> and <aws.pipes.event.json>. |
Threading model: one FuncThread per pipe (pipe_worker.py:98-101). The polling loop reuses the Lambda Event Source Mapping machinery in services/lambda_/event_source_mapping/pollers/.
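To make the threading model concrete, here is a rough sketch of a per-pipe polling thread with a shutdown event, plus a manager whose register() stops the previous worker for the same ARN. It uses the standard library's threading.Thread rather than FuncThread, and PollingWorker, ManagerSketch, and poll_once are hypothetical names chosen for illustration.

```python
import threading


class PollingWorker:
    """One thread per pipe; poll_once stands in for a single source poll."""

    def __init__(self, poll_once, interval_sec=0.01):
        self._poll_once = poll_once
        self._interval = interval_sec
        self._shutdown = threading.Event()
        self._thread = None

    def start(self):
        self._shutdown.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._shutdown.is_set():
            self._poll_once()
            # wait() returns early once stop() sets the event, so shutdown is prompt.
            self._shutdown.wait(self._interval)

    def stop(self):
        self._shutdown.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    @property
    def running(self):
        return self._thread is not None and self._thread.is_alive()


class ManagerSketch:
    """Maps pipe ARN to worker, guarded by a re-entrant lock."""

    def __init__(self):
        self._workers = {}
        self._lock = threading.RLock()

    def register(self, pipe_arn, worker):
        with self._lock:
            previous = self._workers.get(pipe_arn)
            if previous is not None:
                previous.stop()  # re-registering an ARN stops the old worker
            self._workers[pipe_arn] = worker
```

Waiting on the event instead of sleeping means a stop request does not have to sit out a full poll interval.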
Sources
| Source | Status |
|---|---|
| SQS | Real. Reuses the Lambda ESM SqsPoller (services/lambda_/event_source_mapping/pollers/sqs_poller.py). Polling cadence and backoff come from LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC and the two backoff knobs. |
| Kinesis Stream | NotImplementedError at CreatePipe (pipe_worker_factory.py:85-87). |
| DynamoDB Streams | NotImplementedError. |
| Amazon MQ | NotImplementedError. |
| Amazon MSK + self-managed Kafka | NotImplementedError. |
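The factory is described as detecting the source service from the ARN. A minimal sketch of that idea, under the assumption that the service segment of the ARN is the deciding field, might look like this. SUPPORTED_SOURCES, source_service, and check_source are hypothetical names, not functions from pipe_worker_factory.py.

```python
SUPPORTED_SOURCES = {"sqs"}  # per the table above, only SQS is wired


def source_service(arn):
    """Return the service segment of an ARN (arn:partition:service:region:account:resource)."""
    parts = arn.split(":", 5)  # maxsplit keeps colons inside the resource part intact
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return parts[2]


def check_source(arn):
    """Reject unsupported sources up front, mirroring the CreatePipe behaviour described above."""
    service = source_service(arn)
    if service not in SUPPORTED_SOURCES:
        raise NotImplementedError(f"Pipes source {service!r} is not wired yet")
    return service
```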
Targets
Every target dispatch goes through the EventBridge TargetSenderFactory with ServicePrincipal="pipes", so role assumption and request-metadata propagation work the same way as EventBridge rule targets.
| Target | Dispatch shape |
|---|---|
| Lambda | Invoke with the rendered InputTemplate (or the raw source event if no template). |
| SQS | SendMessage with the rendered payload as MessageBody. |
| SNS | Publish with the payload as Message. |
| Step Functions | StartExecution with the payload as Input. |
| EventBridge bus | PutEvents with source=aws.pipes and the payload as Detail. |
| Kinesis | PutRecord with the payload as Data and the source ARN as PartitionKey. |
Other targets (ECS RunTask, API destination, custom HTTP, CloudWatch Logs, Firehose, Pipe-as-target, Redshift Data, Inspector, ...) raise NotImplementedError from the target sender factory.
Filter criteria
FilterCriteria.Filters[].Pattern uses the standard EventBridge event-pattern syntax. Filtering happens in Poller.filter_events() (pollers/poller.py:88-99) right after the source poll and before the input template. Records that fail the filter are dropped silently and the source's natural acknowledgement path handles them (SQS deletes the message; the next poll won't see it).
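For readers unfamiliar with event patterns, the following sketch shows how a small subset of the syntax can be evaluated: literal value lists and the numeric operator. It is not the code in Poller.filter_events(); the matches function is hypothetical, other operators (prefix, exists, anything-but, and so on) are left out, and parsing a string-valued field as JSON before descending into it is an assumption about how SQS bodies are handled.

```python
import json
import operator

_NUMERIC_OPS = {
    "<": operator.lt,
    "<=": operator.le,
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
}


def _matches_value(value, candidates):
    """True if value matches any entry of a pattern's value list."""
    for candidate in candidates:
        if isinstance(candidate, dict) and "numeric" in candidate:
            spec = candidate["numeric"]  # e.g. [">", 100] or [">", 0, "<=", 5]
            pairs = zip(spec[0::2], spec[1::2])
            if (
                isinstance(value, (int, float))
                and not isinstance(value, bool)
                and all(_NUMERIC_OPS[op](value, bound) for op, bound in pairs)
            ):
                return True
        elif value == candidate:
            return True
    return False


def matches(pattern, event):
    """Every key in the pattern must be present in the event and match."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend, parsing JSON strings such as an SQS body first.
            if isinstance(actual, str):
                try:
                    actual = json.loads(actual)
                except ValueError:
                    return False
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not _matches_value(actual, expected):
            return False
    return True
```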
$ awsemu pipes update-pipe --name sqs-to-lambda \
--role-arn arn:aws:iam::000000000000:role/PipesRole \
--source-parameters '{
"FilterCriteria": {
"Filters": [{
"Pattern": "{\"body\":{\"order_id\":[{\"numeric\":[\">\",100]}]}}"
}]
}
}'
# Now only messages whose body has order_id > 100 reach the target.
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":42}'
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":999}'
$ awsemu sqs receive-message --queue-url $SINK --max-number-of-messages 5 \
--query 'Messages[].Body'
["{\"order_id\":999}"]
Input template
The evaluator at transform.py:23-82 supports:
- JSONPath substitution: <$.path.into.event>. The value is JSON-rendered, so substituting an object inserts the full subtree.
- Pipes-only aliases: <aws.pipes.event> (the full event as a string) and <aws.pipes.event.json> (the full event re-rendered as JSON).
- JSON-shape templates: when the template parses as JSON, the result is re-parsed so the target receives a real object, not a quoted string (transform.py:67).
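The three behaviours listed above can be approximated in a few lines. This is a simplified sketch, not the evaluator in transform.py: render and _lookup are hypothetical names, only plain dotted paths such as $.body.order_id are resolved (no array indexes or wildcards), and of the two aliases only <aws.pipes.event.json> is handled.

```python
import json
import re

_PLACEHOLDER = re.compile(r"<([^<>]+)>")


def _lookup(event, path):
    """Resolve a plain dotted path like $.body.order_id; raises KeyError if a key is absent."""
    node = event
    for key in path[2:].split("."):
        node = node[key]
    return node


def render(template, event):
    def substitute(match):
        token = match.group(1)
        if token == "aws.pipes.event.json":
            return json.dumps(event)
        if token.startswith("$."):
            # JSON-render the value so objects expand to their full subtree.
            return json.dumps(_lookup(event, token))
        return match.group(0)  # leave unrecognised tokens untouched

    rendered = _PLACEHOLDER.sub(substitute, template)
    try:
        # If the rendered text is valid JSON, hand the target a real object.
        return json.loads(rendered)
    except ValueError:
        return rendered
```

For example, render('{"order": <$.body.order_id>}', {"body": {"order_id": 999}}) yields the dict {"order": 999}, while a template that is not JSON-shaped comes back as a plain string.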
$ awsemu pipes update-pipe --name sqs-to-lambda \
--role-arn arn:aws:iam::000000000000:role/PipesRole \
--target-parameters '{
"InputTemplate": "{\"order\": <$.body.order_id>, \"queue\": \"<aws.pipes.event.json>\"}"
}'
# The Lambda now receives the rendered InputTemplate, not the raw SQS event.
$ awsemu sqs send-message --queue-url $SRC --message-body '{"order_id":999}'
$ awsemu logs filter-log-events \
--log-group-name /aws/lambda/forward --limit 1 --query 'events[0].message'
"Got: {\"order\": 999, \"queue\": \"{...full sqs event json...}\"}"
Features supported
| Feature | Notes |
|---|---|
| CRUD | CreatePipe, UpdatePipe, DescribePipe, ListPipes, DeletePipe. |
| Lifecycle | StartPipe, StopPipe. State transitions enforced; double-start is idempotent. |
| Tags | TagResource, UntagResource, ListTagsForResource. |
| Filter criteria | Standard EventBridge event-pattern syntax (include + exclude). Evaluated in the source poller. |
| Input template | JSONPath substitution + Pipes aliases + JSON re-parse for object-shaped templates. |
| Partial-batch failures | Target failures yield a PartialBatchFailureError; failed items are not acknowledged so the source redelivers them. |
| Persistence | Moto pipe records survive PERSISTENCE=1 restart. _restart_persisted_pipes (provider.py:97-124) rebuilds a worker for every pipe whose DesiredState=RUNNING. Failed rebuilds set currentState=START_FAILED. |
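The partial-batch-failure row can be illustrated with a short sketch: items whose dispatch fails are collected, reported through a PartialBatchFailureError, and left unacknowledged so the source redelivers them. Only the exception name comes from this page; its failed_ids attribute, the dispatch_batch and process_and_ack helpers, and the use of the messageId field as the item key are assumptions made for the example.

```python
class PartialBatchFailureError(Exception):
    """Carries the IDs of records whose target dispatch failed (attribute name is hypothetical)."""

    def __init__(self, failed_ids):
        super().__init__(f"{len(failed_ids)} item(s) failed")
        self.failed_ids = failed_ids


def dispatch_batch(records, send):
    """Send every record; raise once at the end if any of them failed."""
    failed = []
    for record in records:
        try:
            send(record)
        except Exception:
            failed.append(record["messageId"])
    if failed:
        raise PartialBatchFailureError(failed)


def process_and_ack(records, send, delete):
    """Acknowledge (delete) only the records that were delivered."""
    try:
        dispatch_batch(records, send)
        failed = set()
    except PartialBatchFailureError as err:
        failed = set(err.failed_ids)
    for record in records:
        if record["messageId"] not in failed:
            delete(record)
    return failed
```

Records left undeleted become visible again after the queue's visibility timeout, which is the redelivery path the table refers to.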
Configuration
Pipes does not introduce its own environment variables. Polling cadence and backoff come from the shared Lambda Event Source Mapping knobs in config.py:
| Variable | Default | Purpose |
|---|---|---|
| LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC | 1.0 | Base poll interval for the SQS source. Lower it in tight tests. |
| LAMBDA_EVENT_SOURCE_MAPPING_INITIAL_POLLER_BACKOFF_SEC | 2.0 | Initial backoff after an empty receive. |
| LAMBDA_EVENT_SOURCE_MAPPING_MAX_POLLER_BACKOFF_SEC | 30.0 | Cap on exponential backoff. |
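To get a feel for how the two backoff knobs interact, the sketch below computes the delays after consecutive empty receives. The doubling factor is an assumption (the page only says the backoff is exponential and capped), and env_float and backoff_schedule are hypothetical helpers, not part of config.py.

```python
import os


def env_float(name, default):
    """Read a float setting from the environment, falling back to the documented default."""
    return float(os.environ.get(name, default))


def backoff_schedule(initial, maximum, empty_polls):
    """Delays after each consecutive empty receive, assuming the delay doubles each time."""
    delays = []
    delay = initial
    for _ in range(empty_polls):
        delays.append(min(delay, maximum))
        delay *= 2
    return delays


if __name__ == "__main__":
    initial = env_float("LAMBDA_EVENT_SOURCE_MAPPING_INITIAL_POLLER_BACKOFF_SEC", 2.0)
    maximum = env_float("LAMBDA_EVENT_SOURCE_MAPPING_MAX_POLLER_BACKOFF_SEC", 30.0)
    print(backoff_schedule(initial, maximum, empty_polls=6))
```

With the defaults from the table, backoff_schedule(2.0, 30.0, 6) returns [2.0, 4.0, 8.0, 16.0, 30.0, 30.0] under this doubling assumption, so an idle queue reaches the cap after four empty polls.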
Integration points
| Service | How it touches Pipes |
|---|---|
| SQS | Source poller reuses the Lambda ESM SqsPoller; visibility timeout drives retry semantics. |
| EventBridge | Targets dispatch via the EventBridge TargetSenderFactory, so target wiring is shared with EventBridge rules. |
| Lambda | Target invocation; the function receives the rendered InputTemplate or the raw source event. |
| CloudFormation | AWS::Pipes::Pipe resource provider routes through this provider. |
End-to-end test scenarios
- SQS source → Lambda target round-trip: create source + sink SQS queues, a Lambda that writes to the sink, a pipe with DesiredState=RUNNING; send a message to the source, poll the sink, confirm delivery and uid preservation.
- Lifecycle sweep: STOPPED-on-create does not dispatch; StartPipe brings STOPPED → RUNNING; StopPipe halts dispatch; ListPipes surfaces the pipe; Kinesis source is rejected at CreatePipe with a clear NotImplementedError.
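Both scenarios depend on waiting for an asynchronous effect (a message arriving in the sink, a state change). A small polling helper along these lines keeps such tests from relying on fixed sleeps. poll_until is a hypothetical utility written for this example, and fetch stands for whatever call the test makes, such as a receive on the sink queue.

```python
import time


def poll_until(fetch, timeout_sec=10.0, interval_sec=0.2):
    """Call fetch() until it returns something truthy, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_sec
    while True:
        result = fetch()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"nothing arrived within {timeout_sec}s")
        time.sleep(interval_sec)
```

For the STOPPED-on-create check, the same helper can be used in reverse: expect the TimeoutError with a short timeout to assert that nothing was dispatched.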
Known limitations
- Only the SQS source is wired. Kinesis, DynamoDB Streams, Amazon MQ, Amazon MSK, and self-managed Kafka raise NotImplementedError at CreatePipe. The pipe is not created in any persisted form, so the error is the user's signal.
- Targets are limited to Lambda, SQS, SNS, Step Functions, EventBridge bus, and Kinesis. Anything else (ECS RunTask, API destinations, raw HTTP, CloudWatch Logs, Firehose, Pipe-as-target, Redshift Data, Inspector, ...) is rejected by the target sender factory.
- Enrichment step is a stub. The hook exists at event_processor.py:49-54 but no Lambda / Step Functions / API destination / HTTP enricher is wired. EnrichmentParameters is accepted at the API but not invoked; events go straight from filter to target.
- DeadLetterConfig is metadata-only. Failed dispatches do not produce DLQ messages. The DLQ envelope shape is synthesized but never sent (no SQS / SNS write). Source-side retries (SQS visibility timeout) are the only retry path.
- MaximumRetryAttempts and MaximumEventAgeInSeconds are not enforced. Retry behavior follows the source poller's policy.
- No pipe-execution logs. PipeLogConfigurationParameters (target = CloudWatch Logs / S3 / Firehose) is accepted on the API but no diagnostic records are emitted. Inspect DescribePipe.CurrentState and the underlying target service's logs instead.
- KmsKeyIdentifier is accepted but not enforced. Events in flight are not encrypted at rest under that key.
- No cross-account or cross-region pipes. Source, target, and pipe must all live in the same account and region.