Docs / Python (boto3)

Python (boto3)

Use boto3 with LocalEmu by setting endpoint_url to http://localhost:4566. Every boto3 operation works the same as it does with real AWS.

Helper Function

Create a reusable helper to avoid repeating endpoint configuration:

localemu_client.py
import boto3

LOCALEMU_ENDPOINT = "http://localhost:4566"

def get_client(service_name):
    """Return a boto3 client for ``service_name`` that talks to LocalEmu.

    The client is bound to ``LOCALEMU_ENDPOINT`` in region ``us-east-1`` and
    is given the fixed example access key pair shown below.
    """
    connection_settings = {
        "endpoint_url": LOCALEMU_ENDPOINT,
        "region_name": "us-east-1",
        "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    }
    return boto3.client(service_name, **connection_settings)

def get_resource(service_name):
    """Return a boto3 resource for ``service_name`` that talks to LocalEmu.

    Uses ``LOCALEMU_ENDPOINT``, region ``us-east-1`` and the fixed example
    access key pair shown below.
    """
    endpoint = LOCALEMU_ENDPOINT
    region = "us-east-1"
    access_key = "AKIAIOSFODNN7EXAMPLE"
    secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    return boto3.resource(
        service_name,
        endpoint_url=endpoint,
        region_name=region,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

# Usage: build one client per service through the helper, in this order.
s3, dynamo, sqs, lam = map(get_client, ("s3", "dynamodb", "sqs", "lambda"))

Direct Client Setup

If you prefer not to use a helper, configure each client directly:

Python
import boto3

# Same settings the helper applies, spelled out inline for a single client.
s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
)

S3: Create Bucket, Upload, Download, List

s3_example.py
import boto3

# S3 client bound to the local endpoint instead of real AWS.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

# Create a bucket
s3.create_bucket(Bucket="my-bucket")

# Upload a file (the object content is passed inline as bytes)
s3.put_object(
    Bucket="my-bucket",
    Key="data/report.csv",
    Body=b"id,name,score\n1,Alice,95\n2,Bob,87",
)

# Download a file
response = s3.get_object(Bucket="my-bucket", Key="data/report.csv")
content = response["Body"].read().decode("utf-8")
print(content)

# List all objects.
# A single list_objects_v2 call returns at most 1,000 keys, so a paginator is
# used to follow continuation tokens and visit every object in the bucket.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket"):
    # .get() with a default covers pages that carry no "Contents" entry.
    for obj in page.get("Contents", []):
        print(f"  {obj['Key']} ({obj['Size']} bytes)")

DynamoDB: Create Table, Put, Get, Scan

dynamodb_example.py
import boto3

# DynamoDB client bound to the local endpoint instead of real AWS.
dynamodb = boto3.client(
    "dynamodb",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

# Create a table keyed by a single string attribute, "productId"
dynamodb.create_table(
    TableName="Products",
    KeySchema=[{"AttributeName": "productId", "KeyType": "HASH"}],
    AttributeDefinitions=[{"AttributeName": "productId", "AttributeType": "S"}],
    BillingMode="PAY_PER_REQUEST",
)

# create_table can return before the table is usable; block until it exists
# so the writes below do not race table creation.
dynamodb.get_waiter("table_exists").wait(TableName="Products")

# Put an item (the low-level client needs explicit type tags: S = string,
# N = number encoded as a string)
dynamodb.put_item(
    TableName="Products",
    Item={
        "productId": {"S": "p-001"},
        "name": {"S": "Mechanical Keyboard"},
        "price": {"N": "149.99"},
    },
)

# Get an item
result = dynamodb.get_item(
    TableName="Products",
    Key={"productId": {"S": "p-001"}},
)
print(result["Item"])

# Scan all items.
# One scan call returns only a single page of results, so a paginator is used
# to keep reading until the whole table has been visited.
scan_pages = dynamodb.get_paginator("scan").paginate(TableName="Products")
for page in scan_pages:
    for item in page["Items"]:
        print(item)

SQS: Create Queue, Send, Receive

sqs_example.py
import json
import boto3

# SQS client bound to the local endpoint.
sqs = boto3.client(
    "sqs",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
)

# Create a queue and keep its URL; the calls below address the queue by URL.
queue_url = sqs.create_queue(QueueName="tasks")["QueueUrl"]

# Send a message whose body is a JSON-encoded task description.
task = {"action": "send_email", "to": "user@example.com"}
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(task))

# Receive messages: request up to 10 in one call, with WaitTimeSeconds=5.
received = sqs.receive_message(
    QueueUrl=queue_url,
    WaitTimeSeconds=5,
    MaxNumberOfMessages=10,
)

# "Messages" is read with a default so an empty receive simply skips the loop.
for message in received.get("Messages", []):
    print(f"Received: {json.loads(message['Body'])}")

    # Delete after processing, using the receipt handle from this receive.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])

Lambda: Create and Invoke

lambda_example.py
import json
import zipfile
import io
import boto3

# Lambda client bound to the local endpoint instead of real AWS.
lambda_client = boto3.client(
    "lambda",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

# Create a zip with the handler code
handler_code = """
def handler(event, context):
    name = event.get("name", "World")
    return {"statusCode": 200, "body": f"Hello, {name}!"}
"""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
    zf.writestr("handler.py", handler_code)
# The archive is complete only after the ZipFile is closed (end of the with
# block). getvalue() returns the whole buffer regardless of stream position.
zip_bytes = zip_buffer.getvalue()

# Create the function; Handler is "<module>.<function>" inside the zip.
lambda_client.create_function(
    FunctionName="hello",
    Runtime="python3.12",
    Handler="handler.handler",
    Role="arn:aws:iam::000000000000:role/role",
    Code={"ZipFile": zip_bytes},
)

# create_function can return while the function is still in the Pending
# state; wait until it is Active so the invoke below does not race creation.
lambda_client.get_waiter("function_active_v2").wait(FunctionName="hello")

# Invoke the function
response = lambda_client.invoke(
    FunctionName="hello",
    Payload=json.dumps({"name": "Tarek"}),
)

result = json.loads(response["Payload"].read())

# If the handler raised, invoke still returns normally but sets
# "FunctionError"; the payload then holds error details, not the result.
if response.get("FunctionError"):
    raise RuntimeError(f"Lambda invocation failed: {result}")

print(result)
# {'statusCode': 200, 'body': 'Hello, Tarek!'}