Event-Driven Microservices
Test distributed systems locally. Build SNS/SQS fan-out patterns, S3 event notifications, SQS-triggered Lambda pipelines, and service configuration with Secrets Manager, all on your machine.
The Problem
Event-driven microservices are the hardest AWS architectures to test. Each service communicates through messaging (SNS, SQS, EventBridge) and reacts to events asynchronously. Testing the full flow requires:
- All services running simultaneously
- Messaging infrastructure (topics, queues, subscriptions) fully configured
- Event source mappings connecting queues to Lambda functions
- Secrets and configuration available for each service
- A way to publish test events and verify they flow through the entire pipeline
On real AWS, standing this up every test run is impractical (queue + topic + subscription + event-source-mapping propagation alone is minutes), so teams end up sharing a dev account, stepping on each other's resources, and chasing ghost failures from leftover state.
The result: developers test individual services in isolation and hope the integrations work when deployed together. Bugs that only appear in the full flow are discovered in staging or production.
The Solution
LocalEmu lets you create the entire messaging topology locally in seconds. You can build, test, and verify complete event-driven architectures on your laptop before pushing to CI or deploying to AWS.
--> SQS (inventory-updates) --> Inventory Service
--> SQS (notification-dispatch) --> Notification Service
--> SQS (analytics-events) --> Analytics Service
Walkthrough: Mini E-Commerce System
Before exploring individual patterns, let us build a complete working example. This mini e-commerce system has three services that communicate through SNS and SQS:
Order Service
Accepts orders and publishes events to an SNS topic. Does not know about downstream consumers.
Payment Service
Reads from an SQS queue subscribed to the order topic. Records payment results in DynamoDB.
Notification Service
Reads from a separate SQS queue subscribed to the same topic. Sends order confirmations to customers.
--> SQS (notification-queue) --> Notification Service --> DynamoDB (notifications)
Step 1: Set up all resources
This setup script creates the SNS topic, both SQS queues, subscribes the queues to the topic, and creates the DynamoDB tables. Run it once after starting LocalEmu.
#!/bin/bash
# setup-ecommerce.sh - Create all resources for the mini e-commerce system.
set -euo pipefail
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_DEFAULT_REGION=us-east-1
echo "Creating SNS topic..."
awsemu sns create-topic --name order-events
echo "Creating SQS queues..."
awsemu sqs create-queue --queue-name payment-queue
awsemu sqs create-queue --queue-name notification-queue
echo "Subscribing queues to topic..."
awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:payment-queue
awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:notification-queue
echo "Creating DynamoDB tables..."
awsemu dynamodb create-table \
--table-name payments \
--key-schema AttributeName=order_id,KeyType=HASH \
--attribute-definitions AttributeName=order_id,AttributeType=S \
--billing-mode PAY_PER_REQUEST
awsemu dynamodb create-table \
--table-name notifications \
--key-schema AttributeName=order_id,KeyType=HASH \
--attribute-definitions AttributeName=order_id,AttributeType=S \
--billing-mode PAY_PER_REQUEST
echo "All resources created. Ready to test." Step 2: Order Service
The Order Service publishes order events to the SNS topic. It does not know or care which services consume the events. This is the decoupling that makes event-driven architectures powerful.
import boto3
import json
import os
def get_client(service):
kwargs = {}
endpoint = os.environ.get("AWS_ENDPOINT_URL")
if endpoint:
kwargs["endpoint_url"] = endpoint
return boto3.client(service, region_name="us-east-1", **kwargs)
def place_order(order_id, customer, items, total):
"""Publish an order event to SNS. All downstream services react to it."""
sns = get_client("sns")
message = json.dumps({
"order_id": order_id,
"customer": customer,
"items": items,
"total": total,
})
sns.publish(
TopicArn="arn:aws:sns:us-east-1:000000000000:order-events",
Message=message,
Subject="new-order",
)
return {"order_id": order_id, "status": "published"} Step 3: Payment Service
The Payment Service polls its SQS queue, parses the SNS envelope, extracts the order data, and records the payment in DynamoDB.
import boto3
import json
import os
def get_client(service):
kwargs = {}
endpoint = os.environ.get("AWS_ENDPOINT_URL")
if endpoint:
kwargs["endpoint_url"] = endpoint
return boto3.client(service, region_name="us-east-1", **kwargs)
def process_payments():
"""Poll the payment queue and record payment results in DynamoDB."""
sqs = get_client("sqs")
ddb = get_client("dynamodb")
queue_url = "http://sqs.us-east-1.localhost:4566/000000000000/payment-queue"
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=5)
for msg in resp.get("Messages", []):
# SNS wraps the body in an envelope
envelope = json.loads(msg["Body"])
order = json.loads(envelope.get("Message", envelope.get("Body", "{}")))
ddb.put_item(
TableName="payments",
Item={
"order_id": {"S": order["order_id"]},
"amount": {"N": str(order["total"])},
"status": {"S": "charged"},
},
)
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
print(f"Payment processed for order {order['order_id']}") Step 4: Notification Service
The Notification Service works the same way but reads from a different queue and writes to a different table. In production this would send an email or push notification. For testing, it records what was sent.
import boto3
import json
import os
def get_client(service):
kwargs = {}
endpoint = os.environ.get("AWS_ENDPOINT_URL")
if endpoint:
kwargs["endpoint_url"] = endpoint
return boto3.client(service, region_name="us-east-1", **kwargs)
def send_notifications():
"""Poll the notification queue and store sent notifications in DynamoDB."""
sqs = get_client("sqs")
ddb = get_client("dynamodb")
queue_url = "http://sqs.us-east-1.localhost:4566/000000000000/notification-queue"
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=5)
for msg in resp.get("Messages", []):
envelope = json.loads(msg["Body"])
order = json.loads(envelope.get("Message", envelope.get("Body", "{}")))
ddb.put_item(
TableName="notifications",
Item={
"order_id": {"S": order["order_id"]},
"customer": {"S": order["customer"]},
"type": {"S": "order_confirmation"},
"status": {"S": "sent"},
},
)
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
print(f"Notification sent for order {order['order_id']}") Step 5: Test the complete flow
This test publishes an order event and verifies that both downstream queues receive the message with the correct data. Run the setup script first, then run this test.
import os, sys, uuid, boto3, pytest
# Make src/ importable and point boto3 at LocalEmu.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
os.environ.setdefault("AWS_ENDPOINT_URL", "http://localhost:4566")
os.environ.setdefault("AWS_ACCESS_KEY_ID", "AKIAIOSFODNN7EXAMPLE")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")
from order_service import place_order
from payment_service import process_payments
from notification_service import send_notifications
KWARGS = dict(
endpoint_url="http://localhost:4566",
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
region_name="us-east-1",
)
@pytest.fixture(scope="session")
def ddb():
return boto3.client("dynamodb", **KWARGS)
def test_order_flows_through_all_services(ddb):
order_id = f"ORD-{uuid.uuid4().hex[:8]}"
result = place_order(
order_id=order_id,
customer=f"{uuid.uuid4().hex[:6]}@example.com",
items=["widget-a", "widget-b"],
total=59.99,
)
assert result == {"order_id": order_id, "status": "published"}
# Drain both queues; long-poll briefly to absorb SNS fan-out latency.
payments = process_payments(wait_seconds=2)
notifications = send_notifications(wait_seconds=2)
assert payments >= 1
assert notifications >= 1
p = ddb.get_item(TableName="payments", Key={"order_id": {"S": order_id}}).get("Item")
assert p["status"]["S"] == "charged"
assert float(p["amount"]["N"]) == 59.99
n = ddb.get_item(TableName="notifications", Key={"order_id": {"S": order_id}}).get("Item")
assert n["type"]["S"] == "order_confirmation"
assert n["status"]["S"] == "sent" Step 6: Run it
$ ./scripts/setup.sh
→ creating SNS topic order-events...
→ creating SQS queues payment-queue + notification-queue...
→ subscribing both queues to the topic...
→ creating DynamoDB tables payments + notifications...
→ all resources ready.
$ ./scripts/test.sh
collected 1 item
tests/test_ecommerce.py::test_order_flows_through_all_services PASSED
============================== 1 passed in 0.17s ==============================
The full flow, SNS publish, fan-out to both queues, both services drain and write DynamoDB rows, asserted end-to-end, runs in about 170 ms once LocalEmu is warm. No AWS account, no credentials, no cost. The whole project (three services + pytest + setup/teardown scripts) lives in
10-microservices/
of the examples repo.
Pattern 1: SNS/SQS Fan-Out
The fan-out pattern is the foundation of event-driven architectures. A single event (published to an SNS topic) is delivered to multiple SQS queues, each consumed by a different downstream service. This decouples producers from consumers: the order service does not need to know about the payment, inventory, notification, or analytics services.
Set up the topology
# Create the SNS topic (the event bus)
$ awsemu sns create-topic --name order-events
TopicArn: arn:aws:sns:us-east-1:000000000000:order-events
# Create subscriber queues (one per downstream service)
$ awsemu sqs create-queue --queue-name payment-processing
$ awsemu sqs create-queue --queue-name inventory-updates
$ awsemu sqs create-queue --queue-name notification-dispatch
$ awsemu sqs create-queue --queue-name analytics-events
# Subscribe each queue to the topic
$ awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:payment-processing
$ awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:inventory-updates
$ awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:notification-dispatch
$ awsemu sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:us-east-1:000000000000:analytics-events Test the fan-out
# Publish an order event
$ awsemu sns publish \
--topic-arn arn:aws:sns:us-east-1:000000000000:order-events \
--message '{"orderId":"ORD-2026-001","amount":149.99,"items":3}'
# Verify all four queues received the message
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/payment-processing
Body: {"orderId":"ORD-2026-001","amount":149.99,"items":3}
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/inventory-updates
Body: {"orderId":"ORD-2026-001","amount":149.99,"items":3}
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/notification-dispatch
Body: {"orderId":"ORD-2026-001","amount":149.99,"items":3}
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/analytics-events
Body: {"orderId":"ORD-2026-001","amount":149.99,"items":3} One publish, four deliveries. You can verify that every downstream service receives the correct message without deploying anything to AWS.
Pattern 2: S3 Event Notifications
S3 event notifications trigger downstream processing when files are uploaded. This is commonly used for CSV import pipelines, image processing, log ingestion, and ETL jobs. The notification configuration supports suffix and prefix filters to route different file types to different queues.
# Create an S3 bucket and SQS queue
$ awsemu s3 mb s3://file-uploads
$ awsemu sqs create-queue --queue-name file-processing
# Configure S3 to send notifications to SQS on object creation
$ awsemu s3api put-bucket-notification-configuration \
--bucket file-uploads \
--notification-configuration '{
"QueueConfigurations": [{
"QueueArn": "arn:aws:sqs:us-east-1:000000000000:file-processing",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [{
"Name": "suffix",
"Value": ".csv"
}]
}
}
}]
}'
# Upload a CSV file
$ echo "name,email" > /tmp/users.csv
$ awsemu s3 cp /tmp/users.csv s3://file-uploads/imports/users.csv
# Check that SQS received the notification
$ awsemu sqs receive-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/file-processing
Body: {"Records":[{"s3":{"bucket":{"name":"file-uploads"},"object":{"key":"imports/users.csv"}}}]}
The filter ensures only .csv files trigger processing. Upload a .jpg and the queue stays empty. This lets you test your filter logic locally before deploying.
Pattern 3: SQS-Triggered Lambda Pipeline
Connect an SQS queue to a Lambda function via an event source mapping. Messages arriving in the queue automatically trigger the Lambda, which processes them and stores results in DynamoDB. This is the most common pattern for async job processing.
The Lambda function
import json
import boto3
import os
def handler(event, context):
"""Lambda triggered by SQS messages.
Processes each record and stores results in DynamoDB.
"""
endpoint = os.environ.get("AWS_ENDPOINT_URL")
kwargs = {"endpoint_url": endpoint} if endpoint else {}
dynamodb = boto3.resource("dynamodb", **kwargs)
table = dynamodb.Table(os.environ["RESULTS_TABLE"])
processed = 0
for record in event["Records"]:
body = json.loads(record["body"])
# Process the message (your business logic here)
result = {
"id": body.get("orderId", record["messageId"]),
"status": "processed",
"source_queue": record["eventSourceARN"].split(":")[-1],
"payload": body,
}
table.put_item(Item=result)
processed += 1
return {"processed": processed} Deploy and test the pipeline
# Create the results table
$ awsemu dynamodb create-table \
--table-name processing-results \
--key-schema AttributeName=id,KeyType=HASH \
--attribute-definitions AttributeName=id,AttributeType=S \
--billing-mode PAY_PER_REQUEST
# Create the processing queue
$ awsemu sqs create-queue --queue-name jobs
# Package and deploy the Lambda
$ zip processor.zip processor.py
$ awsemu lambda create-function \
--function-name queue-processor \
--runtime python3.12 \
--handler processor.handler \
--role arn:aws:iam::000000000000:role/role \
--zip-file fileb://processor.zip \
--environment 'Variables={RESULTS_TABLE=processing-results,AWS_ENDPOINT_URL=http://host.docker.internal:4566}'
# Connect the queue to the Lambda (event source mapping)
$ awsemu lambda create-event-source-mapping \
--function-name queue-processor \
--event-source-arn arn:aws:sqs:us-east-1:000000000000:jobs \
--batch-size 5
# Send test messages
$ awsemu sqs send-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/jobs \
--message-body '{"orderId":"ORD-001","task":"process-payment"}'
$ awsemu sqs send-message \
--queue-url http://sqs.us-east-1.localhost:4566/000000000000/jobs \
--message-body '{"orderId":"ORD-002","task":"send-confirmation"}'
# Wait a moment for processing, then verify results
$ awsemu dynamodb scan --table-name processing-results The event source mapping is the key piece: it connects the SQS queue to the Lambda function. LocalEmu handles the polling, batching, and invocation just like real AWS does.
Service Configuration with Secrets Manager
Microservices typically load configuration from Secrets Manager at startup. API keys, database credentials, feature flags, and service endpoints are stored as secrets and retrieved by each service. With LocalEmu, you can pre-populate secrets locally so your services start correctly without connecting to real AWS.
# Store configuration for the payment service
$ awsemu secretsmanager create-secret \
--name payment-service/config \
--secret-string '{"stripe_key":"sk_test_abc123","webhook_secret":"whsec_test_xyz","retry_limit":3}'
# Store configuration for the notification service
$ awsemu secretsmanager create-secret \
--name notification-service/config \
--secret-string '{"smtp_host":"smtp.example.com","smtp_port":587,"from_email":"orders@example.com"}'
# Store database credentials
$ awsemu secretsmanager create-secret \
--name shared/database \
--secret-string '{"host":"db.internal","port":5432,"username":"app","password":"dev-password-123"}'
# Retrieve a secret (as your service would at startup)
$ awsemu secretsmanager get-secret-value \
--secret-id payment-service/config
SecretString: {"stripe_key":"sk_test_abc123","webhook_secret":"whsec_test_xyz","retry_limit":3}
Use a naming convention like service-name/config to organize secrets by service. Your application code retrieves secrets the same way regardless of whether it talks to LocalEmu or real AWS.
End-to-End Flow Verification
The real power of local testing is verifying the complete flow: publish an event, verify it fans out to all subscribers, confirm Lambda processes it, and check that results are stored correctly. Here is a Python script that does exactly that:
import boto3
import json
import time
endpoint = "http://localhost:4566"
kwargs = dict(
endpoint_url=endpoint,
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
region_name="us-east-1",
)
sns = boto3.client("sns", **kwargs)
sqs = boto3.client("sqs", **kwargs)
ddb = boto3.resource("dynamodb", **kwargs)
# 1. Publish an order event
print("Publishing order event...")
sns.publish(
TopicArn="arn:aws:sns:us-east-1:000000000000:order-events",
Message=json.dumps({
"orderId": "ORD-E2E-001",
"amount": 299.99,
"customer": "test-user",
}),
)
# 2. Verify fan-out: check each subscriber queue
queues = [
"payment-processing",
"inventory-updates",
"notification-dispatch",
"analytics-events",
]
time.sleep(2) # brief pause for message delivery
for queue_name in queues:
url = f"http://sqs.us-east-1.localhost:4566/000000000000/{queue_name}"
resp = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1)
msgs = resp.get("Messages", [])
assert len(msgs) == 1, f"Expected 1 message in {queue_name}, got {len(msgs)}"
print(f" {queue_name}: received message")
# 3. Verify downstream processing (if Lambda event source is configured)
time.sleep(3) # wait for Lambda processing
table = ddb.Table("processing-results")
result = table.get_item(Key={"id": "ORD-E2E-001"})
if "Item" in result:
print(f" DynamoDB: result stored with status={result['Item']['status']}")
else:
print(" DynamoDB: no result yet (Lambda may not be configured)")
print("\nEnd-to-end verification complete.") This script tests the entire pipeline in under 10 seconds. The same verification against real AWS would have to spin up the stack, wait for the topic + subscription + event-source-mapping to propagate, run the assertions, then tear everything down again before the next run.
Architecture Tips
Use dead-letter queues
Always pair your processing queues with DLQs. Test locally that failed messages are redirected correctly by deliberately throwing exceptions in your Lambda handler.
Test idempotency
SQS guarantees at-least-once delivery. Send the same message twice and verify your handler processes it correctly both times (or deduplicates it). This is easy to test locally, hard to simulate on real AWS.
Script your setup
Put all infrastructure creation commands in a seed script or Terraform configuration. This makes setup reproducible across team members and CI. See the Terraform guide for the full pattern.
Combine with Docker Compose
Run your microservices alongside LocalEmu in Docker Compose. Each service connects to http://localemu:4566 inside the Docker network. See the Local Development guide for the Docker Compose setup.