EventBridge
EventBridge is a custom Python implementation under the boto3 service name events. 41 of the 57 documented operations are implemented in LocalEmu's own code: the default event bus and any number of custom buses, rule patterns over arbitrary JSON with eleven matching operators including $or, sixteen target dispatchers, scheduled rules with rate() and cron() expressions, archives + replays, API destinations with four authentication modes, and CloudFormation resource providers for buses, rules, and bus policies.
S3 bucket notifications and CloudTrail data events publish into EventBridge automatically when their respective services are configured. The CloudTrail bridge runs synchronously inside the request handler that recorded the API call, so a rule that watches for aws.cloudtrail events fires before the originating call returns.
Operation-level coverage: see the EventBridge coverage matrix.
Quick start
Create an SQS queue as a target, create a rule on the default bus, publish an event, read the matching delivery from the queue.
$ awsemu sqs create-queue --queue-name orders
$ QARN=$(awsemu sqs get-queue-attributes \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/orders \
--attribute-names QueueArn --query 'Attributes.QueueArn' --output text)
$ awsemu events put-rule \
--name on-new-order \
--event-pattern '{"source":["myapp.orders"],"detail-type":["OrderCreated"]}'
$ awsemu events put-targets \
--rule on-new-order \
--targets "Id=1,Arn=$QARN"
$ awsemu events put-events --entries '[{
"Source": "myapp.orders",
"DetailType": "OrderCreated",
"Detail": "{\"order_id\":\"o-42\",\"total\":99}"
}]'
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/orders \
--query 'Messages[0].Body' --output text
{"version":"0","source":"myapp.orders","detail-type":"OrderCreated","detail":{"order_id":"o-42","total":99},...} Architecture
EventBridge lives at services/events/. The provider class EventsProvider (provider.py:239) routes each PutEvents entry through the rule engine and dispatches matching events to their targets.
- •Event ingest at
provider.py:1194:PutEventsvalidates the batch size (1-10 entries) and eachDetailsize (max 256 KiB), then hands the entries to_process_entries. - •Pattern engine at
services/events/event_rule_engine.py:9:EventRuleEngine.evaluate_pattern_on_event. The compiler flattens nested patterns and expands$orbranches into separate pattern variants so the matcher is straight dict comparison plus operator checks. - •Target dispatch at
services/events/target.py:821:TargetSenderFactoryextracts the AWS service from the target ARN and returns the matchingTargetSendersubclass. Each sender invokes the downstream service through a regular boto client. - •Scheduled rules at
services/events/scheduler.py: a singletonJobSchedulerwakes every ~60 seconds, evaluates each rule's cron expression against the current time, and fires matching rules. - •Archive and replay at
services/events/archive.py+replay.py: archives store every event matching their pattern in an in-memory dict; replays read the archive, filter by time range, and re-publish to a destination bus.
State lives in EventsStore (models.py:294): event buses, rules, targets, archives, replays, connections, API destinations. When PERSISTENCE=1 is set, the store is serialised and restored across restarts.
Cross-account event delivery encodes the source account and region in the X-Ray trace header (utils.py:249), so an event published from one account that targets another account's bus arrives with the original context preserved.
Event sources
Three paths put events on a LocalEmu EventBridge bus:
| Source | Path | File:line |
|---|---|---|
Direct PutEvents API | Custom or third-party producer calls PutEvents with the bus name in EventBusName. | services/events/provider.py:1194 |
| S3 bucket notifications | When a bucket has EventBridgeConfiguration={} in its notification config, every Object Created / Object Deleted event is published to the default bus with source=aws.s3. | services/s3/notifications.py:633-749 |
| CloudTrail recording hook | A request-handler hook published by CloudTrail forwards each recorded API call to EventBridge inside the same request thread. A rule watching for aws.cloudtrail events fires before the originating API call returns to the caller. | services/cloudtrail/recording_hook.py:100 |
Target dispatch
Sixteen target types are dispatched by services/events/target.py. Each sender invokes the downstream service through a regular boto client, so the target service handles its own authentication and request flow:
| Target type | Action | Source line |
|---|---|---|
| Lambda | Invoke | :693 |
| SQS | SendMessage | :757 |
| SNS | Publish | :752 |
| Kinesis | PutRecord | :668 |
| Firehose | PutRecord | :658 |
| ECS task | RunTask | :540 |
| Step Functions | StartExecution | :770 |
| API Gateway | HTTP request to the invoke URL | :387 |
| EventBridge bus (cross-bus) | PutEvents on the target bus | :566 |
| CloudWatch Logs | PutLogEvents | :703 |
| SSM Run Command | SendCommand | :787 |
| Batch | SubmitJob | :518 |
| Redshift | redshift-data ExecuteStatement | :721 |
| SageMaker | StartPipelineExecution | :739 |
| API destination (HTTP/HTTPS) | HTTP request with connection-derived auth headers | :609 |
| AppSync | Stub only. The dispatcher accepts the target but does not invoke the GraphQL endpoint. | :511 |
Per-target features that work for every sender: InputPath JSON-path extraction, InputTransformer template substitution, DeadLetterConfig for failed deliveries, and RetryPolicy.
Pattern matching
Eleven operators in services/events/event_rule_engine.py:
| Operator | Example | Line |
|---|---|---|
| Exact match | "source": ["myapp.orders"] | :64 |
| Prefix | [{"prefix": "us-"}] | :137 |
| Suffix | [{"suffix": ".com"}] | :141 |
| Equals ignore case | [{"equals-ignore-case": "OK"}] | :145 |
| Numeric range | [{"numeric": [">=", 100, "<", 1000]}] | :163 |
| Anything but | [{"anything-but": ["TEST"]}] | :71 |
| Exists | [{"exists": true}] and false | :66 |
| CIDR / IP address | [{"cidr": "10.0.0.0/8"}] | :149 |
Wildcard * | [{"wildcard": "order-*"}] | :157 |
$or (multiple branches) | {"$or": [{...}, {...}]} | :225 |
Nested detail.* path | {"detail": {"bucket": {"name": ["uploads"]}}} | :233 |
$ awsemu events put-rule --name high-value-or-fraud \
--event-pattern '{
"source": ["myapp.payments"],
"$or": [
{"detail": {"amount": [{"numeric": [">", 1000]}]}},
{"detail": {"risk_score": [{"numeric": [">=", 0.9]}]}},
{"detail": {"status": ["FLAGGED"]}}
]
}'
# Matches: amount=2000, risk_score=0.4 (first branch)
$ awsemu events put-events --entries '[{
"Source": "myapp.payments",
"DetailType": "PaymentProcessed",
"Detail": "{\"amount\":2000,\"risk_score\":0.4,\"status\":\"OK\"}"
}]'
# Matches: status=FLAGGED (third branch)
$ awsemu events put-events --entries '[{
"Source": "myapp.payments",
"DetailType": "PaymentProcessed",
"Detail": "{\"amount\":10,\"risk_score\":0.1,\"status\":\"FLAGGED\"}"
}]'
# Drops: none of the branches match
$ awsemu events put-events --entries '[{
"Source": "myapp.payments",
"DetailType": "PaymentProcessed",
"Detail": "{\"amount\":10,\"risk_score\":0.1,\"status\":\"OK\"}"
}]' S3 events into EventBridge
S3 publishes Object Created and Object Deleted events to the default bus when the bucket's notification configuration enables EventBridge. From there, ordinary EventBridge rules route them anywhere.
# Enable EventBridge notifications on an S3 bucket
$ awsemu s3 mb s3://uploads
$ awsemu s3api put-bucket-notification-configuration --bucket uploads \
--notification-configuration '{"EventBridgeConfiguration":{}}'
# A rule that matches every ObjectCreated event for the bucket
$ awsemu events put-rule \
--name on-upload \
--event-pattern '{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {"bucket":{"name":["uploads"]}}
}'
$ awsemu events put-targets --rule on-upload \
--targets "Id=1,Arn=$QARN"
$ echo hello | awsemu s3 cp - s3://uploads/file.txt
$ awsemu sqs receive-message --queue-url $QURL --query 'Messages[0].Body' --output text | head -c 200
{"version":"0","source":"aws.s3","detail-type":"Object Created","detail":{"bucket":{"name":"uploads"},"object":{"key":"file.txt", ...}}, ...} Scheduled rule firing a Lambda
Both rate(...) and cron(...) schedule expressions are honoured by the job scheduler. The scheduler ticks every 60 seconds, so a cron rule fires within a ~60 s window of the matching wall-clock time.
$ awsemu lambda create-function \
--function-name hourly-report \
--runtime python3.12 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler handler.handler \
--zip-file fileb://handler.zip
$ awsemu events put-rule \
--name hourly \
--schedule-expression 'cron(0 * * * ? *)'
$ awsemu events put-targets --rule hourly \
--targets "Id=1,Arn=arn:aws:lambda:us-east-1:000000000000:function:hourly-report"
# LocalEmu evaluates the cron expression on a ~60 s tick. When the
# schedule fires, the rule invokes the Lambda asynchronously.
$ awsemu logs tail /aws/lambda/hourly-report --follow Archives and replays
Archives capture every event that matches their pattern. Replays re-publish archived events through the bus, so the same rules fire again. State machine: STARTING, RUNNING, COMPLETED, CANCELLED (services/events/replay.py:20).
$ awsemu events create-archive \
--archive-name orders-archive \
--event-source-arn arn:aws:events:us-east-1:000000000000:event-bus/default \
--retention-days 7 \
--event-pattern '{"source":["myapp.orders"]}'
# Publish a few events. They land in subscribed targets AND in the archive.
$ for i in 1 2 3; do
awsemu events put-events --entries "[{\"Source\":\"myapp.orders\",\"DetailType\":\"OrderCreated\",\"Detail\":\"{\\\"order\\\":$i}\"}]"
done
$ awsemu events describe-archive --archive-name orders-archive --query 'EventCount' --output text
3
# Replay everything from the past hour back through the bus rules.
$ START=$(date -u -v-1H +%Y-%m-%dT%H:%M:%SZ 2>/dev/null || date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)
$ END=$(date -u +%Y-%m-%dT%H:%M:%SZ)
$ awsemu events start-replay \
--replay-name replay-1 \
--event-source-arn arn:aws:events:us-east-1:000000000000:archive/orders-archive \
--destination Arn=arn:aws:events:us-east-1:000000000000:event-bus/default \
--event-start-time "$START" \
--event-end-time "$END"
$ awsemu events describe-replay --replay-name replay-1 --query 'State' --output text
COMPLETED API destinations and connections
An API destination is a target type that POSTs the event to an HTTP/HTTPS endpoint outside AWS. The endpoint URL and HTTP method are on the destination; the authentication is on the linked connection. Four connection auth modes are accepted at services/events/connection.py:
- •API key: a header name + value injected on every request.
- •Basic: username and password sent in the
Authorization: Basic ...header. - •OAuth2 client credentials: client id, client secret, token endpoint. The dispatcher exchanges for a bearer token and caches it.
- •Invocation HTTP parameters: extra static headers, query parameters, or body parameters merged into every request.
Default invocation rate: 300 requests per second (models.py:282), matching the AWS default.
Limits and defaults
| Limit | LocalEmu | AWS | Source |
|---|---|---|---|
Max event Detail size | 256 KiB | 256 KiB | services/events/provider.py:217 |
Max PutEvents batch | 10 entries | 10 | services/events/provider.py:1203 |
| Target ID max length | 64 chars | 64 | services/events/provider.py:1011 |
| API destination invocation rate | 300 / s | 300 / s | services/events/models.py:282 |
| Cron scheduler tick | ~60 s | ~60 s | services/events/scheduler.py:68 |
| Max rules per event bus | not enforced | 300 | LocalEmu accepts more. |
| Max targets per rule | not enforced | 5 | LocalEmu accepts more. |
| Max event buses per account | not enforced | 100 | LocalEmu accepts more. |
Known limitations
- •Global Endpoints (multi-region resilience) are not implemented. The
EndpointIdparameter onPutEventsis accepted but ignored. Operations:CreateEndpoint,DescribeEndpoint,UpdateEndpoint,DeleteEndpoint,ListEndpoints. - •Partner / SaaS event sources are not implemented. There is no
aws.partner/*ingestion path and no admin API to create one. Nine operations are stubbed:CreatePartnerEventSource,DeletePartnerEventSource,DescribePartnerEventSource,ListPartnerEventSources,ListPartnerEventSourceAccounts,ActivateEventSource,DeactivateEventSource,DescribeEventSource,ListEventSources.PutPartnerEventsraisesNotImplementedException. - •
UpdateEventBusis not implemented. Buses are immutable after creation. To change a bus's dead-letter config or KMS key, delete and recreate. - •
DeauthorizeConnectionis not implemented. UseDeleteConnectionand recreate the connection to invalidate cached credentials. - •AppSync target type is a stub. An AppSync target accepts the rule binding but does not invoke the GraphQL endpoint (
target.py:511). - •KMS encryption for archives and connections is metadata only. The
kms_key_identifierparameter is stored but archive events and connection secrets are not encrypted at rest. - •Cron precision is ±60 s. The job scheduler ticks once per minute. A
cron(...)rule fires within a 60-second window of the matching wall-clock instant, not at the exact second. - •Replays do not honour the rule-ARN filter. A replay re-publishes archived events to the destination bus and re-applies every rule on that bus (
replay.py:82). You cannot scope a replay to a single rule. - •No async worker pool. A fired rule spawns a thread per delivery (
provider.py:2050); inside a singlePutEventsresponse, rules are processed sequentially before the response returns.