Deployment of AWS Step Functions with Lambda and CircleCI

In this guide, you will build and deploy a serverless data processing workflow using AWS Step Functions and AWS Lambda. This approach enables you to orchestrate discrete processing tasks in a scalable and cost-efficient way, leveraging the event-driven architecture that AWS offers.
You will begin by creating individual Lambda functions that handle specific tasks in your data pipeline. These functions will be coordinated using AWS Step Functions, which allow you to define the workflow logic using a visual or JSON-based interface.
To ensure rapid, reliable, and repeatable deployments, you will automate your testing and deployment process using CircleCI. This continuous integration and delivery (CI/CD) platform will help you validate your code with unit tests, enforce code quality standards, and deploy your application to AWS automatically on every push to the main branch.
By the end of this guide, you will have a fully functioning CI/CD pipeline that deploys your AWS Step Functions workflow automatically, ensuring that your data processing pipeline is always up-to-date and reliable.
Prerequisites
Before you begin, ensure that you have the following requirements in place:
- Sign up for an AWS account if you do not already have one.
- Install the AWS Command Line Interface (CLI) and configure it with your AWS credentials. You can follow the AWS CLI setup guide.
- You should have basic familiarity with AWS Step Functions and how to create state machines.
- You will need a GitHub account for version control and a CircleCI account to automate your CI/CD pipeline.
- Install uv, a fast and modern tool for managing dependencies and Python virtual environments. You will use it to set up your project environment in a later step.
Once you have completed these steps, you will be ready to begin setting up your project and implementing the serverless workflow.
Setting up the project
Before you start building, you need to set up your project environment, install the required dependencies, and understand the role AWS Step Functions will play in your application’s workflow.
Overview of AWS Step Functions
AWS Step Functions is a fully managed service that enables you to coordinate multiple distributed components in a defined order. Using state machines, you can model complex business logic, handle retries and failures, and monitor each step’s execution.
In this tutorial, you will use three AWS Lambda functions together with a Step Functions state machine. The workflow is kicked off by an object upload to an S3 bucket:
- The first Lambda function initiates the Step Functions workflow.
- The second extracts metadata such as the bucket name, file name, and file extension.
- The third classifies the file type based on its extension (pdf, doc, or png).
Setting up the environment
Start by cloning the repository:
git clone https://github.com/CIRCLECI-GWP/aws-step-functions-circleci.git
cd aws-step-functions-circleci
Next, install the dependencies and initialize the virtual environment:
uv sync --all-groups
source .venv/bin/activate
These commands will:
- Install the dependencies defined in pyproject.toml.
- Automatically create a virtual environment (.venv).
- Activate the virtual environment.
The sync command installs the main dependencies, and the --all-groups flag ensures that optional dependency groups are included as well.
Finally, create a .env file in the root directory of your repository and add the required environment variables. Feel free to rename the placeholders shown below, and make sure you add your AWS_REGION, AWS_ACCOUNT_ID, and the S3_BUCKET_NAME once you create the bucket in the next steps.
AWS_REGION=your_region
AWS_ACCOUNT_ID=your_account_id
S3_BUCKET_NAME=your_bucket_name
STATE_MACHINE_NAME=your_state_machine_name
SF_ROLE_NAME=your_sf_role_name
LAMBDA_FOLDER=your_lambda_folder
LAMBDA_ROLE_NAME=your_lambda_role_name
LAMBDA_FUNCTION_ONE=your_lambda_function_one
LAMBDA_FUNCTION_TWO=your_lambda_function_two
LAMBDA_FUNCTION_S3_TRIGGER=your_lambda_function_s3_trigger
These values are used throughout the deployment and configuration process to integrate your Lambda functions with AWS Step Functions and other AWS services such as S3 and IAM Roles.
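If you want to catch missing configuration early, a small preflight check can confirm that every variable is set before you run any deployment script. The snippet below is a minimal sketch; the check_env.py file name is an assumption, and the variable list simply mirrors the .env file above. You could run it with uv run check_env.py.

# check_env.py -- minimal sketch: fail fast if any required variable is missing
import os

import dotenv

dotenv.load_dotenv()

REQUIRED_VARS = [
    "AWS_REGION", "AWS_ACCOUNT_ID", "S3_BUCKET_NAME", "STATE_MACHINE_NAME",
    "SF_ROLE_NAME", "LAMBDA_FOLDER", "LAMBDA_ROLE_NAME",
    "LAMBDA_FUNCTION_ONE", "LAMBDA_FUNCTION_TWO", "LAMBDA_FUNCTION_S3_TRIGGER",
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All required environment variables are set.")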
Setting up the project structure
If you cloned the repository, you should already have the project structure in place. If you are setting it up manually, use the following structure as a guide. The dist directory (or whichever name you choose for LAMBDA_FOLDER in your .env file) is where the Lambda functions are zipped before being uploaded to AWS; it is created automatically when you run the zip_lambdas.sh script.
.
├── aws_lambdas/                     # Lambda function implementations
│   ├── __init__.py
│   ├── s3_trigger/
│   │   ├── __init__.py
│   │   └── app_s3_trigger.py
│   ├── task_one/
│   │   ├── __init__.py
│   │   └── app_task_one.py
│   └── task_two/
│       ├── __init__.py
│       └── app_task_two.py
├── dist/                            # Distribution directory for Lambda packages
│   ├── s3_trigger.zip
│   ├── task_one.zip
│   └── task_two.zip
├── scripts/                         # Deployment and utility scripts
│   ├── create_roles.sh
│   └── zip_lambdas.sh
├── tests/                           # Tests
│   ├── test_s3_trigger.py
│   ├── test_task_one.py
│   └── test_task_two.py
├── deploy.py                        # Main deployment script
├── LICENSE                          # Project license file
├── Makefile                         # Make commands for common tasks
├── pyproject.toml                   # Python project configuration
├── README.md                        # Project documentation
├── requirements.txt                 # Project dependencies for lambdas
├── state_machine_definition.json   # Step Functions state machine definition
└── uv.lock                          # UV package manager lock file
Creating Lambda functions
In this section, you will implement three AWS Lambda functions, each serving a distinct role in the workflow orchestrated by AWS Step Functions. Before you create them, create an S3 bucket using the AWS CLI with the following command, and update the S3_BUCKET_NAME in your .env file with the bucket name the command returns. The --create-bucket-configuration LocationConstraint option is only needed if your region is not us-east-1.
aws s3api create-bucket \
--bucket step-functions-$(uuidgen | tr -d - | tr '[:upper:]' '[:lower:]' ) \
--region <your_aws_region> \
--create-bucket-configuration LocationConstraint=<your_aws_region>
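If you prefer to create the bucket from Python instead of the CLI, the following boto3 sketch does the same thing. The file name and the bucket name prefix are assumptions that simply mirror the command above; as with the CLI, us-east-1 must not receive a LocationConstraint.

# create_bucket.py -- sketch of the same bucket creation using boto3
import uuid

import boto3

region = "your_region"  # same value as AWS_REGION in your .env file
s3 = boto3.client("s3", region_name=region)

bucket_name = f"step-functions-{uuid.uuid4().hex}"
if region == "us-east-1":
    # us-east-1 rejects an explicit LocationConstraint
    s3.create_bucket(Bucket=bucket_name)
else:
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
print(f"Created bucket: {bucket_name}")  # copy this value into S3_BUCKET_NAME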
S3 trigger Lambda
This Lambda function is triggered by an object upload to an S3 bucket. Its job is to extract the bucket and file information from the S3 event and start a Step Functions execution with that information as input. Create this function in app_s3_trigger.py inside the aws_lambdas/s3_trigger directory.
Key behaviors:
- Extracts the bucket name and file key from the S3 event record.
- Builds an input payload for Step Functions.
- Calls start_execution on the Step Functions client.
- Logs a success message including the executionArn.
- Catches and logs any exceptions.
import json
import os
from typing import Any

import boto3  # type: ignore
import dotenv
from loguru import logger

dotenv.load_dotenv()

STATE_MACHINE_NAME = os.getenv("STATE_MACHINE_NAME")
AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")

sf_client = boto3.client("stepfunctions", region_name=AWS_REGION)


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle S3 event and trigger Step Functions execution."""
    try:
        # Get the S3 bucket and key from the event
        bucket = event["Records"][0]["s3"]["bucket"]["name"]
        key = event["Records"][0]["s3"]["object"]["key"]

        # Prepare input for Step Functions
        input_data = {
            "document_id": key.split("/")[-1].split(".")[0],  # Extract filename without extension
            "s3_path": f"s3://{bucket}/{key}",
        }

        # Start Step Functions execution
        response = sf_client.start_execution(
            stateMachineArn=f"arn:aws:states:{AWS_REGION}:{AWS_ACCOUNT_ID}:stateMachine:{STATE_MACHINE_NAME}",
            input=json.dumps(input_data),
        )

        logger.info(f"Started Step Functions execution: {response['executionArn']}")
        return {"statusCode": 200, "body": "Step Functions execution started successfully"}
    except Exception as e:
        logger.error(f"Error processing S3 event: {e}")
        raise
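For reference, the handler only relies on a small part of the S3 event. A trimmed-down event like the one below (with hypothetical bucket and key names) is enough to exercise it, and the comment shows the Step Functions input the handler derives from it:

# Minimal S3 event shape that app_s3_trigger.lambda_handler expects (hypothetical names)
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-example-bucket"},
                "object": {"key": "uploads/report.pdf"},
            }
        }
    ]
}

# The handler builds this Step Functions input from the event above:
# {"document_id": "report", "s3_path": "s3://my-example-bucket/uploads/report.pdf"}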
Task one Lambda: metadata extraction
This function runs as the first task in the state machine. It receives the s3_path, extracts metadata such as the bucket name, file name, and file extension, and returns this information for the next task. Create this function in app_task_one.py inside the aws_lambdas/task_one directory.
Key behaviors:
- Parses the s3_path from the input event.
- Extracts the file name and extension.
- Constructs and returns a dictionary with metadata: bucket_name, file_name, file_extension, and s3_path.
- Logs the metadata.
- Handles KeyError exceptions.
import os
from typing import Any

from loguru import logger


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle Lambda function invocation."""
    logger.info(f"Received event: {event}")
    try:
        object_key = event["s3_path"].split("://")[1]
        bucket_name = object_key.split("/")[0]

        # Extract file name and extension
        filename = os.path.basename(object_key)
        name, extension = os.path.splitext(filename)

        metadata = {
            "bucket_name": bucket_name,
            "s3_path": object_key,
            "file_name": name,
            "file_extension": extension,
        }

        logger.info(f"Extracted metadata: {metadata}")
        return metadata
    except KeyError as e:
        logger.error(f"Missing expected key: {e}")
        raise
Task two Lambda: file classification
This is the second task in the state machine. It classifies the file based on its extension and adds a classification field to the output. Create this function in app_task_two.py inside the aws_lambdas/task_two directory.
Key behaviors:
- Extracts document_id and file_extension from the event.
- Maps known extensions (.pdf, .doc, .png) to document types.
- Logs the classification.
- Handles and logs unexpected exceptions.
from typing import Any

from loguru import logger


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    try:
        document_id = event.get("document_id")
        file_extension = event.get("file_extension", "").lower()

        # Classify based on file extension
        if file_extension == ".pdf":
            doc_type = "PDF Document"
        elif file_extension == ".doc" or file_extension == ".docx":
            doc_type = "Word Document"
        elif file_extension == ".png" or file_extension == ".jpg" or file_extension == ".jpeg":
            doc_type = "Image File"
        else:
            doc_type = "Unknown Type"

        result = {
            "document_id": document_id,
            "classification": doc_type,
        }

        logger.info(f"Document {document_id} classified as: {doc_type}")
        return result
    except Exception as e:
        logger.error(f"Error in classification: {e}")
        raise
Creating the state machine
To define your Step Functions workflow, create a state machine definition in JSON format in the root directory, named state_machine_definition.json. This document specifies the tasks, their execution order, and how the output of one task flows into the next.
Key components of the definition:
- States: Logical steps in your workflow.
- Tasks: Each task invokes a specific AWS Lambda function and saves its output.
- Transitions: Define the flow between steps.
  - Next: Specifies the next state to transition to; the output of the current task is passed to the next task as input.
  - End: Marks the end of the state machine.
The placeholders TASK_ONE_LAMBDA_ARN and TASK_TWO_LAMBDA_ARN will be replaced with the actual ARNs of your Lambda functions during deployment.
{
  "Comment": "Simple two-step workflow triggered by S3 uploads",
  "StartAt": "TaskOne",
  "States": {
    "TaskOne": {
      "Type": "Task",
      "Resource": "TASK_ONE_LAMBDA_ARN",
      "ResultPath": "$.taskOneResult",
      "Next": "TaskTwo"
    },
    "TaskTwo": {
      "Type": "Task",
      "Resource": "TASK_TWO_LAMBDA_ARN",
      "InputPath": "$.taskOneResult",
      "ResultPath": "$.taskTwoResult",
      "End": true
    }
  }
}
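To see how ResultPath and InputPath shape the data, you can simulate the two tasks locally. The sketch below is only an illustration: the file name and input values are hypothetical, and it assumes you run it from the repository root with the project dependencies installed. It reproduces what Step Functions does with this definition: it stores task one's output under taskOneResult, then feeds only that object to task two.

# simulate_flow.py -- local sketch of the state machine's data flow (no AWS calls)
from aws_lambdas.task_one.app_task_one import lambda_handler as task_one
from aws_lambdas.task_two.app_task_two import lambda_handler as task_two

# Execution input, as produced by the S3 trigger Lambda (hypothetical values)
state = {"document_id": "report", "s3_path": "s3://my-example-bucket/uploads/report.pdf"}

# TaskOne: ResultPath "$.taskOneResult" keeps the input and adds the task output under that key
state["taskOneResult"] = task_one(state, None)

# TaskTwo: InputPath "$.taskOneResult" passes only that object to the Lambda,
# and ResultPath "$.taskTwoResult" stores its output alongside the rest of the state
state["taskTwoResult"] = task_two(state["taskOneResult"], None)

print(state["taskTwoResult"]["classification"])  # "PDF Document"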
Deploying the Step Functions workflow
Deployment involves several coordinated steps to ensure all components such as the Lambda functions, IAM roles, and the state machine are properly set up.
Step 1: Create IAM roles
Create a create_roles.sh script in the scripts folder to set up the necessary IAM roles. This script checks whether each role already exists, creates it if needed, and attaches the required policies.
- Lambda Execution Role: Allows Lambda to write logs and invoke Step Functions.
- Step Functions Role: Allows the state machine to invoke Lambda functions.
#!/bin/bash

# Exit immediately if a command exits with a non-zero status
set -eu

# Load environment variables from .env file
set -o allexport
source .env
set +o allexport

STATE_MACHINE_ARN=arn:aws:states:${AWS_REGION}:${AWS_ACCOUNT_ID}:stateMachine:${STATE_MACHINE_NAME}

# Create Lambda Execution Role
if aws iam get-role --role-name $LAMBDA_ROLE_NAME 2>/dev/null; then
  echo "✅ Lambda Role '$LAMBDA_ROLE_NAME' already exists."
else
  echo "🚀 Creating Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam create-role --role-name $LAMBDA_ROLE_NAME \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": { "Service": "lambda.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }]
    }'

  # Attach the AWSLambdaBasicExecutionRole policy
  echo "🚀 Attaching AWSLambdaBasicExecutionRole policy to Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam attach-role-policy --role-name $LAMBDA_ROLE_NAME \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # Attach the custom policy to allow starting Step Functions executions
  echo "🚀 Attaching custom policy to Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam put-role-policy --role-name $LAMBDA_ROLE_NAME \
    --policy-name LambdaStartStepFunctionPolicy \
    --policy-document "{
      \"Version\": \"2012-10-17\",
      \"Statement\": [{
        \"Effect\": \"Allow\",
        \"Action\": \"states:StartExecution\",
        \"Resource\": \"${STATE_MACHINE_ARN}\"
      }]
    }"
fi

# Create Step Functions Execution Role
if aws iam get-role --role-name $SF_ROLE_NAME 2>/dev/null; then
  echo "✅ Step Function Role '$SF_ROLE_NAME' already exists."
else
  # Create the step function role
  echo "🚀 Creating Step Function Role '$SF_ROLE_NAME'..."
  aws iam create-role --role-name $SF_ROLE_NAME \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": { "Service": "states.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }]
    }'

  # Attach the custom policy to allow invoking Lambda functions
  echo "🚀 Attaching custom policy to Step Function Role '$SF_ROLE_NAME'..."
  aws iam put-role-policy --role-name $SF_ROLE_NAME \
    --policy-name StepFunctionLambdaInvokePolicy \
    --policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Action": "lambda:InvokeFunction",
        "Resource": "*"
      }]
    }'
fi

echo "🎯 All roles are ready!"
Run create_roles.sh to generate the necessary IAM roles:
chmod +x ./scripts/create_roles.sh
./scripts/create_roles.sh
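Optionally, you can confirm that both roles now exist before moving on. The following is a small sketch using boto3 and the same .env variables; the check_roles.py file name is an assumption, and nothing here is required for the rest of the tutorial.

# check_roles.py -- sketch: confirm both IAM roles were created
import os

import boto3
import dotenv

dotenv.load_dotenv()

iam = boto3.client("iam")
for role_name in (os.environ["LAMBDA_ROLE_NAME"], os.environ["SF_ROLE_NAME"]):
    role = iam.get_role(RoleName=role_name)["Role"]
    print(f"{role_name}: {role['Arn']}")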
Step 2: Zip Lambda functions
Create a zip_lambdas.sh script in the scripts folder to bundle each Lambda function with its dependencies. Make sure all necessary dependencies are listed in the requirements.txt file. For this deployment, you will need the following:
boto3>=1.38.3
loguru>=0.7.3
python-dotenv>=1.1.0
#!/bin/bash

# Exit immediately if a command exits with a non-zero status
set -eu

# Store the root directory path
ROOT_DIR=$(pwd)

# Load environment variables from .env file
set -o allexport
source .env
set +o allexport

echo "✅ Environment variables loaded."

# Create dist folder if it doesn't exist
mkdir -p "${ROOT_DIR}/${LAMBDA_FOLDER}"
echo "✅ Created ${LAMBDA_FOLDER} directory"

# Function to zip Lambda with dependencies
zip_lambda_with_deps() {
  LAMBDA_NAME=$1
  ZIP_FILE="${ROOT_DIR}/${LAMBDA_FOLDER}/${LAMBDA_NAME}.zip"
  REQUIREMENTS_FILE="${ROOT_DIR}/requirements.txt"

  echo "📦 Processing ${LAMBDA_NAME}..."

  # Create package directory if it doesn't exist
  PACKAGE_DIR="${ROOT_DIR}/aws_lambdas/${LAMBDA_NAME}/package"
  mkdir -p "${PACKAGE_DIR}"

  # Install dependencies
  cd "${ROOT_DIR}/aws_lambdas/${LAMBDA_NAME}"
  pip install --target ./package -r "${REQUIREMENTS_FILE}" --upgrade --no-cache-dir

  # Create the zip file
  cd package
  zip -r9 "${ZIP_FILE}" .
  cd ..
  zip -g "${ZIP_FILE}" ./*.py

  # Clean up package directory
  rm -rf "${PACKAGE_DIR}"

  # Return to root directory
  cd "${ROOT_DIR}"
}

echo "🚀 Zipping Lambda functions..."

# Zip task_one
echo "📦 Zipping task_one..."
zip_lambda_with_deps "task_one"

# Zip task_two
echo "📦 Zipping task_two..."
zip_lambda_with_deps "task_two"

# Zip s3_trigger
echo "📦 Zipping s3_trigger..."
zip_lambda_with_deps "s3_trigger"

echo "✅ Lambda functions zipped successfully: ${LAMBDA_FOLDER}/task_one.zip, ${LAMBDA_FOLDER}/task_two.zip, ${LAMBDA_FOLDER}/s3_trigger.zip"
Running this script generates zipped packages in your defined LAMBDA_FOLDER, using the requirements.txt file for dependencies:
chmod +x ./scripts/zip_lambdas.sh
./scripts/zip_lambdas.sh
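If you want to double-check the archives before deploying, you can list their contents with Python's zipfile module. The sketch below (inspect_zips.py is a hypothetical file name) simply prints a summary of each package built above:

# inspect_zips.py -- sketch: peek inside the packaged Lambda archives
import os
import zipfile

import dotenv

dotenv.load_dotenv()
folder = os.environ["LAMBDA_FOLDER"]

for name in ("task_one", "task_two", "s3_trigger"):
    path = os.path.join(folder, f"{name}.zip")
    with zipfile.ZipFile(path) as zf:
        entries = zf.namelist()
        handlers = [e for e in entries if e.endswith(".py") and e.startswith("app_")]
        print(f"{path}: {len(entries)} files, handler(s): {handlers}")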
Step 3: Deploy Lambda functions and state machine
Finally, create a deploy.py script in the root directory to deploy the Lambda functions and the state machine. This script does the following:
- Creates or updates all three Lambda functions.
- Deploys the state machine with the linked Lambda ARNs.
- Adds permission for S3 to invoke the S3 trigger Lambda function.
- Configures the S3 bucket to trigger that Lambda on object uploads.
import json
import os
import time
from typing import Any

import boto3
import dotenv  # type: ignore
from loguru import logger

dotenv.load_dotenv()

AWS_REGION = os.environ["AWS_REGION"]
AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
SF_ROLE_NAME = os.environ["SF_ROLE_NAME"]
LAMBDA_ROLE_NAME = os.environ["LAMBDA_ROLE_NAME"]
LAMBDA_FOLDER = os.environ["LAMBDA_FOLDER"]
LAMBDA_FUNCTION_ONE = os.environ["LAMBDA_FUNCTION_ONE"]
LAMBDA_FUNCTION_TWO = os.environ["LAMBDA_FUNCTION_TWO"]
LAMBDA_FUNCTION_S3_TRIGGER = os.environ["LAMBDA_FUNCTION_S3_TRIGGER"]
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
STATE_MACHINE_NAME = os.environ["STATE_MACHINE_NAME"]

s3_client = boto3.client("s3", region_name=AWS_REGION)
lambda_client = boto3.client("lambda", region_name=AWS_REGION)
sf_client = boto3.client("stepfunctions", region_name=AWS_REGION)


def create_lambda_function(name: str, zip_path: str, role_arn: str, handler: str) -> str:
    """Create or update a Lambda function."""
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} does not exist. Make sure to zip your code first.")

    with open(zip_path, "rb") as f:
        zipped_code = f.read()

    # Environment variables for the Lambda function
    env_vars = {
        "STATE_MACHINE_NAME": STATE_MACHINE_NAME,
        "AWS_ACCOUNT_ID": AWS_ACCOUNT_ID
    }

    try:
        lambda_client.get_function(FunctionName=name)
        logger.info(f"✅ Lambda function {name} already exists. Updating code...")
        response = lambda_client.update_function_code(FunctionName=name, ZipFile=zipped_code)
        # Update environment variables
        lambda_client.update_function_configuration(
            FunctionName=name,
            Environment={"Variables": env_vars}
        )
    except lambda_client.exceptions.ResourceNotFoundException:
        logger.info(f"🚀 Creating new Lambda function {name}...")
        response = lambda_client.create_function(
            FunctionName=name,
            Runtime="python3.12",
            Role=role_arn,
            Handler=handler,
            Code={"ZipFile": zipped_code},
            Timeout=10,
            Environment={"Variables": env_vars},
        )
    return str(response["FunctionArn"])


def deploy_state_machine(
    def_path: str, role_arn: str, lambdas: dict[str, str], state_machine_name: str = str(STATE_MACHINE_NAME)
) -> dict[str, Any]:
    with open(def_path) as f:
        definition = json.load(f)

    # Convert JSON object to string
    definition_str = json.dumps(definition)

    # Replace placeholders with real Lambda ARNs
    definition_str = definition_str.replace("TASK_ONE_LAMBDA_ARN", lambdas["TaskOne"])
    definition_str = definition_str.replace("TASK_TWO_LAMBDA_ARN", lambdas["TaskTwo"])

    # Now deploy
    existing_machines = sf_client.list_state_machines()["stateMachines"]
    logger.info(f"Found {len(existing_machines)} existing state machines.")
    logger.info(f"Existing machines: {existing_machines}")
    state_machine_arn = None
    for sm in existing_machines:
        if sm["name"] == state_machine_name:
            state_machine_arn = sm["stateMachineArn"]
            break

    if state_machine_arn:
        logger.info(f"✅ State Machine {state_machine_name} already exists. Updating...")
        response = sf_client.update_state_machine(
            stateMachineArn=state_machine_arn, definition=definition_str, roleArn=role_arn
        )
    else:
        logger.info(f"🚀 Creating new State Machine {state_machine_name}...")
        response = sf_client.create_state_machine(
            name=state_machine_name, definition=definition_str, roleArn=role_arn, type="STANDARD"
        )
    return dict(response)


def add_lambda_permission(function_name: str, bucket_name: str) -> None:
    """Add permission for S3 to invoke the Lambda function."""
    try:
        lambda_client.add_permission(
            FunctionName=function_name,
            StatementId=f"S3InvokeFunction-{bucket_name}",  # Unique statement ID
            Action="lambda:InvokeFunction",
            Principal="s3.amazonaws.com",
            SourceArn=f"arn:aws:s3:::{bucket_name}",
        )
        logger.info(f"Added S3 invoke permission to Lambda function {function_name}")
    except lambda_client.exceptions.ResourceConflictException:
        logger.info(f"Permission already exists for Lambda function {function_name}")


def add_s3_trigger_to_bucket(bucket_name: str, lambda_arn: str) -> None:
    """Configure S3 bucket to trigger Lambda on file upload."""
    function_name = lambda_arn.split(":")[-1]
    add_lambda_permission(function_name, bucket_name)

    # Wait for a few seconds to ensure the permission is propagated
    logger.info("Waiting for permission propagation...")
    time.sleep(5)

    try:
        s3_client.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={
                "LambdaFunctionConfigurations": [{"LambdaFunctionArn": lambda_arn, "Events": ["s3:ObjectCreated:*"]}]
            },
        )
        logger.info(f"Added S3 trigger configuration to bucket {bucket_name}")
    except Exception as e:
        logger.error(f"Error configuring S3 trigger: {str(e)}")
        raise


if __name__ == "__main__":
    logger.info("Starting deployment...")

    lambda_role = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{LAMBDA_ROLE_NAME}"
    sf_role = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{SF_ROLE_NAME}"

    # Create or update all Lambda functions
    lambdas = {
        "TaskOne": create_lambda_function(
            LAMBDA_FUNCTION_ONE,
            f"{LAMBDA_FOLDER}/task_one.zip",
            lambda_role,
            "app_task_one.lambda_handler",
        ),
        "TaskTwo": create_lambda_function(
            LAMBDA_FUNCTION_TWO, f"{LAMBDA_FOLDER}/task_two.zip", lambda_role, "app_task_two.lambda_handler"
        ),
        "S3Trigger": create_lambda_function(
            LAMBDA_FUNCTION_S3_TRIGGER, f"{LAMBDA_FOLDER}/s3_trigger.zip", lambda_role, "app_s3_trigger.lambda_handler"
        ),
    }

    deploy_state_machine("state_machine_definition.json", sf_role, lambdas)
    add_s3_trigger_to_bucket(S3_BUCKET_NAME, lambdas["S3Trigger"])
    logger.info("Deployment complete!")
To deploy the Lambda functions and state machine, run the following command:
uv run deploy.py
Once the Lambda functions and the state machine are deployed, you can find them in the AWS Console under AWS Lambda and AWS Step Functions.
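Before moving on, you can optionally smoke-test the deployment by starting an execution yourself. The sketch below (smoke_test.py is a hypothetical file name, and the input values are made up) starts one execution with the same shape of input the S3 trigger Lambda would send, then polls until it finishes:

# smoke_test.py -- sketch: start one execution and wait for the result
import json
import os
import time

import boto3
import dotenv

dotenv.load_dotenv()

region = os.environ["AWS_REGION"]
arn = (
    f"arn:aws:states:{region}:{os.environ['AWS_ACCOUNT_ID']}:"
    f"stateMachine:{os.environ['STATE_MACHINE_NAME']}"
)
sf = boto3.client("stepfunctions", region_name=region)

# Hypothetical input mirroring what the S3 trigger Lambda would send
execution = sf.start_execution(
    stateMachineArn=arn,
    input=json.dumps({"document_id": "report", "s3_path": "s3://your-bucket-name/uploads/report.pdf"}),
)

while True:
    result = sf.describe_execution(executionArn=execution["executionArn"])
    if result["status"] != "RUNNING":
        break
    time.sleep(2)

print(result["status"], result.get("output"))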
Writing tests for the application
Unit testing ensures that each Lambda function behaves as expected with valid and invalid inputs. You will use pytest to test the functions, and create all test files in the tests folder.
Testing the S3 trigger Lambda
To test the S3 trigger Lambda, create a test_s3_trigger.py file. This test uses a mocked S3 event and checks that the Step Functions execution is started correctly.
from typing import Any

import pytest

from aws_lambdas.s3_trigger.app_s3_trigger import lambda_handler


@pytest.fixture
def mock_env_vars(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("STATE_MACHINE_NAME", "TestStateMachine")


def test_lambda_handler_success(mock_env_vars: None) -> None:
    # Arrange
    test_event = {"Records": [{"s3": {"bucket": {"name": "test-bucket"}, "object": {"key": "folder/document.pdf"}}}]}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["statusCode"] == 200
    assert "Step Functions execution started successfully" in result["body"]


def test_lambda_handler_invalid_event() -> None:
    # Arrange
    test_event: dict[str, Any] = {}

    # Act & Assert
    with pytest.raises(KeyError):
        lambda_handler(test_event, None)
Testing the metadata extraction Lambda
To test the metadata extraction Lambda, create a test_task_one.py file. This test checks that the function correctly extracts the metadata from the s3_path.
import pytest

from aws_lambdas.task_one.app_task_one import lambda_handler


def test_lambda_handler_success() -> None:
    # Arrange
    test_event = {"document_id": "test123", "s3_path": "s3://test-bucket/folder/document.pdf"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["bucket_name"] == "test-bucket"
    assert result["file_name"] == "document"
    assert result["file_extension"] == ".pdf"
    assert result["s3_path"] == "test-bucket/folder/document.pdf"


def test_lambda_handler_missing_s3_path() -> None:
    # Arrange
    test_event = {"document_id": "test123"}

    # Act & Assert
    with pytest.raises(KeyError):
        lambda_handler(test_event, None)
Testing the file classification Lambda
To test the file classification Lambda, create a test_task_two.py file. This test checks that the function correctly classifies the file based on its extension.
from aws_lambdas.task_two.app_task_two import lambda_handler


def test_lambda_handler_pdf() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".pdf"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "PDF Document"


def test_lambda_handler_image() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".jpg"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "Image File"


def test_lambda_handler_unknown() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".xyz"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "Unknown Type"
To run the tests, use the following command:
uv run pytest
Deploying the application with CircleCI
To automate the deployment pipeline using CircleCI, you need to define a CircleCI config file (.circleci/config.yml). This pipeline will handle tasks such as installing dependencies, validating your code, configuring AWS resources, and running tests.
version: 2.1

orbs:
  aws-cli: circleci/aws-cli@5.3.1

jobs:
  build-deploy:
    docker:
      - image: cimg/python:3.12
    steps:
      - checkout
      - run:
          name: Install UV
          command: |
            curl -LsSf https://astral.sh/uv/install.sh | sh
      - run:
          name: Create venv and install dependencies
          command: |
            uv sync --all-groups
      - run:
          name: Create .env file
          command: |
            echo "AWS_REGION=${AWS_REGION}" > .env
            echo "AWS_ACCOUNT_ID=${AWS_ACCOUNT_ID}" >> .env
            echo "S3_BUCKET_NAME=${S3_BUCKET_NAME}" >> .env
            echo "STATE_MACHINE_NAME=${STATE_MACHINE_NAME}" >> .env
            echo "SF_ROLE_NAME=${SF_ROLE_NAME}" >> .env
            echo "LAMBDA_FOLDER=${LAMBDA_FOLDER}" >> .env
            echo "LAMBDA_ROLE_NAME=${LAMBDA_ROLE_NAME}" >> .env
            echo "LAMBDA_FUNCTION_ONE=${LAMBDA_FUNCTION_ONE}" >> .env
            echo "LAMBDA_FUNCTION_TWO=${LAMBDA_FUNCTION_TWO}" >> .env
            echo "LAMBDA_FUNCTION_S3_TRIGGER=${LAMBDA_FUNCTION_S3_TRIGGER}" >> .env
      - run:
          name: Run ruff
          command: |
            uv run ruff check . --fix --exit-non-zero-on-fix
      - run:
          name: Run MyPy
          command: |
            uv run mypy
      - run:
          name: Run tests
          command: |
            uv run pytest
      - aws-cli/setup:
          profile_name: default
      - run:
          name: Create Roles
          command: |
            ./scripts/create_roles.sh
      - run:
          name: Zip Lambda functions
          command: |
            ./scripts/zip_lambdas.sh
      - run:
          name: Deploy
          command: |
            uv run deploy.py

workflows:
  deploy:
    jobs:
      - build-deploy
What this configuration does:
- Orbs: The aws-cli orb sets up the AWS CLI so you can interact with AWS services.
- Job (build-deploy): This job includes 11 deployment steps:
  - Checks out your code.
  - Installs uv for managing Python dependencies.
  - Syncs and installs dependencies from pyproject.toml.
  - Creates the .env file with the environment variables.
  - Runs ruff to automatically lint and fix Python code.
  - Runs mypy for static type checking.
  - Runs the test suite using pytest.
  - Sets up the AWS CLI environment.
  - Runs the create_roles.sh script to create the necessary IAM roles.
  - Runs the zip_lambdas.sh script to zip the Lambda functions.
  - Runs the deploy.py script to deploy the Lambda functions and the state machine.
- Workflow (deploy): This workflow triggers the build-deploy job every time you push new changes to your repository.
Once this configuration is committed and pushed to your GitHub repository, CircleCI will automatically kick off the deployment process.
However, before CircleCI can deploy your application to AWS, you must create a new project and configure the required environment variables. Go to your CircleCI dashboard, create a new project, and link it to your GitHub repository. Then navigate to your project settings and add the same variables you defined in your .env file, plus AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. These variables are needed for AWS authentication and for the scripts to run correctly.
Additionally, you need to create a pipeline in your project and set up a trigger for it. If you want to run the pipeline on each push to the main branch, you can set up an all pushes event.
Once the environment variables, the pipeline, and the trigger are set up, the pipeline will run each time you push code to your GitHub repository, and you can also trigger it manually.
Once you push, you will be able to monitor the steps directly in the CircleCI dashboard. If everything is set up correctly, you should see a green build.
To confirm that the deployment works as expected, upload a PDF to the configured S3 bucket. The Lambda function should be automatically triggered by the S3 event.
To upload a PDF to the S3 bucket, you can use the following command or use the AWS Console:
aws s3 cp /path/to/your/file.ext s3://your-bucket-name/
Then go to the AWS Step Functions console and check that the state machine has started a new execution. You should see the execution in the list of executions. Inside the execution, you should see the two tasks that were defined in the state machine, and by checking the input and output of each task, you can confirm that the Lambda functions are working as expected.
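If you prefer to check this from a script rather than the console, the following boto3 sketch (list_executions.py is a hypothetical file name) lists the most recent executions of your state machine and prints the final output of the newest one:

# list_executions.py -- sketch: inspect recent executions of the state machine
import os

import boto3
import dotenv

dotenv.load_dotenv()

region = os.environ["AWS_REGION"]
arn = (
    f"arn:aws:states:{region}:{os.environ['AWS_ACCOUNT_ID']}:"
    f"stateMachine:{os.environ['STATE_MACHINE_NAME']}"
)
sf = boto3.client("stepfunctions", region_name=region)

executions = sf.list_executions(stateMachineArn=arn, maxResults=5)["executions"]
for execution in executions:
    print(execution["name"], execution["status"], execution["startDate"])

if executions:
    latest = sf.describe_execution(executionArn=executions[0]["executionArn"])
    print("Latest output:", latest.get("output"))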
Cleaning up
If you no longer need the application, delete the resources you created (the Lambda functions, the state machine, the IAM roles, and the S3 bucket) to avoid unnecessary charges.
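The exact commands depend on what you named your resources, but a teardown sketch like the one below (cleanup.py is a hypothetical file name) removes the pieces created in this guide. Treat it as a starting point and double-check each call before running it against your account.

# cleanup.py -- sketch: tear down the resources created in this guide
import os

import boto3
import dotenv

dotenv.load_dotenv()

region = os.environ["AWS_REGION"]
account = os.environ["AWS_ACCOUNT_ID"]

sf = boto3.client("stepfunctions", region_name=region)
lam = boto3.client("lambda", region_name=region)
iam = boto3.client("iam")
s3 = boto3.resource("s3", region_name=region)

# Delete the state machine
sf.delete_state_machine(
    stateMachineArn=f"arn:aws:states:{region}:{account}:stateMachine:{os.environ['STATE_MACHINE_NAME']}"
)

# Delete the Lambda functions
for key in ("LAMBDA_FUNCTION_ONE", "LAMBDA_FUNCTION_TWO", "LAMBDA_FUNCTION_S3_TRIGGER"):
    lam.delete_function(FunctionName=os.environ[key])

# Detach and delete the role policies, then delete the IAM roles
iam.detach_role_policy(
    RoleName=os.environ["LAMBDA_ROLE_NAME"],
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)
iam.delete_role_policy(RoleName=os.environ["LAMBDA_ROLE_NAME"], PolicyName="LambdaStartStepFunctionPolicy")
iam.delete_role_policy(RoleName=os.environ["SF_ROLE_NAME"], PolicyName="StepFunctionLambdaInvokePolicy")
iam.delete_role(RoleName=os.environ["LAMBDA_ROLE_NAME"])
iam.delete_role(RoleName=os.environ["SF_ROLE_NAME"])

# Empty and delete the S3 bucket
bucket = s3.Bucket(os.environ["S3_BUCKET_NAME"])
bucket.objects.all().delete()
bucket.delete()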
Conclusion
In this tutorial, you have learned how to deploy an AWS Step Functions workflow with Lambda functions and CircleCI. You have also learned how to test your code and automate the deployment process.
By combining AWS Step Functions, Lambda, and CircleCI, you have built a robust and automated serverless data processing pipeline. This architecture promotes scalability, fault tolerance, and a clean separation of concerns between workflow logic and task execution. With CircleCI automating your testing and deployment, your infrastructure stays up to date with every push. Whether you are processing files from S3, orchestrating complex workflows, or scaling microservices, this setup provides a production-ready foundation for modern cloud applications.