Build an SNS fan-out with a FilterPolicy and iterate it on your laptop
You are a backend engineer on a team that just shipped an order-processing service. Other teams in the org want to react when an order is placed: the comms team wants every order so they can send a confirmation email, the fraud team wants only the big ones so they can run extra checks. Two teams, two consumers, one producer that does not want to know who is subscribed.
The boring AWS pattern for this is an SNS topic with two SQS
subscribers, each pulling into its own Lambda. The trick is the
SNS FilterPolicy:
the fraud queue subscribes with a rule that drops anything below
a threshold, so the fraud Lambda only ever sees the orders that
matter. Cheap to write, easy to get wrong: the filter syntax is
finicky, SQS access policies are mandatory but silent when
missing, and a misconfigured attribute type makes the whole
thing succeed-but-drop-everything.
This tutorial builds the pipeline on LocalEmu: 13 resources in a single CloudFormation stack, two integration tests, no AWS account. Deploy in about 30 seconds, publish one big order and one small one, watch them land in the right places, tear down in 2. The same template applies to real AWS with one argument change.
What you will have working at the end
Publish two SNS messages with different amounts. Inspect the DynamoDB sink. You should see:
- A. The big order
(
amount=500) lands in BOTH consumers. Two rows in DynamoDB, sameevent_id, one withservice=emails, one withservice=alerts. - B. The small order
(
amount=10) lands in the emails consumer only. The SNSFilterPolicydropped it from the alerts queue before it was ever delivered. One row in DynamoDB,service=emails. - C. The original JSON payload
survives every hop. The
payloadattribute on each DynamoDB row is the message the publisher sent, after SNS wrapped it, SQS re-wrapped it, and the Lambda un-wrapped both envelopes.
Architecture
publisher ──Publish──▶ sns: le-events-events │ ┌──────────────┴────────────────┐ ▼ ▼ sqs: le-events-emails sqs: le-events-alerts (no filter) (FilterPolicy: amount >= 100) │ │ ▼ ▼ lambda: emails-consumer lambda: alerts-consumer │ │ └───────────────┬───────────────┘ ▼ dynamodb: le-events-events composite key: (event_id, service)
1. The CloudFormation template, walked through
Thirteen resources in one
stack.yaml:
DynamoDB table, SNS topic, two SQS queues, two SNS subscriptions,
two SQS queue policies, one IAM role, two Lambda functions, two
Lambda event source mappings. The header sets up the parameters
and the DynamoDB sink with the composite
(event_id, service)
key, so one event lands as one row per consumer:
# cfn/stack.yaml: parameters + the DynamoDB sink that both consumers write to.
Parameters:
NamePrefix:
Type: String
Default: le-events
AlertThreshold:
Type: Number
Default: 100 # orders >= this go to the alerts queue too
Resources:
EventsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "${NamePrefix}-events"
BillingMode: PAY_PER_REQUEST
# Composite key: (event_id, service). One event ends up as one
# row PER consumer, so the scan tells you who saw what.
AttributeDefinitions:
- { AttributeName: event_id, AttributeType: S }
- { AttributeName: service, AttributeType: S }
KeySchema:
- { AttributeName: event_id, KeyType: HASH }
- { AttributeName: service, KeyType: RANGE }
EventsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub "${NamePrefix}-events"
The two SNS subscriptions are where the fan-out actually happens.
One subscribes the emails queue with no filter; the other
subscribes the alerts queue with a
FilterPolicy
that only accepts orders at or above the
AlertThreshold:
# cfn/stack.yaml: the two SQS subscriptions are where the fan-out happens.
# The emails queue gets every event. No filter.
EmailsSubscription:
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref EventsTopic
Protocol: sqs
Endpoint: !GetAtt EmailsQueue.Arn
RawMessageDelivery: false
# The alerts queue only gets events where amount >= AlertThreshold.
# FilterPolicyScope: MessageAttributes means we match on attributes,
# NOT on the message body. Get the scope wrong and every message is
# silently filtered out.
AlertsSubscription:
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref EventsTopic
Protocol: sqs
Endpoint: !GetAtt AlertsQueue.Arn
RawMessageDelivery: false
FilterPolicyScope: MessageAttributes
FilterPolicy:
amount:
- numeric: [">=", !Ref AlertThreshold]
# Both queues need an explicit policy allowing sns.amazonaws.com to publish.
# Real AWS requires this; LocalEmu does not enforce it but accepts it.
# Setting it unconditionally keeps the stack honest on both targets.
EmailsQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues: [!Ref EmailsQueue]
PolicyDocument:
Statement:
- Effect: Allow
Principal: { Service: sns.amazonaws.com }
Action: sqs:SendMessage
Resource: !GetAtt EmailsQueue.Arn
Condition: { ArnEquals: { aws:SourceArn: !Ref EventsTopic } }
Three details in those subscription blocks pay off later. The
FilterPolicyScope: MessageAttributes
line says "match on the publisher's message attributes, not on
the body". The
numeric: [">=", 100]
is the filter operator syntax; missing the outer list or sending
a string instead of a number silently drops every message. And
the unconditional
QueuePolicy
granting sns.amazonaws.com
publish rights is mandatory on real AWS: SNS will swallow your
messages without it.
Both consumer Lambdas use
Code: ZipFile
for inline Python. The handler is twelve lines: unwrap the SQS
envelope, unwrap the SNS envelope inside, write the row:
# cfn/stack.yaml: the consumer Lambda. Same code runs as both
# EmailsConsumer and AlertsConsumer; only SERVICE_NAME differs.
EmailsConsumer:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub "${NamePrefix}-emails-consumer"
Runtime: python3.12
# Inline ZipFile code is stored on disk as index.py by real AWS,
# so the Handler string MUST start with "index." or production fails.
Handler: index.handle
Role: !GetAtt ConsumerRole.Arn
Timeout: 15
Environment:
Variables:
TABLE_NAME: !Ref EventsTable
SERVICE_NAME: emails
Code:
ZipFile: |
import json, os, boto3
_table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])
SERVICE = os.environ["SERVICE_NAME"]
def handle(event, _ctx):
for record in event.get("Records", []):
body = json.loads(record["body"])
payload = json.loads(body["Message"])
_table.put_item(Item={
"event_id": payload["event_id"],
"service": SERVICE,
"payload": json.dumps(payload),
})
return {"processed": len(event.get("Records", []))}
Two important details in the Lambda definition. First, the
Handler string
must start with
index. when
using inline ZipFile
code, because real AWS stores that code in a file called
index.py
regardless of what you name it in the YAML. LocalEmu accepts
other names, which masks the rule, which is exactly the kind
of "works locally, fails in production" trap LocalEmu users
want to avoid; following AWS's rule on both targets is the
safer habit. Second, the two consumer Lambdas are byte-identical
Python code; only the
SERVICE_NAME
env var differs. The
service column
in DynamoDB is how the test asserts which consumer ran.
2. Run the scenario on your laptop
Clone the project, start LocalEmu in another terminal, then deploy:
$ cd localemu-examples/03-event-driven
$ localemu start # in a separate terminal
$ ./scripts/deploy.sh local
CloudFormation applies 13 resources in about 30 seconds. Most of the time is two Lambda containers cold-starting (LocalEmu pulls the python:3.12 runtime image the first time and reuses it after):
$ ./scripts/deploy.sh local
→ deploying CloudFormation stack 'le-events' to local
{
"StackId": "arn:aws:cloudformation:us-east-1:000000000000:stack/le-events/..."
}
waiting for stack-create-complete...
→ deployed to local. outputs:
AlertsConsumerName le-events-alerts-consumer
AlertsQueueArn arn:aws:sqs:us-east-1:000000000000:le-events-alerts
AlertsQueueUrl http://sqs.us-east-1.localhost:4566/000000000000/le-events-alerts
EmailsConsumerName le-events-emails-consumer
EmailsQueueArn arn:aws:sqs:us-east-1:000000000000:le-events-emails
EmailsQueueUrl http://sqs.us-east-1.localhost:4566/000000000000/le-events-emails
TableName le-events-events
TopicArn arn:aws:sns:us-east-1:000000000000:le-events-events
real 0m31.740s
Now publish the two scenarios from the top of this page and
watch the rows appear in DynamoDB. The
DataType=Number
on the message attribute is the load-bearing detail: send
DataType=String
with the same value and the
numeric filter
will silently drop everything.
$ # Publish a small order (amount=10). Filter rejects it from alerts.
$ aws --endpoint-url http://localhost:4566 sns publish \
--topic-arn arn:aws:sns:us-east-1:000000000000:le-events-events \
--message '{"event_id":"demo-small","amount":10,"customer":"alice"}' \
--message-attributes 'amount={DataType=Number,StringValue=10}'
{
"MessageId": "0d78fc59-2225-423e-8b6a-624858f2d7e6"
}
$ # Publish a big order (amount=500). Filter accepts it.
$ aws --endpoint-url http://localhost:4566 sns publish \
--topic-arn arn:aws:sns:us-east-1:000000000000:le-events-events \
--message '{"event_id":"demo-big","amount":500,"customer":"bob"}' \
--message-attributes 'amount={DataType=Number,StringValue=500}'
{
"MessageId": "8c84088d-0507-4f20-b6de-d6e527f3fc02"
}
$ # Wait a few seconds for delivery, then scan the DynamoDB sink.
$ sleep 5
$ aws --endpoint-url http://localhost:4566 dynamodb scan \
--table-name le-events-events --output table --query 'Items[*].[event_id.S,service.S]'
--------------------------
| Scan |
+--------------+----------+
| demo-big | alerts | <-- big order reached BOTH consumers
| demo-big | emails | <--
| demo-small | emails | <-- small order reached only the unfiltered queue
+--------------+----------+
# Three rows for two publishes. That is the filter policy doing its job.
The same two scenarios are baked into
tests/test_fanout.py
with explicit assertions on the row count per service and on
the round-tripped payload:
$ ./scripts/test.sh local
============================= test session starts ==============================
platform darwin -- Python 3.13.12, pytest-9.0.3
collected 2 items
tests/test_fanout.py::test_fanout_filter_policy PASSED
tests/test_fanout.py::test_payload_is_preserved PASSED
============================== 2 passed in 8.36s =============================== Tear the stack back down:
$ ./scripts/teardown.sh local
→ deleting stack 'le-events' on local
→ verifying teardown for prefix 'le-events' on local
clean: nothing left behind
real 0m2.092s
Deploy 32 seconds, tests 8 seconds, teardown 2 seconds. The
teardown script runs
cloudformation delete-stack,
waits for DELETE_COMPLETE,
then audits DynamoDB, SQS, and Lambda by prefix and exits
non-zero if anything survived. The same script handles the
aws target.
3. The same template on real AWS
Same scripts, aws
instead of local:
$ ./scripts/test.sh aws
$ ./scripts/teardown.sh aws
Three places where event-driven systems on real AWS behave differently from LocalEmu, all of which the template above already handles:
- • Inline-ZipFile handlers must
start with
index.Real AWS writes the inline code toindex.pyand an unmatched handler string crashes every invocation withRuntime.ImportModuleError. - • SQS queue policies are
mandatory for SNS delivery. Real AWS rejects the
delivery if the SQS queue does not grant
sns.amazonaws.comthe right to publish, and SNS logs nothing about it. The unconditional QueuePolicy in the template above is what keeps both targets honest. - • IAM propagation. A fresh IAM role takes a few seconds on real AWS before any Lambda can assume it; on LocalEmu it is instant. Built-in boto3 retries and CloudFormation's dependency ordering absorb this on the first apply.
The broader comparison lives in LocalEmu vs Real AWS and Known Limitations.
Get the full project
git clone https://github.com/localemu/localemu-examples
: the event-driven fan-out tutorial lives in
03-event-driven/
with the CloudFormation template, the two integration tests,
and the deploy / test / teardown scripts that produced every
terminal output on this page.