TutorialsOct 27, 202511 min read

Validate CDC data in your CI/CD pipeline using CircleCI

Kevin Kimani

Software Engineer

Change Data Capture (CDC) is a technique used to identify and capture changes, such as inserts, updates, and deletes, in a source database so they can be replicated to another system in real-time. This approach is crucial in modern data pipelines, especially for powering data lakes, analytics platforms, and event-driven applications that depend on up-to-date information.

Setting up a CDC pipeline is only the first step. Without proper validation, data replication issues like missing records, duplicate entries, or corrupted data can silently break downstream systems and lead to inaccurate business insights. These problems often go undetected until they’ve already caused significant damage.

This is where automated validation in your CI/CD pipeline becomes essential. In this guide, you’ll learn how to build automated CDC validation into your development workflow using CircleCI. You’ll simulate data changes in your source database and verify that those changes are correctly reflected in the destination system. By the end, you’ll have a reliable framework that continuously validates your CDC pipeline’s integrity with every code change, catching issues before they reach production.

Prerequisites

To follow along with this guide, you need to have the following:

Setting up the CDC pipeline

To keep this guide focused on CDC pipeline validation, we have prepared a starter template with a simple CDC pipeline implementation that you will write validation tests for. Clone the starter template to your local machine by executing the following command in your terminal:

git clone --single-branch -b starter-template https://github.com/CIRCLECI-GWP/circleci-cdc-pipeline-validation-demo.git

Here’s an overview of the core technologies used in the CDC pipeline:

  • PostgreSQL: A popular open-source relational database that acts as the source of truth in the CDC setup.
  • Debezium: An open-source CDC platform that monitors PostgreSQL for inserts, updates, and deletes, and streams those changes into Kafka.
  • Apache Kafka: A distributed event streaming platform that buffers the change events captured by Debezium.
  • Apache Spark: Used here to consume change events from Kafka and write them into Hudi tables.
  • Apache Hudi: A data lake storage format that supports upserts and incremental pulls, making it ideal for capturing CDC events in data lakes.
  • MinIO: An S3-compatible object storage used in this pipeline to store the Hudi tables.

Here is a breakdown of the key files in the project:

  • pipeline/docker-compose.yml: This file brings up all the necessary services to simulate a CDC environment on your local machine or in CI.
  • pipeline/scripts: This folder holds one SQL and two shell scripts:
    • init.sql: Creates a todos table in the database, populates it, and creates a user for Debezium with necessary privileges and a publication.
    • register_postgres_connector.sh: Registers the Debezium PostgreSQL connector with Debezium
    • spark-submit.sh: Submits the Spark job that reads from Kafka and writes to Hudi in MinIO
  • pipeline/config-files/core-site.xml: Contains the configuration that allows Hudi to access MinIO
  • validator: Contains Python scripts that you will use to implement your CDC validation logic:
    • config.py: Configuration settings
    • data_access.py: Data access layer for PostgreSQL and S3 operations
    • models.py: Data models and enums
    • report_generator.py: Utilities for generating validation reports
    • test_data_generator.py: Utilities for generating seed data
  • requirements.txt: Lists all Python dependencies needed to run the validation logic

Note: You will notice that most of the credentials in this project are hardcoded as this project is for demo purposes only. You should never hardcode sensitive credentials for production applications.

You can now spin up all the pipeline components. In your terminal, execute this command:

docker compose -f pipeline/docker-compose.yml up -d

Ensure all the services are up and running by executing the command docker ps. The output should be similar to this:

CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS                    PORTS                                                             NAMES
e1824571b3f7   kimanikevin254/debezium-connect:3.0     "/docker-entrypoint.…"   55 seconds ago   Up 5 seconds              8778/tcp, 0.0.0.0:8083->8083/tcp, [::]:8083->8083/tcp, 9092/tcp   kafka-connect
0ce8772e977d   kimanikevin254/spark-hudi:1.0           "/opt/bitnami/script…"   55 seconds ago   Up 5 seconds              0.0.0.0:8080->8080/tcp, [::]:8080->8080/tcp                       spark
0af28fecc324   confluentinc/cp-kafka:7.4.0             "/etc/confluent/dock…"   55 seconds ago   Up 38 seconds (healthy)   0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp                       kafka
fc5b0db19d25   confluentinc/cp-schema-registry:7.9.0   "/etc/confluent/dock…"   55 seconds ago   Up 38 seconds             0.0.0.0:8081->8081/tcp, [::]:8081->8081/tcp                       schema-registry
80c329e8f858   confluentinc/cp-zookeeper:7.4.0         "/etc/confluent/dock…"   55 seconds ago   Up 54 seconds (healthy)   2888/tcp, 0.0.0.0:2181->2181/tcp, [::]:2181->2181/tcp, 3888/tcp   zookeeper
fd4e4c8cc5dd   quay.io/minio/minio                     "/usr/bin/docker-ent…"   55 seconds ago   Up 54 seconds             0.0.0.0:9000-9001->9000-9001/tcp, [::]:9000-9001->9000-9001/tcp   minio
c64f1ef4d27b   postgres:17                             "docker-entrypoint.s…"   55 seconds ago   Up 54 seconds             0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp                       cdc-postgres

Next, set up your local MinIO server and create a bucket to store your CDC data using the following commands:

mc alias set myminio http://localhost:9000 minioadmin minioadmin

mc mb myminio/cdc-bucket

The first command registers a shortcut (alias) named myminio that connects to your MinIO server running at http://localhost:9000, using the credentials defined in the pipeline/docker-compose.yml file. The second command creates a new bucket inside the server to hold your CDC data.

Register the PostgreSQL connector using the commands:

chmod +x ./pipeline/scripts/register_postgres_connector.sh
./pipeline/scripts/register_postgres_connector.sh

You should get the following output:

```json (prettified) { “name”: “postgres-connector”, “config”: { “connector.class”: “io.debezium.connector.postgresql.PostgresConnector”, “database.hostname”: “cdc-postgres”, “database.port”: “5432”, “database.user”: “debezium_user”, “database.password”: “debezium_password”, “database.dbname”: “cdc_db”, “topic.prefix”: “postgres”, “plugin.name”: “pgoutput”, “table.include.list”: “public.todos”, “publication.name”: “debezium_pub”, “key.converter”: “io.confluent.connect.avro.AvroConverter”, “key.converter.schema.registry.url”: “http://schema-registry:8081”, “value.converter”: “io.confluent.connect.avro.AvroConverter”, “value.converter.schema.registry.url”: “http://schema-registry:8081”, “transforms”: “unwrap”, “transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”, “transforms.unwrap.drop.tombstones”: “false”, “transforms.unwrap.delete.handling.mode”: “rewrite”, “name”: “postgres-connector” }, “tasks”: [], “type”: “source” }


Finally, run the spark job that reads from Kafka and writes to the data lake:

```bash
chmod +x ./pipeline/scripts/spark-submit.sh
./pipeline/scripts/spark-submit.sh

Running baseline validations

Before applying any controlled changes to the source database, it’s important to verify that the source and destination systems are in sync after the first ingestion. These baseline validations serve as a sanity check to ensure that:

  • All rows from the source database (PostgreSQL) have been successfully replicated to the destination (S3 via Hudi).
  • The primary keys in both systems match exactly which confirms that no data has been lost or duplicated during the initial sync.

To implement this, create a new file named validator/validators.py and add the code below:

"""Validation logic for the CDC pipeline"""

import re
import logging
from datetime import datetime
from typing import Dict, List, Any
import pandas as pd

from models import TestResult, ValidationResult
from config import HUDI_RECORD_KEY, HUDI_DELETION_COLUMN

class DataValidator:
    def __init__(self, data_access_manager, table_configs: Dict[str, Dict]):
        self.data_access = data_access_manager
        self.table_configs = table_configs
        self.logging = logging.getLogger(__name__)

    def validate_row_counts(self, table_name: str) -> TestResult:
        """Validate that row counts match between PostgreSQL and S3."""
        try:
            pg_data = self.data_access.get_postgres_data(table_name)
            s3_data = self.data_access.get_s3_data(table_name)

            # Filter out soft-deleted rows from the datalake
            active_s3_data = s3_data[s3_data[HUDI_DELETION_COLUMN] != "true"]

            s3_count = len(active_s3_data)
            pg_count = len(pg_data)

            if pg_count == s3_count:
                result = ValidationResult.PASS
                details = f"Row counts match: {pg_count} rows."
            else:
                result = ValidationResult.FAIL
                details = f"Row counts do not match: PostgreSQL has {pg_count} rows, S3 has {s3_count} rows."

            return TestResult(
                test_name=f"row_count_validation_{table_name}",
                result=result,
                details=details,
                timestamp=datetime.now(),
                metrics={"pg_count": pg_count, "s3_count": s3_count}
            )

        except Exception as e:
            return TestResult(
                test_name=f"row_count_validation_{table_name}",
                result=ValidationResult.FAIL,
                details=str(e),
                timestamp=datetime.now()
            )

    def validate_data_integrity(self, table_name: str) -> TestResult:
        """Validate data integrity by comparing primary keys."""
        try:
            pg_data = self.data_access.get_postgres_data(table_name)
            s3_data = self.data_access.get_s3_data(table_name)

            if pg_data.empty and s3_data.empty:
                return TestResult(
                    test_name=f"data_integrity_validation_{table_name}",
                    result=ValidationResult.PASS,
                    details="Both datasets are empty.",
                    timestamp=datetime.now()
                )

            # Filter out deleted records from S3 data
            s3_data = s3_data[s3_data[HUDI_DELETION_COLUMN] != "true"]

            pg_pk_col = self.table_configs[table_name]['primary_key']

            if pg_pk_col in pg_data.columns and HUDI_RECORD_KEY in s3_data.columns:
                # Compare primary keys
                pg_ids = set(pg_data[pg_pk_col].astype(str))
                s3_ids = set(s3_data[HUDI_RECORD_KEY].astype(str))

                missing_in_s3 = pg_ids - s3_ids
                extra_in_s3 = s3_ids - pg_ids

                if len(missing_in_s3) == 0 and len(extra_in_s3) == 0:
                    result = ValidationResult.PASS
                    details = "Data integrity check passed. All primary keys match."
                else:
                    result = ValidationResult.FAIL
                    details = (
                        f"Data integrity check failed. "
                        f"Missing in S3: {len(missing_in_s3)} rows, "
                        f"Extra in S3: {len(extra_in_s3)} rows."
                    )
            else:
                result = ValidationResult.FAIL
                details = f"PG primary key or Hudi record key not found in the datasets."

            return TestResult(
                test_name=f"data_integrity_validation_{table_name}",
                result=result,
                details=details,
                timestamp=datetime.now()
            )

        except Exception as e:
            return TestResult(
                test_name=f"data_integrity_validation_{table_name}",
                result=ValidationResult.FAIL,
                details=str(e),
                timestamp=datetime.now()
            )

The DataValidator class provides two main methods: validate_row_counts and validate_data_integrity. The validate_row_counts method checks that the number of non-deleted rows in the S3-backed Hudi table matches the number of rows in the PostgreSQL source table, logging a pass or fail depending on whether the counts align. The validate_data_integrity method further verifies that the primary keys in both PostgreSQL and S3 match exactly, flagging any missing or extra keys in the S3 data.

Applying controlled changes to the source database

Once baseline validations pass, you can safely start applying controlled changes to the source database. These changes are intentional modifications such as inserts, updates, or soft deletions that will help verify whether your CDC pipeline is capturing and reflecting changes correctly in the destination.

This phase is crucial because it allows you to test the behavior of the pipeline in a realistic, yet controlled, environment before it handles production traffic. You’ll be able to confirm that the CDC mechanism correctly identifies changes, processes them, and writes them to the data lake in the expected format.

To implement the logic to apply controlled changes to the source database, open the validator/data_access.py file and add the method below to the DataAccessManager class:

def execute_changes(self, table_name: str, changes: List[Dict[str, Any]]):
        """
        Make controlled changes to the PostgreSQL database.

        Args:
            table_name (str): The name of the table to modify.
            changes (List[Dict[str, Any]]): A list of dictionaries representing the changes to apply.
                [
                    {"operation": "insert", "data": {...}},
                    {"operation": "update", "data": {...}, "where_column": "id", "where_value": 1},
                    {"operation": "delete", "where_column": "id", "where_value": 2}
                ]
        """

        with self.pg_conn.cursor() as cursor:
            for change in changes:
                sql = None
                params = []
                try:
                    operation = change['operation']

                    if operation == 'insert':
                        columns = list(change['data'].keys())
                        values_placeholders = ', '.join(['%s'] * len(columns))
                        sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({values_placeholders})"
                        params = list(change['data'].values())

                    elif operation == 'update':
                        set_clauses = []
                        for k, v in change['data'].items():
                            set_clauses.append(f"{k} = %s")
                            params.append(v)
                        set_clause_str = ', '.join(set_clauses)

                        where_column = change['where_column']
                        where_value = change['where_value']
                        sql = f"UPDATE {table_name} SET {set_clause_str} WHERE {where_column} = %s"
                        params.append(where_value)

                    elif operation == 'delete':
                        where_column = change['where_column']
                        where_value = change['where_value']
                        sql = f"DELETE FROM {table_name} WHERE {where_column} = %s"
                        params.append(where_value)
                    else:
                        self.logging.warning(f"Unsupported operation: {operation}. Skipping change.")
                        continue

                    if sql:
                        cursor.execute(sql, params)
                        self.logging.info(f"Executed SQL (template): {sql} with parameters: {params}")

                except Exception as e:
                    self.logging.error(f"Error executing change for operation '{operation}': {e}")
                    self.pg_conn.rollback()
                    raise

            self.pg_conn.commit()
            return

The execute_changes method applies a list of controlled changes to the specified PostgreSQL table. Each change is described as a dictionary that defines the operation type and the data or conditions involved. The method dynamically builds the appropriate SQL for each operation and executes it using a database cursor. It ensures proper logging, handles exceptions gracefully, and commits the changes once all operations are processed.

Verifying changes in the data lake

Once controlled changes have been applied to the source PostgreSQL database, it’s important to verify that these changes have been accurately captured and reflected in the data lake. This ensures the CDC pipeline is functioning as expected.

To do this, open the validator/validators.py file and add the following methods to the DataValidator class:

def _extract_pk_from_where_clause(self, where_clause: str, pk_col: str) -> Any:
        """Extract primary key value from WHERE clause."""
        try:
            # Handle common WHERE clause formats
            # "id = 1", "id=1", "id = '1'", etc.
            pattern = rf"{pk_col}\s*=\s*['\"]?([^'\"\s]+)['\"]?"
            match = re.search(pattern, where_clause, re.IGNORECASE)

            if match:
                value = match.group(1)
                # Try to convert to int if possible
                try:
                    return int(value)
                except ValueError:
                    return value
            return None
        except Exception:
            return None

    def validate_changes_propagated(self, table_name: str, expected_changes: List[Dict[str, Any]]) -> TestResult:
        """Validate that controlled changes are reflected in the data lake."""
        try:
            s3_data = self.data_access.get_s3_data(table_name)
            pk_col = self.table_configs[table_name]['primary_key']

            changes_validated = 0
            failed_validations = []

            for change in expected_changes:
                if change['operation'] == 'insert':
                    # Check if the inserted record exists in the datalake
                    inserted_data = change['data']

                    # Find matching rows by filtering on all inserted fields (excluding the PK)
                    matching_condition = True
                    filter_fields = {}

                    for key, expected_value in inserted_data.items():
                        if key != pk_col and key in s3_data.columns:
                            filter_fields[key] = expected_value
                            matching_condition = matching_condition & (s3_data[key].astype(str) == str(expected_value))

                    # Only proceed if we have fields to filter on
                    if filter_fields:
                        matching_rows = s3_data[matching_condition]

                        if len(matching_rows) > 0:
                            active_rows = matching_rows[matching_rows[HUDI_DELETION_COLUMN] != "true"]

                            if len(active_rows) > 0:
                                changes_validated += 1
                                self.logging.info(f"✓ Insert validated - found {len(active_rows)} matching active record(s)")
                            else:
                                failed_validations.append(f"Insert found but all matching records are marked as deleted: {filter_fields}")
                        else:
                            failed_validations.append(f"Insert not found with matching fields: {filter_fields}")
                    else:
                        failed_validations.append(f"No valid fields to match for insert validation: {inserted_data}")

                elif change['operation'] == 'update':
                    # Check if the updated record reflects the changes
                    updated_data = change['data']
                    where_clause = f"{change['where_column']} = {change['where_value']}"

                    # Extract ID from where clause
                    pk_value = self._extract_pk_from_where_clause(where_clause, pk_col)

                    if pk_value:
                        pk_value_str = str(pk_value)
                        matching_rows = s3_data[s3_data[HUDI_RECORD_KEY].astype(str) == pk_value_str]

                        if len(matching_rows) > 0:
                            row = matching_rows.iloc[0]

                            # Check if it's not marked as deleted
                            if HUDI_DELETION_COLUMN in row and row[HUDI_DELETION_COLUMN] == "true":
                                failed_validations.append(f"Update target found but marked as deleted: ID {pk_value}")
                            else:
                                # Validate updated fields
                                field_mismatches = []
                                for key, expected_value in updated_data.items():
                                    if key in row.index:
                                        actual_value = row[key]
                                        # Convert for comparison
                                        if str(actual_value) != str(expected_value):
                                            field_mismatches.append(f"{key}: expected {expected_value}, got {actual_value}")
                                    else:
                                        field_mismatches.append(f"{key}: field not found in S3 data")

                                if field_mismatches:
                                    failed_validations.append(f"Update validation failed for ID {pk_value}: {field_mismatches}")
                                else:
                                    changes_validated += 1
                                    self.logging.info(f"✓ Update validated for ID: {pk_value}")
                        else:
                            failed_validations.append(f"Update target not found: {where_clause}")
                    else:
                        failed_validations.append(f"Could not extract primary key from where clause: {where_clause}")

                elif change['operation'] == 'delete':
                    # Check if the deleted record is marked as deleted in the datalake
                    where_clause = f"{change['where_column']} = {change['where_value']}"

                    # Extract ID from where clause
                    pk_value = self._extract_pk_from_where_clause(where_clause, pk_col)

                    if pk_value:
                        pk_value_str = str(pk_value)
                        matching_rows = s3_data[s3_data[HUDI_RECORD_KEY].astype(str) == pk_value_str]

                        if len(matching_rows) > 0:
                            row = matching_rows.iloc[0]

                            # Check if it's marked as deleted
                            if HUDI_DELETION_COLUMN in row and row[HUDI_DELETION_COLUMN] == "true":
                                changes_validated += 1
                                self.logging.info(f"✓ Delete validated for ID: {pk_value}")
                            else:
                                failed_validations.append(f"Delete target found but not marked as deleted: ID {pk_value}")
                        else:
                            failed_validations.append(f"Delete target not found. ID {pk_value}")
                    else:
                        failed_validations.append(f"Could not extract primary key from where clause: {where_clause}")

            if len(failed_validations) == 0:
                result = ValidationResult.PASS
                details = f"All {changes_validated} changes validated successfully"

            else:
                result = ValidationResult.FAIL
                details = f"Failed validations: {failed_validations}"

            return TestResult(
                test_name=f"change_propagation_validation_{table_name}",
                result=result,
                details=details,
                timestamp=datetime.now(),
                metrics={"validated_changes": changes_validated, "failed_changes": len(failed_validations)}
            )

        except Exception as e:
            self.logging.error('An error occured', e)
            return TestResult(
                test_name=f"change_propagation_validation_{table_name}",
                result=ValidationResult.FAIL,
                details=f"Error during change validation: {str(e)}",
                timestamp=datetime.now()
            )

This code defines a method validate_changes_propagated that ensures that controlled changes (inserts, updates, and deletes) applied to the PostgreSQL source database have been correctly reflected in the data lake. It accepts a list of expected changes and checks each one against the current data in the data lake.

  • For inserts, it verifies that the inserted record exists and is not marked as deleted.
  • For updates, it compares the updated fields with the corresponding record in the data lake using the primary key extracted from the WHERE clause.
  • For deletes, it checks whether the target record exists and is marked as deleted.

A helper method _extract_pk_from_where_clause is used to extract the primary key from SQL-like WHERE clauses. If all changes are correctly validated, it returns a passing TestResult. Otherwise, it returns detailed failure information for each unsuccessful validation.

Orchestrating the full validation workflow

Once the individual validation components are in place, the next step is to tie them together into a complete validation process. This involves creating a validation class that wraps all core testing logic, along with a main script that coordinates when and how these methods are executed.

Start by creating a new file named validator/cdc_pipeline_validator.py and adding the code below:

"""Main validator orchestrator"""

import json
import logging
import time
from datetime import datetime
from typing import Dict, List, Any

from config import PIPELINE_SYNC_WAIT_TIME_SECS
from models import TestResult, ValidationResult
from data_access import DataAccessManager
from test_data_generator import TestDataGenerator
from validators import DataValidator
from report_generator import ReportGenerator

class CDCPipelineValidator:
    def __init__(self, postgres_config: Dict[str, Any], s3_config: Dict[str, Any], table_configs: List[Dict[str, Any]]):
        self.table_configs = {config['table_name']: config for config in table_configs}
        self.test_results: List[TestResult] = []

        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # Initialize components
        self.data_access = DataAccessManager(postgres_config, s3_config)
        self.test_data_generator = TestDataGenerator()
        self.validator = DataValidator(self.data_access, self.table_configs)
        self.report_generator = ReportGenerator()

    def run_baseline_validation(self) -> List[TestResult]:
        """Run baseline validation for all configured tables."""
        self.logger.info("Starting baseline validation...")
        results = []

        for table_name in self.table_configs.keys():
            self.logger.info(f"Validating table: {table_name}")

            # Row Count Validation
            row_count_result = self.validator.validate_row_counts(table_name)
            results.append(row_count_result)

            # Data Integrity Validation
            data_integrity_result = self.validator.validate_data_integrity(table_name)
            results.append(data_integrity_result)

        self.test_results.extend(results)
        return results

    def run_change_validation_test(self, table_name: str, test_changes: List[Dict[str, Any]]) -> List[TestResult]:
        """Run complete change validation test."""
        results = []

        try:
            # Make controlled changes
            self.logger.info(f"Creating test changes for table: {table_name}")
            self.data_access.execute_changes(table_name, test_changes)

            # Wait for pipeline sync
            self.logger.info(f"Pausing for {PIPELINE_SYNC_WAIT_TIME_SECS} seconds for data to sync...")
            time.sleep(PIPELINE_SYNC_WAIT_TIME_SECS)

            # Validate changes propagated
            change_result = self.validator.validate_changes_propagated(table_name, test_changes)
            results.append(change_result)

            self.logger.info(f"Change validation completed for {table_name}")

        except Exception as e:
            error_result = TestResult(
                test_name=f"change_validation_test_{table_name}",
                result=ValidationResult.FAIL,
                details=f"Test failed with error: {str(e)}",
                timestamp=datetime.now()
            )
            results.append(error_result)

        self.test_results.extend(results)
        return results

    def generate_test_changes(self, table_name: str) -> List[Dict[str, Any]]:
        """Generate test changes for a table."""
        test_changes = []
        primary_key = self.table_configs[table_name]['primary_key']

        # Generate an insert change
        insert_change = self.test_data_generator.generate_test_data(table_name, 'insert')
        if insert_change:
            test_changes.append(insert_change)

        # Get existing data for update/delete operations
        existing_data = self.data_access.get_postgres_data(table_name)

        if not existing_data.empty:
            # Generate an update change
            update_change = self.test_data_generator.generate_test_data(
                table_name, 'update', existing_data, primary_key
            )
            if update_change:
                test_changes.append(update_change)

            # Generate a delete change
            delete_change = self.test_data_generator.generate_test_data(
                table_name, 'delete', existing_data, primary_key
            )
            if delete_change:
                test_changes.append(delete_change)

        return test_changes

    def generate_report(self) -> str:
        """Generate a comprehensive test report."""
        return self.report_generator.generate_report(self.test_results)

    def save_report(self, filename: str = None) -> str:
        """Save report to file."""
        report = self.generate_report()
        return self.report_generator.save_report(report, filename)

    def cleanup(self):
        """Clean up connections."""
        self.data_access.close_pg_connection()
        self.logger.info("Connections closed")

This code defines the CDCPipelineValidator class, which serves as the central point for running validations. It initializes all required components, including the data access layer, test data generator, validators, and report generator.

The class provides methods responsible for executing baseline validations, generating and applying test changes, verifying that changes are correctly propagated, compiling the results into a report, and cleaning up any open connections when the process is complete.

Next, create the main orchestration script in validator/main.py and add the code below:

"""Main execution script"""

import json
from config import POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS
from cdc_pipeline_validator import CDCPipelineValidator

def main():
    # Initialize validator
    validator = CDCPipelineValidator(POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS)

    try:
        # Run baseline validation
        validator.run_baseline_validation()

        # Run change validation tests for each table
        for table_name in validator.table_configs.keys():
            print(f"\n--- Running change validation tests for table: {table_name} ---")

            # Generate test changes
            test_changes = validator.generate_test_changes(table_name)

            if test_changes:
                print(f"Running change validation test for {table_name}...")
                print(f"Generated data: {json.dumps(test_changes, indent=2)}")
                validator.run_change_validation_test(table_name, test_changes)
            else:
                print(f"No test changes generated for {table_name}. Skipping change validation.")

        # Generate and save report
        print("\nGenerating final report...")
        filename = validator.save_report()
        print(f"Report saved to {filename}")

    finally:
        validator.cleanup()

if __name__ == "__main__":
    main()

This script acts as the entry point for the entire validation process. It begins by initializing an instance of the CDCPipelineValidator class with the necessary configuration for PostgreSQL, S3, and the list of tables to test. It runs the baseline validations to ensure the pipeline is in a healthy starting state, then loops through each table to generate and apply test changes.

If any changes are created, it runs the change validation logic to confirm the pipeline correctly replicates those changes. Once all validations are complete, the script generates a report and saves it to a file. Finally, it ensures all open resources are closed properly, regardless of whether the tests passed or failed.

Running the validation locally

With the full validation workflow in place, you can now run everything locally to verify your CDC pipeline end-to-end. This makes it easy to debug issues before integrating into any CI/CD system.

First, create a virtual environment, activate it, and install the required dependencies:

python3 -m venv .venv

source .venv/bin/activate

pip install -r requirements.txt

Run the validator’s main execution script located in validator/main.py by executing the following command:

python3 validator/main.py

Your output should be similar to the following:

INFO:data_access:PostgreSQL connection established.
INFO:data_access:S3 connection established.
INFO:cdc_pipeline_validator:Starting baseline validation...
INFO:cdc_pipeline_validator:Validating table: todos

--- Running change validation tests for table: todos ---
Running change validation test for todos...
Generated data: [
  {
    "operation": "insert",
    "data": {
      "title": "Similar time good decade.",
      "description": "Trial bit read quickly wish. Early probably our example rise tough analysis.",
      "completed": false
    }
  },
  {
    "operation": "update",
    "data": {
      "description": "Beautiful teach high."
    },
    "where_column": "id",
    "where_value": 5
  },
  {
    "operation": "delete",
    "where_column": "id",
    "where_value": 6
  }
]
INFO:cdc_pipeline_validator:Creating test changes for table: todos
INFO:data_access:Executed SQL (template): INSERT INTO todos (title, description, completed) VALUES (%s, %s, %s) with parameters: ['Similar time good decade.', 'Trial bit read quickly wish. Early probably our example rise tough analysis.', False]
INFO:data_access:Executed SQL (template): UPDATE todos SET description = %s WHERE id = %s with parameters: ['Beautiful teach high.', 5]
INFO:data_access:Executed SQL (template): DELETE FROM todos WHERE id = %s with parameters: [6]
INFO:cdc_pipeline_validator:Pausing for 20 seconds for data to sync...
INFO:validators:✓ Insert validated - found 1 matching active record(s)
INFO:validators:✓ Update validated for ID: 5
INFO:validators:✓ Delete validated for ID: 6
INFO:cdc_pipeline_validator:Change validation completed for todos

Generating final report...
Report saved to reports/validation_report_20250618_171219.txt
INFO:data_access:PostgreSQL connection closed.
INFO:cdc_pipeline_validator:Connections closed

This output shows you that the validation framework successfully connected to both PostgreSQL and S3, started the baseline checks, and then ran a full change validation test on the todos table. It created and applied an insert, update, and delete operation as test changes, waited for the CDC pipeline to sync the data, and then confirmed that each change appeared as expected in the destination.

The insert, update, and delete are all validated correctly, and a final report is generated and saved. Once everything is completed, all connections are cleanly closed.

You should also get a report in the reports folder with content similar to the following that shows you that all tests passed successfully:


CDC PIPELINE VALIDATION REPORT
===========================
Generated: 2025-06-18 17:12:19.906378

SUMMARY:
--------
Total Tests: 3
Passed: 3
Failed: 0
Success Rate: 100.0%

DETAILED RESULTS:
-----------------

Test: row_count_validation_todos
Status: PASS
Details: Row counts match: 5 rows.
Timestamp: 2025-06-18 17:11:59.811965
Metrics: {'pg_count': 5, 's3_count': 5}
====================================================================================================

Test: data_integrity_validation_todos
Status: PASS
Details: Data integrity check passed. All primary keys match.
Timestamp: 2025-06-18 17:11:59.834062
Metrics: N/A
====================================================================================================

Test: change_propagation_validation_todos
Status: PASS
Details: All 3 changes validated successfully
Timestamp: 2025-06-18 17:12:19.906206
Metrics: {'validated_changes': 3, 'failed_changes': 0}
====================================================================================================

Running the CDC pipeline validation in CI

Now that you can run the validation process locally, the next step is to automate it in your continuous integration (CI) pipeline. This ensures your CDC pipeline remains reliable and helps catch regressions early. In this section, you’ll configure CircleCI to run the full validation workflow automatically whenever changes are made to your data pipeline or validation logic.

Start by creating a new file named .circleci/config.yml in the root of your project and adding the following code:

version: 2.1

jobs:
    validate-cdc-pipeline:
        machine:
            image: ubuntu-2204:current
        resource_class: large
        steps:
            - checkout

            - run:
                  name: Install Python dependencies
                  command: |
                      python3 -m pip install --upgrade pip
                      pip install -r requirements.txt

            - run:
                  name: Start containers
                  command: docker compose -f pipeline/docker-compose.yml up -d

            - run:
                  name: Wait for services to be ready
                  command: sleep 30

            - run:
                  name: Install, configure MinIO client, and create bucket
                  command: |
                      curl https://dl.min.io/client/mc/release/linux-amd64/mc \
                        --create-dirs \
                        -o $HOME/minio-binaries/mc

                      chmod +x $HOME/minio-binaries/mc
                      export PATH=$PATH:$HOME/minio-binaries/

                      mc alias set myminio http://localhost:9000 minioadmin minioadmin

                      mc mb myminio/cdc-bucket

            - run:
                  name: Register Postgres connector
                  command: |
                      chmod +x ./pipeline/scripts/register_postgres_connector.sh
                      ./pipeline/scripts/register_postgres_connector.sh

            - run:
                  name: Run Spark job
                  command: |
                      chmod +x ./pipeline/scripts/spark-submit.sh
                      ./pipeline/scripts/spark-submit.sh &
                      echo $! > /tmp/spark_pid.txt # Store the PID of the background process
                  background: true

            - run:
                  name: Wait for the initial data to be synced to the datalake
                  command: sleep 30

            - run:
                  name: Run validator
                  command: |
                      python3 validator/main.py

            - run:
                  name: Stop Spark job gracefully
                  command: |
                      SPARK_SUBMIT_PID=$(cat /tmp/spark_pid.txt)
                      if [ -n "$SPARK_SUBMIT_PID" ]; then
                        echo "Attempting to stop Spark job launched with PID: $SPARK_SUBMIT_PID"
                        # Kill the process group to ensure all child processes are targeted
                        kill -TERM -$SPARK_SUBMIT_PID 2>/dev/null || true
                        sleep 10 # Give it time to shut down
                        # Check if it's still running
                        if ps -p $SPARK_SUBMIT_PID > /dev/null; then
                          echo "Spark job (PID: $SPARK_SUBMIT_PID) still running after TERM, force killing."
                          kill -KILL -$SPARK_SUBMIT_PID 2>/dev/null || true
                        else
                          echo "Spark job (PID: $SPARK_SUBMIT_PID) successfully stopped."
                        fi
                      else
                        echo "Could not find Spark submit PID from /tmp/spark_pid.txt"
                      fi
                  when: always

            - store_artifacts:
                  path: reports/
                  destination: validation-reports

            - run:
                  name: Cleanup containers
                  command: |
                      docker compose -f pipeline/docker-compose.yml down
                  when: always

workflows:
    validate:
        jobs:
            - validate-cdc-pipeline

This CircleCI configuration defines a job called validate-cdc-pipeline that runs the entire CDC pipeline validation process inside the CI environment. It uses a Linux machine and allocates a larger resource class to handle the workload.

The steps begin by checking out the code and installing the necessary Python dependencies. It then starts up essential services using Docker Compose, waits for them to become ready, and sets up MinIO for storing data.

The configuration proceeds to register the Postgres connector and run the Spark job in the background, allowing time for data syncing. Once the setup is complete, it executes the validator script to perform change validation tests. Afterward, it gracefully stops the Spark job, stores the generated validation report as a build artifact, and shuts down all containers to clean up resources.

Push your code to GitHub and create a project in CircleCI.

Next, trigger the pipeline manually and it should build successfully:

Successful pipeline execution

You can access the validation report on the Artifacts tab:

CDC pipeline validation report

You can access the full code for this project on GitHub.

Conclusion

In this guide, you learned how to validate a Change Data Capture (CDC) pipeline by simulating inserts, updates, and deletes, and confirming that changes are correctly reflected in your target system. You also learned how to run the validator locally and integrate it into a CI pipeline using CircleCI, transforming validation from a manual process into an automated part of your development workflow.

To build on this foundation, you can extend the validator to handle more complex scenarios, such as validating schema changes and column-level transformations. By continuously improving your validation strategy, you’ll increase confidence in your data consistency and make your CDC pipeline more robust and production-ready.