Tutorials | Sep 26, 2025 | 11 min read

Deployment of AWS Step Functions with Lambda and CircleCI

Benito Martin

Data Scientist

In this guide, you will build and deploy a serverless data processing workflow using AWS Step Functions and AWS Lambda. This approach enables you to orchestrate discrete processing tasks in a scalable and cost-efficient way, leveraging the event-driven architecture that AWS offers.

You will begin by creating individual Lambda functions that handle specific tasks in your data pipeline. These functions will be coordinated using AWS Step Functions, which lets you define the workflow logic through a visual editor or a JSON-based definition.

To ensure rapid, reliable, and repeatable deployments, you will automate your testing and deployment process using CircleCI. This continuous integration and delivery (CI/CD) platform will help you validate your code with unit tests, enforce code quality standards, and deploy your application to AWS automatically on every push to the main branch.

By the end of this guide, you will have a fully functioning CI/CD pipeline that deploys your AWS Step Functions workflow automatically, ensuring that your data processing pipeline is always up-to-date and reliable.

Prerequisites

Before you begin, ensure that you have the following requirements in place:

  • Sign up for an AWS account if you do not already have one.
  • Install the AWS Command Line Interface (CLI) and configure it with your AWS credentials. You can follow the AWS CLI setup guide.
  • You should have basic familiarity with AWS Step Functions and how to create state machines.
  • You will need a GitHub account for version control and a CircleCI account to automate your CI/CD pipeline.
  • Install uv, a fast and modern tool for managing dependencies and Python virtual environments. You will use it to set up your project environment in a later step.

Once you have completed these steps, you will be ready to begin setting up your project and implementing the serverless workflow.

Setting up the project

Before you start building, you need to set up your project environment, install the required dependencies, and understand the role AWS Step Functions will play in your application’s workflow.

Overview of AWS Step Functions

AWS Step Functions is a fully managed service that enables you to coordinate multiple distributed components in a defined order. Using state machines, you can model complex business logic, handle retries and failures, and monitor each step’s execution.

In this tutorial, you will use three AWS Lambda functions: one that starts the workflow and two that run as tasks in a Step Functions state machine. The workflow is triggered by an object upload to an S3 bucket:

  • The first Lambda function initiates the Step Functions workflow.
  • The second extracts metadata such as the bucket name, file name, and file extension.
  • The third classifies the file type based on its extension (pdf, doc, or png).

Setting up the environment

Start by cloning the repository:

git clone https://github.com/CIRCLECI-GWP/aws-step-functions-circleci.git
cd aws-step-functions-circleci

Next, install the dependencies and initialize the virtual environment:

uv sync --all-groups
source .venv/bin/activate

These commands will:

  • Install the dependencies defined in pyproject.toml.
  • Automatically create a virtual environment (.venv).
  • Activate the virtual environment.

The sync command installs the main dependencies, and the --all-groups flag ensures that optional dependency groups are included as well.

Finally, create a .env file in the root directory of your repository and add the required environment variables. Replace the placeholder values shown below with your own, and make sure to fill in AWS_REGION, AWS_ACCOUNT_ID, and S3_BUCKET_NAME once you create the bucket in the next steps.

AWS_REGION=your_region
AWS_ACCOUNT_ID=your_account_id
S3_BUCKET_NAME=your_bucket_name
STATE_MACHINE_NAME=your_state_machine_name
SF_ROLE_NAME=your_sf_role_name
LAMBDA_FOLDER=your_lambda_folder
LAMBDA_ROLE_NAME=your_lambda_role_name
LAMBDA_FUNCTION_ONE=your_lambda_function_one
LAMBDA_FUNCTION_TWO=your_lambda_function_two
LAMBDA_FUNCTION_S3_TRIGGER=your_lambda_function_s3_trigger

These values are used throughout the deployment and configuration process to integrate your Lambda functions with AWS Step Functions and other AWS services such as S3 and IAM Roles.

Setting up the project structure

If you cloned the repository, you should already have the project structure in place. If you are setting it up manually, use the following structure as a guide. The dist directory (or whichever name you choose for LAMBDA_FOLDER in your .env file) is where the Lambda functions are zipped before being uploaded to AWS; it is created automatically when you run the zip_lambdas.sh script.

.
├── aws_lambdas/                      # Lambda function implementations
│   ├── __init__.py                   
│   ├── s3_trigger/                   
│   │   ├── __init__.py               
│   │   └── app_s3_trigger.py         
│   ├── task_one/                     
│   │   ├── __init__.py               
│   │   └── app_task_one.py           
│   └── task_two/                     
│       ├── __init__.py               
│       └── app_task_two.py           
├── dist/                             # Distribution directory for Lambda packages
│   ├── s3_trigger.zip              
│   ├── task_one.zip                  
│   └── task_two.zip                  
├── scripts/                          # Deployment and utility scripts
│   ├── create_roles.sh               
│   └── zip_lambdas.sh               
├── tests/                            # Tests
│   ├── test_s3_trigger.py           
│   ├── test_task_one.py             
│   └── test_task_two.py              
├── deploy.py                         # Main deployment script
├── LICENSE                           # Project license file
├── Makefile                          # Make commands for common tasks
├── pyproject.toml                    # Python project configuration
├── README.md                         # Project documentation
├── requirements.txt                  # Project dependencies for lambdas
├── state_machine_definition.json     # Step Functions state machine definition
└── uv.lock                           # UV package manager lock file

Creating Lambda functions

In this section, you will implement three AWS Lambda functions, each serving a distinct role in the workflow orchestrated by AWS Step Functions. Before creating them, create an S3 bucket using the AWS CLI command below, and update S3_BUCKET_NAME in your .env file with the bucket name generated by the command. The --create-bucket-configuration LocationConstraint option is only needed if your region is not us-east-1.

aws s3api create-bucket \
    --bucket step-functions-$(uuidgen | tr -d - | tr '[:upper:]' '[:lower:]' ) \
    --region <your_aws_region> \
    --create-bucket-configuration LocationConstraint=<your_aws_region>
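
If you want to confirm the bucket is ready from Python rather than the console, a minimal boto3 check, assuming your .env already contains the generated S3_BUCKET_NAME, could look like this:

import os

import boto3
import dotenv

dotenv.load_dotenv()

s3_client = boto3.client("s3", region_name=os.environ["AWS_REGION"])

# head_bucket raises a ClientError if the bucket does not exist
# or you do not have permission to access it.
s3_client.head_bucket(Bucket=os.environ["S3_BUCKET_NAME"])
print(f"Bucket {os.environ['S3_BUCKET_NAME']} is ready.")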

S3 trigger Lambda

This Lambda is triggered by an object upload to an S3 bucket. Its job is to extract the bucket and file information from the S3 event and start a Step Functions execution using that information as input. Create this file as app_s3_trigger.py in the aws_lambdas/s3_trigger directory.

Key behaviors:

  • Extracts the bucket name and file key from the S3 event record.
  • Builds an input payload for Step Functions.
  • Calls start_execution on the Step Functions client.
  • Logs a success message including the executionArn.
  • Catches and logs any exceptions.
import json
import os
from typing import Any

import boto3  # type: ignore
import dotenv
from loguru import logger

dotenv.load_dotenv()

STATE_MACHINE_NAME = os.getenv("STATE_MACHINE_NAME")
AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")

sf_client = boto3.client("stepfunctions", region_name=AWS_REGION)

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle S3 event and trigger Step Functions execution."""
    try:
        # Get the S3 bucket and key from the event
        bucket = event["Records"][0]["s3"]["bucket"]["name"]
        key = event["Records"][0]["s3"]["object"]["key"]

        # Prepare input for Step Functions
        input_data = {
            "document_id": key.split("/")[-1].split(".")[0],  # Extract filename without extension
            "s3_path": f"s3://{bucket}/{key}",
        }

        # Start Step Functions execution
        response = sf_client.start_execution(
            stateMachineArn=f"arn:aws:states:{AWS_REGION}:{AWS_ACCOUNT_ID}:stateMachine:{STATE_MACHINE_NAME}",
            input=json.dumps(input_data),
        )

        logger.info(f"Started Step Functions execution: {response['executionArn']}")
        return {"statusCode": 200, "body": "Step Functions execution started successfully"}

    except Exception as e:
        logger.error(f"Error processing S3 event: {e}")
        raise

Task one Lambda: metadata extraction

This function runs as the first task in the state machine. It receives the s3_path, extracts metadata such as the bucket name, file name, and file extension, and returns this information for the next task. Create this file as app_task_one.py in the aws_lambdas/task_one directory.

Key behaviors:

  • Parses the s3_path from the input event.
  • Extracts the file name and extension.
  • Constructs and returns a dictionary with the metadata: bucket_name, file_name, file_extension, and s3_path.
  • Logs the metadata.
  • Handles KeyErrors.
import os
from typing import Any

from loguru import logger

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle Lambda function invocation."""
    logger.info(f"Received event: {event}")

    try:
        object_key = event["s3_path"].split("://")[1]
        bucket_name = object_key.split("/")[0]

        # Extract file name and extension
        filename = os.path.basename(object_key)
        name, extension = os.path.splitext(filename)

        metadata = {
            "bucket_name": bucket_name,
            "s3_path": object_key,
            "file_name": name,
            "file_extension": extension,
        }

        logger.info(f"Extracted metadata: {metadata}")
        return metadata

    except KeyError as e:
        logger.error(f"Missing expected key: {e}")
        raise
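
Because this handler has no AWS dependencies, you can sanity-check it locally before deploying. A quick, hypothetical invocation with a placeholder bucket and key might look like this:

from aws_lambdas.task_one.app_task_one import lambda_handler

# Sample event with a placeholder S3 path
event = {"document_id": "document", "s3_path": "s3://my-bucket/folder/document.pdf"}

print(lambda_handler(event, None))
# Expected output:
# {'bucket_name': 'my-bucket', 's3_path': 'my-bucket/folder/document.pdf',
#  'file_name': 'document', 'file_extension': '.pdf'}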

Task two Lambda: file classification

This is the second task in the state machine. It classifies the file based on its extension and adds a classification field to the output. Create this file as app_task_two.py in the aws_lambdas/task_two directory.

Key behaviors:

  • Extracts document_id and file_extension from the event.
  • Maps known extensions (.pdf, .doc/.docx, .png/.jpg/.jpeg) to document types.
  • Logs the classification.
  • Handles and logs unexpected exceptions.
from typing import Any

from loguru import logger

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Classify the document based on its file extension."""
    try:
        document_id = event.get("document_id")
        file_extension = event.get("file_extension", "").lower()

        # Classify based on file extension
        if file_extension == ".pdf":
            doc_type = "PDF Document"
        elif file_extension == ".doc" or file_extension == ".docx":
            doc_type = "Word Document"
        elif file_extension == ".png" or file_extension == ".jpg" or file_extension == ".jpeg":
            doc_type = "Image File"
        else:
            doc_type = "Unknown Type"

        result = {
            "document_id": document_id,
            "classification": doc_type,
        }

        logger.info(f"Document {document_id} classified as: {doc_type}")
        return result

    except Exception as e:
        logger.error(f"Error in classification: {e}")
        raise
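
This handler is also pure Python, so you can try it locally with a couple of sample events (the document IDs below are placeholders):

from aws_lambdas.task_two.app_task_two import lambda_handler

print(lambda_handler({"document_id": "doc-1", "file_extension": ".pdf"}, None))
# -> {'document_id': 'doc-1', 'classification': 'PDF Document'}

print(lambda_handler({"document_id": "doc-2", "file_extension": ".xyz"}, None))
# -> {'document_id': 'doc-2', 'classification': 'Unknown Type'}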

Creating the state machine

To define your Step Functions workflow, create a state machine definition in JSON format in the root directory under state_machine_definition.json. This document specifies the tasks, their execution order, and how the output of one task flows into the next.

Key components of the definition:

  • States: Logical steps in your workflow.
  • Tasks: Each task invokes a specific AWS Lambda function and saves its output.
  • Transitions: Define the flow between steps.
    • Next: Specifies the next state to transition to. The output of the current task is passed to the next task as input.
    • End: Marks the end of the state machine.

The placeholders TASK_ONE_LAMBDA_ARN and TASK_TWO_LAMBDA_ARN will be replaced with the actual ARNs of your Lambda functions during deployment.

{
    "Comment": "Simple two-step workflow triggered by S3 uploads",
    "StartAt": "TaskOne",
    "States": {
        "TaskOne": {
            "Type": "Task",
            "Resource": "TASK_ONE_LAMBDA_ARN",
            "ResultPath": "$.taskOneResult",
            "Next": "TaskTwo"
        },
        "TaskTwo": {
            "Type": "Task",
            "Resource": "TASK_TWO_LAMBDA_ARN",
            "InputPath": "$.taskOneResult",
            "ResultPath": "$.taskTwoResult",
            "End": true
        }
    }
}

Deploying the Step Functions workflow

Deployment involves several coordinated steps to ensure all components such as the Lambda functions, IAM roles, and the state machine are properly set up.

Step 1: Create IAM roles

Create a create_roles.sh script under the scripts folder to set up the necessary IAM roles. The script checks whether each role already exists, creates it if needed, and attaches the required policies.

  • Lambda Execution Role: Allows Lambda to write logs and invoke Step Functions.
  • Step Functions Role: Allows the state machine to invoke Lambda functions.
#!/bin/bash

# Exit immediately on errors (-e) and treat unset variables as errors (-u)
set -eu

# Load environment variables from .env file
set -o allexport
source .env
set +o allexport

STATE_MACHINE_ARN=arn:aws:states:${AWS_REGION}:${AWS_ACCOUNT_ID}:stateMachine:${STATE_MACHINE_NAME}

# Create Lambda Execution Role
if aws iam get-role --role-name $LAMBDA_ROLE_NAME 2>/dev/null; then
  echo "✅ Lambda Role '$LAMBDA_ROLE_NAME' already exists."
else
  echo "🚀 Creating Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam create-role --role-name $LAMBDA_ROLE_NAME \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": { "Service": "lambda.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }]
    }'

# Attach the AWSLambdaBasicExecutionRole policy
  echo "🚀 Attaching AWSLambdaBasicExecutionRole policy to Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam attach-role-policy --role-name $LAMBDA_ROLE_NAME \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # Attach the custom policy to allow starting Step Functions executions
  echo "🚀 Attaching custom policy to Lambda Role '$LAMBDA_ROLE_NAME'..."
  aws iam put-role-policy --role-name $LAMBDA_ROLE_NAME \
    --policy-name LambdaStartStepFunctionPolicy \
    --policy-document "{
      \"Version\": \"2012-10-17\",
      \"Statement\": [{
        \"Effect\": \"Allow\",
        \"Action\": \"states:StartExecution\",
        \"Resource\": \"${STATE_MACHINE_ARN}\"
      }]
    }"

fi

# Create Step Functions Execution Role
if aws iam get-role --role-name $SF_ROLE_NAME 2>/dev/null; then
  echo "✅ Step Function Role '$SF_ROLE_NAME' already exists."
else
  # Create the step function role
  echo "🚀 Creating Step Function Role '$SF_ROLE_NAME'..."
  aws iam create-role --role-name $SF_ROLE_NAME \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": { "Service": "states.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }]
    }'

  # Attach the custom policy to allow invoking Lambda functions
  echo "🚀 Attaching custom policy to Step Function Role '$SF_ROLE_NAME'..."
  aws iam put-role-policy --role-name $SF_ROLE_NAME \
    --policy-name StepFunctionLambdaInvokePolicy \
    --policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Action": "lambda:InvokeFunction",
        "Resource": "*"
      }]
    }'
fi

echo "🎯 All roles are ready!"

Run create_roles.sh to generate the necessary IAM roles:

chmod +x ./scripts/create_roles.sh
./scripts/create_roles.sh

Create roles
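
If you want to double-check the roles from Python rather than the AWS console, a small boto3 sketch (assuming the role names in your .env) could look like this:

import os

import boto3
import dotenv

dotenv.load_dotenv()

iam_client = boto3.client("iam")

# get_role raises a NoSuchEntityException if the role does not exist.
for role_name in (os.environ["LAMBDA_ROLE_NAME"], os.environ["SF_ROLE_NAME"]):
    role = iam_client.get_role(RoleName=role_name)["Role"]
    print(role["RoleName"], "->", role["Arn"])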

Step 2: Zip Lambda functions

Create a zip_lambdas.sh script in the scripts folder to bundle each Lambda function with its dependencies. Make sure all the necessary dependencies are listed in the requirements.txt file; for this deployment, you will need the following:

boto3>=1.38.3
loguru>=0.7.3
python-dotenv>=1.1.0
With the dependencies in place, add the following script as scripts/zip_lambdas.sh:

#!/bin/bash

# Exit immediately on errors (-e) and treat unset variables as errors (-u)
set -eu

# Store the root directory path
ROOT_DIR=$(pwd)

# Load environment variables from .env file
set -o allexport
source .env
set +o allexport

echo "✅ Environment variables loaded."

# Create dist folder if it doesn't exist
mkdir -p "${ROOT_DIR}/${LAMBDA_FOLDER}"
echo "✅ Created ${LAMBDA_FOLDER} directory"

# Function to zip Lambda with dependencies
zip_lambda_with_deps() {
    LAMBDA_NAME=$1
    ZIP_FILE="${ROOT_DIR}/${LAMBDA_FOLDER}/${LAMBDA_NAME}.zip"
    REQUIREMENTS_FILE="${ROOT_DIR}/requirements.txt"

    echo "📦 Processing ${LAMBDA_NAME}..."

    # Create package directory if it doesn't exist
    PACKAGE_DIR="${ROOT_DIR}/aws_lambdas/${LAMBDA_NAME}/package"
    mkdir -p "${PACKAGE_DIR}"

    # Install dependencies
    cd "${ROOT_DIR}/aws_lambdas/${LAMBDA_NAME}"
    pip install --target ./package -r "${REQUIREMENTS_FILE}" --upgrade --no-cache-dir

    # Create the zip file
    cd package
    zip -r9 "${ZIP_FILE}" .
    cd ..
    zip -g "${ZIP_FILE}" ./*.py

    # Clean up package directory
    rm -rf "${PACKAGE_DIR}"

    # Return to root directory
    cd "${ROOT_DIR}"
}

echo "🚀 Zipping Lambda functions..."

# Zip task_one
echo "📦 Zipping task_one..."
zip_lambda_with_deps "task_one"

# Zip task_two
echo "📦 Zipping task_two..."
zip_lambda_with_deps "task_two"

# Zip s3_trigger
echo "📦 Zipping s3_trigger..."
zip_lambda_with_deps "s3_trigger"

echo "✅ Lambda functions zipped successfully: ${LAMBDA_FOLDER}/task_one.zip, ${LAMBDA_FOLDER}/task_two.zip, ${LAMBDA_FOLDER}/s3_trigger.zip"

Make the script executable and run it to generate the zipped packages under your defined LAMBDA_FOLDER, using the requirements.txt file for dependencies:

chmod +x ./scripts/zip_lambdas.sh
./scripts/zip_lambdas.sh
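
As an optional check, you can inspect one of the generated archives to confirm that the handler module sits at the root of the zip, where the Lambda Handler setting expects it. This sketch assumes the default dist folder name:

import zipfile

with zipfile.ZipFile("dist/task_one.zip") as archive:
    # The handler module must sit at the top level of the archive.
    print("app_task_one.py" in archive.namelist())  # Expected: True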

Step 3: Deploy Lambda functions and state machine

Finally, create a deploy.py in the root directory to deploy the Lambda functions and the state machine. This script does the following:

  • Creates or updates all three Lambda functions.
  • Deploys the state machine with the linked Lambda ARNs.
  • Adds permission for S3 to invoke the S3-trigger Lambda function.
  • Attaches an S3 event notification to the bucket that triggers the S3-trigger Lambda function.
import json
import os
import time
from typing import Any

import boto3
import dotenv  # type: ignore
from loguru import logger

dotenv.load_dotenv()

AWS_REGION = os.environ["AWS_REGION"]
AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
SF_ROLE_NAME = os.environ["SF_ROLE_NAME"]
LAMBDA_ROLE_NAME = os.environ["LAMBDA_ROLE_NAME"]
LAMBDA_FOLDER = os.environ["LAMBDA_FOLDER"]
LAMBDA_FUNCTION_ONE = os.environ["LAMBDA_FUNCTION_ONE"]
LAMBDA_FUNCTION_TWO = os.environ["LAMBDA_FUNCTION_TWO"]
LAMBDA_FUNCTION_S3_TRIGGER = os.environ["LAMBDA_FUNCTION_S3_TRIGGER"]
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
STATE_MACHINE_NAME = os.environ["STATE_MACHINE_NAME"]

s3_client = boto3.client("s3", region_name=AWS_REGION)
lambda_client = boto3.client("lambda", region_name=AWS_REGION)
sf_client = boto3.client("stepfunctions", region_name=AWS_REGION)

def create_lambda_function(name: str, zip_path: str, role_arn: str, handler: str) -> str:
    """Create or update a Lambda function."""
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} does not exist. Make sure to zip your code first.")

    with open(zip_path, "rb") as f:
        zipped_code = f.read()

    # Environment variables for the Lambda function
    env_vars = {
        "STATE_MACHINE_NAME": STATE_MACHINE_NAME,
        "AWS_ACCOUNT_ID": AWS_ACCOUNT_ID
    }

    try:
        lambda_client.get_function(FunctionName=name)
        logger.info(f"✅ Lambda function {name} already exists. Updating code...")
        response = lambda_client.update_function_code(FunctionName=name, ZipFile=zipped_code)

        # Update environment variables
        lambda_client.update_function_configuration(
            FunctionName=name,
            Environment={"Variables": env_vars}
        )
    except lambda_client.exceptions.ResourceNotFoundException:
        logger.info(f"🚀 Creating new Lambda function {name}...")
        response = lambda_client.create_function(
            FunctionName=name,
            Runtime="python3.12",
            Role=role_arn,
            Handler=handler,
            Code={"ZipFile": zipped_code},
            Timeout=10,
            Environment={"Variables": env_vars},
        )

    return str(response["FunctionArn"])

def deploy_state_machine(
    def_path: str, role_arn: str, lambdas: dict[str, str], state_machine_name: str = str(STATE_MACHINE_NAME)
) -> dict[str, Any]:
    with open(def_path) as f:
        definition = json.load(f)

    # Convert JSON object to string
    definition_str = json.dumps(definition)

    # Replace placeholders with real Lambda ARNs
    definition_str = definition_str.replace("TASK_ONE_LAMBDA_ARN", lambdas["TaskOne"])
    definition_str = definition_str.replace("TASK_TWO_LAMBDA_ARN", lambdas["TaskTwo"])

    # Now deploy
    existing_machines = sf_client.list_state_machines()["stateMachines"]
    logger.info(f"Found {len(existing_machines)} existing state machines.")
    logger.info(f"Existing machines: {existing_machines}")

    state_machine_arn = None
    for sm in existing_machines:
        if sm["name"] == state_machine_name:
            state_machine_arn = sm["stateMachineArn"]
            break

    if state_machine_arn:
        logger.info(f"✅ State Machine {state_machine_name} already exists. Updating...")
        response = sf_client.update_state_machine(
            stateMachineArn=state_machine_arn, definition=definition_str, roleArn=role_arn
        )
    else:
        logger.info(f"🚀 Creating new State Machine {state_machine_name}...")
        response = sf_client.create_state_machine(
            name=state_machine_name, definition=definition_str, roleArn=role_arn, type="STANDARD"
        )

    return dict(response)

def add_lambda_permission(function_name: str, bucket_name: str) -> None:
    """Add permission for S3 to invoke the Lambda function."""
    try:
        lambda_client.add_permission(
            FunctionName=function_name,
            StatementId=f"S3InvokeFunction-{bucket_name}",  # Unique statement ID
            Action="lambda:InvokeFunction",
            Principal="s3.amazonaws.com",
            SourceArn=f"arn:aws:s3:::{bucket_name}",
        )
        logger.info(f"Added S3 invoke permission to Lambda function {function_name}")
    except lambda_client.exceptions.ResourceConflictException:
        logger.info(f"Permission already exists for Lambda function {function_name}")

def add_s3_trigger_to_bucket(bucket_name: str, lambda_arn: str) -> None:
    """Configure S3 bucket to trigger Lambda on file upload."""
    function_name = lambda_arn.split(":")[-1]

    add_lambda_permission(function_name, bucket_name)

    # Wait for a few seconds to ensure the permission is propagated
    logger.info("Waiting for permission propagation...")
    time.sleep(5)

    try:
        s3_client.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={
                "LambdaFunctionConfigurations": [{"LambdaFunctionArn": lambda_arn, "Events": ["s3:ObjectCreated:*"]}]
            },
        )
        logger.info(f"Added S3 trigger configuration to bucket {bucket_name}")
    except Exception as e:
        logger.error(f"Error configuring S3 trigger: {str(e)}")
        raise

if __name__ == "__main__":
    logger.info("Starting deployment...")

    lambda_role = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{LAMBDA_ROLE_NAME}"
    sf_role = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{SF_ROLE_NAME}"

    # Create or update all Lambda functions
    lambdas = {
        "TaskOne": create_lambda_function(
            LAMBDA_FUNCTION_ONE,
            f"{LAMBDA_FOLDER}/task_one.zip",
            lambda_role,
            "app_task_one.lambda_handler",
        ),
        "TaskTwo": create_lambda_function(
            LAMBDA_FUNCTION_TWO, f"{LAMBDA_FOLDER}/task_two.zip", lambda_role, "app_task_two.lambda_handler"
        ),
        "S3Trigger": create_lambda_function(
            LAMBDA_FUNCTION_S3_TRIGGER, f"{LAMBDA_FOLDER}/s3_trigger.zip", lambda_role, "app_s3_trigger.lambda_handler"
        ),
    }

    deploy_state_machine("state_machine_definition.json", sf_role, lambdas)
    add_s3_trigger_to_bucket(S3_BUCKET_NAME, lambdas["S3Trigger"])

    logger.info("Deployment complete!")

To deploy the Lambda functions and state machine, run the following command:

uv run deploy.py

Deploy Lambdas

Once the Lambda functions and the state machine are deployed, you can find them in the AWS console under AWS Lambda and AWS Step Functions.
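
As an optional sanity check, you can also confirm the state machine from Python. This is only a sketch, reusing the environment variables from your .env file:

import os

import boto3
import dotenv

dotenv.load_dotenv()

sf_client = boto3.client("stepfunctions", region_name=os.environ["AWS_REGION"])

# Look for the state machine you just deployed.
machines = sf_client.list_state_machines()["stateMachines"]
for machine in machines:
    if machine["name"] == os.environ["STATE_MACHINE_NAME"]:
        print("Deployed:", machine["stateMachineArn"])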

Writing tests for the application

Unit testing ensures that each Lambda function behaves as expected with valid and invalid inputs. You will use pytest for testing the functions and create all test files under the tests folder.

Testing the S3 trigger Lambda

To test the S3 trigger Lambda, create a test_s3_trigger.py file. It mocks the S3 event and checks that the Step Functions execution is started correctly.

from typing import Any

import pytest

from aws_lambdas.s3_trigger.app_s3_trigger import lambda_handler

@pytest.fixture
def mock_env_vars(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("STATE_MACHINE_NAME", "TestStateMachine")

def test_lambda_handler_success(mock_env_vars: None) -> None:
    # Arrange
    test_event = {"Records": [{"s3": {"bucket": {"name": "test-bucket"}, "object": {"key": "folder/document.pdf"}}}]}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["statusCode"] == 200
    assert "Step Functions execution started successfully" in result["body"]

def test_lambda_handler_invalid_event() -> None:
    # Arrange
    test_event: dict[str, Any] = {}

    # Act & Assert
    with pytest.raises(KeyError):
        lambda_handler(test_event, None)

Testing the metadata extraction Lambda

To test the metadata extraction Lambda, create a test_task_one.py file. It checks that the function correctly extracts the metadata from the s3_path.

import pytest

from aws_lambdas.task_one.app_task_one import lambda_handler

def test_lambda_handler_success() -> None:
    # Arrange
    test_event = {"document_id": "test123", "s3_path": "s3://test-bucket/folder/document.pdf"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["bucket_name"] == "test-bucket"
    assert result["file_name"] == "document"
    assert result["file_extension"] == ".pdf"
    assert result["s3_path"] == "test-bucket/folder/document.pdf"

def test_lambda_handler_missing_s3_path() -> None:
    # Arrange
    test_event = {"document_id": "test123"}

    # Act & Assert
    with pytest.raises(KeyError):
        lambda_handler(test_event, None)

Testing the file classification Lambda

To test the file classification Lambda, create a test_task_two.py file. It checks that the function correctly classifies the file based on its extension.

from aws_lambdas.task_two.app_task_two import lambda_handler

def test_lambda_handler_pdf() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".pdf"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "PDF Document"

def test_lambda_handler_image() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".jpg"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "Image File"

def test_lambda_handler_unknown() -> None:
    # Arrange
    test_event = {"document_id": "test123", "file_extension": ".xyz"}

    # Act
    result = lambda_handler(test_event, None)

    # Assert
    assert result["document_id"] == "test123"
    assert result["classification"] == "Unknown Type"

To run the tests, use the following command:

uv run pytest

Test Lambdas

Deploying the application with CircleCI

To automate the deployment pipeline using CircleCI, you need to define a CircleCI config file (.circleci/config.yml). This pipeline will handle tasks such as installing dependencies, validating your code, configuring AWS resources, and running tests.

version: 2.1

orbs:
  aws-cli: circleci/aws-cli@5.3.1

jobs:
  build-deploy:
    docker:
      - image: cimg/python:3.12
    steps:
      - checkout

      - run:
          name: Install UV
          command: |
            curl -LsSf https://astral.sh/uv/install.sh | sh

      - run:
          name: Create venv and install dependencies
          command: |
            uv sync --all-groups

      - run:
          name: Create .env file
          command: |
            echo "AWS_REGION=${AWS_REGION}" > .env
            echo "AWS_ACCOUNT_ID=${AWS_ACCOUNT_ID}" >> .env
            echo "S3_BUCKET_NAME=${S3_BUCKET_NAME}" >> .env
            echo "STATE_MACHINE_NAME=${STATE_MACHINE_NAME}" >> .env
            echo "SF_ROLE_NAME=${SF_ROLE_NAME}" >> .env
            echo "LAMBDA_FOLDER=${LAMBDA_FOLDER}" >> .env
            echo "LAMBDA_ROLE_NAME=${LAMBDA_ROLE_NAME}" >> .env
            echo "LAMBDA_FUNCTION_ONE=${LAMBDA_FUNCTION_ONE}" >> .env
            echo "LAMBDA_FUNCTION_TWO=${LAMBDA_FUNCTION_TWO}" >> .env
            echo "LAMBDA_FUNCTION_S3_TRIGGER=${LAMBDA_FUNCTION_S3_TRIGGER}" >> .env

      - run:
          name: Run ruff
          command: |
            uv run ruff check . --fix --exit-non-zero-on-fix

      - run:
          name: Run MyPy
          command: |
            uv run mypy

      - run:
          name: Run tests
          command: |
            uv run pytest

      - aws-cli/setup:
          profile_name: default

      - run:
          name: Create Roles
          command: |
            ./scripts/create_roles.sh

      - run:
          name: Zip Lambda functions
          command: |
            ./scripts/zip_lambdas.sh

      - run:
          name: Deploy
          command: |
            uv run deploy.py

workflows:
  deploy:
    jobs:
      - build-deploy

What this configuration does:

  • Orbs: The aws-cli orb sets up the AWS CLI so you can interact with AWS services.
  • Job (build-deploy): This job includes 11 deployment steps.

    1. Checks out your code.
    2. Installs uv for managing Python dependencies.
    3. Syncs and installs dependencies from pyproject.toml.
    4. Creates the .env file with the environment variables.
    5. Runs ruff to automatically lint and fix Python code.
    6. Runs mypy for static type checking.
    7. Runs the test suite using pytest.
    8. Sets up the AWS CLI environment.
    9. Runs the create_roles.sh script to create the necessary IAM roles.
    10. Runs the zip_lambdas.sh script to zip the Lambda functions.
    11. Runs the deploy.py script to deploy the Lambda functions and the state machine.
  • Workflow (deploy): Triggers the build-deploy job every time you push new changes to your repository.

Once this configuration is committed and pushed to your GitHub repository, CircleCI will automatically kick off the deployment process.

However, before CircleCI can deploy your application to AWS, you must create a new project and configure the required environment variables. Go to your CircleCI dashboard, create a new project, and link it to your GitHub repository. Then navigate to your project settings and add the same variables you defined in your .env file, plus AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. These variables are needed for AWS authentication and for the scripts to run correctly.

CircleCI env vars

Additionally, you need to create a pipeline in your project and set up a trigger for it. If you want to trigger the pipeline on each push to the main branch, set up an "all pushes" event.

CircleCI pipeline

CircleCI trigger

Once the environment variables, the pipeline, and the trigger are set up, the pipeline will run each time you push code to your GitHub repository; you can also trigger it manually.

Once you push, you will be able to monitor the steps directly in the CircleCI dashboard. If everything is set up correctly, you should see a green build.

CircleCI build and deploy

To confirm that the deployment works as expected, upload a PDF to the configured S3 bucket; the S3 event should automatically trigger the S3 trigger Lambda. You can upload the file with the following command or through the AWS Console:

aws s3 cp /path/to/your/file.ext s3://your-bucket-name/

Then go to the AWS Step Functions console and check that the state machine has started a new execution. Inside the execution, you should see the two tasks defined in the state machine; by checking the input and output of each task, you can confirm that the Lambda functions are working as expected. You can also inspect the executions programmatically, as shown below.

Step Functions execution
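
If you prefer to verify from Python, a small boto3 sketch (assuming the same .env values) lists the most recent executions together with their status and output:

import os

import boto3
import dotenv

dotenv.load_dotenv()

region = os.environ["AWS_REGION"]
state_machine_arn = (
    f"arn:aws:states:{region}:{os.environ['AWS_ACCOUNT_ID']}:"
    f"stateMachine:{os.environ['STATE_MACHINE_NAME']}"
)

sf_client = boto3.client("stepfunctions", region_name=region)

# List the five most recent executions and print their status and output.
executions = sf_client.list_executions(stateMachineArn=state_machine_arn, maxResults=5)
for execution in executions["executions"]:
    details = sf_client.describe_execution(executionArn=execution["executionArn"])
    print(execution["name"], details["status"], details.get("output", "no output yet"))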

Cleaning up

If you no longer need the application, delete the resources you created, such as the Lambda functions, the state machine, the S3 bucket, and the IAM roles, to avoid unnecessary charges.
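
The exact steps depend on the names you chose, but a rough boto3 sketch of the teardown, assuming the resources and policy names created earlier in this tutorial, might look like this:

import os

import boto3
import dotenv

dotenv.load_dotenv()

region = os.environ["AWS_REGION"]
account_id = os.environ["AWS_ACCOUNT_ID"]

lambda_client = boto3.client("lambda", region_name=region)
sf_client = boto3.client("stepfunctions", region_name=region)
s3 = boto3.resource("s3", region_name=region)
iam_client = boto3.client("iam")

# Delete the three Lambda functions.
for name in (
    os.environ["LAMBDA_FUNCTION_ONE"],
    os.environ["LAMBDA_FUNCTION_TWO"],
    os.environ["LAMBDA_FUNCTION_S3_TRIGGER"],
):
    lambda_client.delete_function(FunctionName=name)

# Delete the state machine.
state_machine_arn = (
    f"arn:aws:states:{region}:{account_id}:stateMachine:{os.environ['STATE_MACHINE_NAME']}"
)
sf_client.delete_state_machine(stateMachineArn=state_machine_arn)

# Empty and delete the S3 bucket.
bucket = s3.Bucket(os.environ["S3_BUCKET_NAME"])
bucket.objects.all().delete()
bucket.delete()

# Detach/delete the policies created by create_roles.sh, then delete the roles.
iam_client.detach_role_policy(
    RoleName=os.environ["LAMBDA_ROLE_NAME"],
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)
iam_client.delete_role_policy(
    RoleName=os.environ["LAMBDA_ROLE_NAME"], PolicyName="LambdaStartStepFunctionPolicy"
)
iam_client.delete_role(RoleName=os.environ["LAMBDA_ROLE_NAME"])
iam_client.delete_role_policy(
    RoleName=os.environ["SF_ROLE_NAME"], PolicyName="StepFunctionLambdaInvokePolicy"
)
iam_client.delete_role(RoleName=os.environ["SF_ROLE_NAME"])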

Conclusion

In this tutorial, you have learned how to deploy an AWS Step Functions workflow with Lambda functions and CircleCI. You have also learned how to test your code and automate the deployment process.

By combining AWS Step Functions, Lambda, and CircleCI, you have built a robust and automated serverless data processing pipeline. This architecture promotes scalability, fault tolerance, and a clean separation of concerns between workflow logic and task execution. With CircleCI automating your testing and deployment, your infrastructure stays up to date with every change. Whether you are processing files from S3, orchestrating complex workflows, or scaling microservices, this setup provides a production-ready foundation for modern cloud applications.