Validate CDC data in your CI/CD pipeline using CircleCI
Change Data Capture (CDC) is a technique used to identify and capture changes, such as inserts, updates, and deletes, in a source database so they can be replicated to another system in real-time. This approach is crucial in modern data pipelines, especially for powering data lakes, analytics platforms, and event-driven applications that depend on up-to-date information.
Setting up a CDC pipeline is only the first step. Without proper validation, data replication issues like missing records, duplicate entries, or corrupted data can silently break downstream systems and lead to inaccurate business insights. These problems often go undetected until they’ve already caused significant damage.
This is where automated validation in your CI/CD pipeline becomes essential. In this guide, you’ll learn how to build automated CDC validation into your development workflow using CircleCI. You’ll simulate data changes in your source database and verify that those changes are correctly reflected in the destination system. By the end, you’ll have a reliable framework that continuously validates your CDC pipeline’s integrity with every code change, catching issues before they reach production.
Prerequisites
To follow along with this guide, you need to have the following:
- A CircleCI and a GitHub account
- Docker and Docker Compose installed
- Python3 installed
- MinIO client installed
- Familiarity with Apache Hudi and CDC concepts
Setting up the CDC pipeline
To keep this guide focused on CDC pipeline validation, we have prepared a starter template with a simple CDC pipeline implementation that you will write validation tests for. Clone the starter template to your local machine by executing the following command in your terminal:
git clone --single-branch -b starter-template https://github.com/CIRCLECI-GWP/circleci-cdc-pipeline-validation-demo.git
Here’s an overview of the core technologies used in the CDC pipeline:
- PostgreSQL: A popular open-source relational database that acts as the source of truth in the CDC setup.
- Debezium: An open-source CDC platform that monitors PostgreSQL for inserts, updates, and deletes, and streams those changes into Kafka.
- Apache Kafka: A distributed event streaming platform that buffers the change events captured by Debezium.
- Apache Spark: Used here to consume change events from Kafka and write them into Hudi tables.
- Apache Hudi: A data lake storage format that supports upserts and incremental pulls, making it ideal for capturing CDC events in data lakes.
- MinIO: An S3-compatible object storage used in this pipeline to store the Hudi tables.
Here is a breakdown of the key files in the project:
- pipeline/docker-compose.yml: Brings up all the necessary services to simulate a CDC environment on your local machine or in CI.
- pipeline/scripts: Holds one SQL script and two shell scripts:
  - init.sql: Creates a todos table in the database, populates it, and creates a user for Debezium with the necessary privileges and a publication.
  - register_postgres_connector.sh: Registers the Debezium PostgreSQL connector with Kafka Connect.
  - spark-submit.sh: Submits the Spark job that reads from Kafka and writes to Hudi in MinIO.
- pipeline/config-files/core-site.xml: Contains the configuration that allows Hudi to access MinIO.
- validator: Contains the Python scripts that you will use to implement your CDC validation logic:
  - config.py: Configuration settings (sketched after the note below).
  - data_access.py: Data access layer for PostgreSQL and S3 operations.
  - models.py: Data models and enums.
  - report_generator.py: Utilities for generating validation reports.
  - test_data_generator.py: Utilities for generating seed data.
- requirements.txt: Lists all Python dependencies needed to run the validation logic.
Note: You will notice that most of the credentials in this project are hardcoded as this project is for demo purposes only. You should never hardcode sensitive credentials for production applications.
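The starter template already ships validator/config.py, so you do not need to write it. The sketch below only illustrates the shape of the settings that the later code imports (POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS, HUDI_RECORD_KEY, HUDI_DELETION_COLUMN, and PIPELINE_SYNC_WAIT_TIME_SECS); the key names and values are assumptions inferred from how the validator uses them, not the template's exact contents.

```python
"""Illustrative sketch of validator/config.py -- values are assumptions, not the template's exact settings."""

# Connection settings for the source database (hardcoded here for demo purposes only)
POSTGRES_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "cdc_db",
    "user": "postgres",       # assumed demo user
    "password": "postgres",   # assumed demo password
}

# MinIO exposes an S3-compatible API on port 9000; key names are illustrative
S3_CONFIG = {
    "endpoint_url": "http://localhost:9000",
    "aws_access_key_id": "minioadmin",
    "aws_secret_access_key": "minioadmin",
    "bucket": "cdc-bucket",
}

# One entry per replicated table: its name and primary key column
TABLE_CONFIGS = [
    {"table_name": "todos", "primary_key": "id"},
]

# Hudi/Debezium metadata columns used by the validators
HUDI_RECORD_KEY = "_hoodie_record_key"   # Hudi's record key metadata column
HUDI_DELETION_COLUMN = "__deleted"       # added by Debezium's delete "rewrite" mode
PIPELINE_SYNC_WAIT_TIME_SECS = 20        # matches the pause seen in the sample output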
You can now spin up all the pipeline components. In your terminal, execute this command:
docker compose -f pipeline/docker-compose.yml up -d
Ensure all the services are up and running by executing the command docker ps. The output should be similar to this:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e1824571b3f7 kimanikevin254/debezium-connect:3.0 "/docker-entrypoint.…" 55 seconds ago Up 5 seconds 8778/tcp, 0.0.0.0:8083->8083/tcp, [::]:8083->8083/tcp, 9092/tcp kafka-connect
0ce8772e977d kimanikevin254/spark-hudi:1.0 "/opt/bitnami/script…" 55 seconds ago Up 5 seconds 0.0.0.0:8080->8080/tcp, [::]:8080->8080/tcp spark
0af28fecc324 confluentinc/cp-kafka:7.4.0 "/etc/confluent/dock…" 55 seconds ago Up 38 seconds (healthy) 0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp kafka
fc5b0db19d25 confluentinc/cp-schema-registry:7.9.0 "/etc/confluent/dock…" 55 seconds ago Up 38 seconds 0.0.0.0:8081->8081/tcp, [::]:8081->8081/tcp schema-registry
80c329e8f858 confluentinc/cp-zookeeper:7.4.0 "/etc/confluent/dock…" 55 seconds ago Up 54 seconds (healthy) 2888/tcp, 0.0.0.0:2181->2181/tcp, [::]:2181->2181/tcp, 3888/tcp zookeeper
fd4e4c8cc5dd quay.io/minio/minio "/usr/bin/docker-ent…" 55 seconds ago Up 54 seconds 0.0.0.0:9000-9001->9000-9001/tcp, [::]:9000-9001->9000-9001/tcp minio
c64f1ef4d27b postgres:17 "docker-entrypoint.s…" 55 seconds ago Up 54 seconds 0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp cdc-postgres
Next, set up your local MinIO server and create a bucket to store your CDC data using the following commands:
mc alias set myminio http://localhost:9000 minioadmin minioadmin
mc mb myminio/cdc-bucket
The first command registers a shortcut (alias) named myminio that connects to your MinIO server running at http://localhost:9000, using the credentials defined in the pipeline/docker-compose.yml file. The second command creates a new bucket inside the server to hold your CDC data.
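If you prefer to confirm the bucket from Python instead of the MinIO client, a quick check against the same endpoint and demo credentials looks like the sketch below. It assumes boto3 is available in your environment; the starter template's own S3 access layer may use a different client, so treat this purely as an optional sanity check.

```python
"""Optional sanity check that cdc-bucket exists -- assumes boto3 is installed."""
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",   # MinIO's S3-compatible API
    aws_access_key_id="minioadmin",         # demo credentials from pipeline/docker-compose.yml
    aws_secret_access_key="minioadmin",
)

# List bucket names; "cdc-bucket" should appear after running `mc mb`
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])
```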
Register the PostgreSQL connector using the commands:
chmod +x ./pipeline/scripts/register_postgres_connector.sh
./pipeline/scripts/register_postgres_connector.sh
You should get output similar to the following (prettified here for readability):

```json
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "cdc-postgres",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "debezium_password",
    "database.dbname": "cdc_db",
    "topic.prefix": "postgres",
    "plugin.name": "pgoutput",
    "table.include.list": "public.todos",
    "publication.name": "debezium_pub",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "name": "postgres-connector"
  },
  "tasks": [],
  "type": "source"
}
```
Finally, run the Spark job that reads from Kafka and writes to the data lake:

```bash
chmod +x ./pipeline/scripts/spark-submit.sh
./pipeline/scripts/spark-submit.sh
```
Running baseline validations
Before applying any controlled changes to the source database, it’s important to verify that the source and destination systems are in sync after the first ingestion. These baseline validations serve as a sanity check to ensure that:
- All rows from the source database (PostgreSQL) have been successfully replicated to the destination (S3 via Hudi).
- The primary keys in both systems match exactly, confirming that no data has been lost or duplicated during the initial sync.
To implement this, create a new file named validator/validators.py and add the code below:
"""Validation logic for the CDC pipeline"""
import re
import logging
from datetime import datetime
from typing import Dict, List, Any
import pandas as pd
from models import TestResult, ValidationResult
from config import HUDI_RECORD_KEY, HUDI_DELETION_COLUMN
class DataValidator:
def __init__(self, data_access_manager, table_configs: Dict[str, Dict]):
self.data_access = data_access_manager
self.table_configs = table_configs
self.logging = logging.getLogger(__name__)
def validate_row_counts(self, table_name: str) -> TestResult:
"""Validate that row counts match between PostgreSQL and S3."""
try:
pg_data = self.data_access.get_postgres_data(table_name)
s3_data = self.data_access.get_s3_data(table_name)
# Filter out soft-deleted rows from the datalake
active_s3_data = s3_data[s3_data[HUDI_DELETION_COLUMN] != "true"]
s3_count = len(active_s3_data)
pg_count = len(pg_data)
if pg_count == s3_count:
result = ValidationResult.PASS
details = f"Row counts match: {pg_count} rows."
else:
result = ValidationResult.FAIL
details = f"Row counts do not match: PostgreSQL has {pg_count} rows, S3 has {s3_count} rows."
return TestResult(
test_name=f"row_count_validation_{table_name}",
result=result,
details=details,
timestamp=datetime.now(),
metrics={"pg_count": pg_count, "s3_count": s3_count}
)
except Exception as e:
return TestResult(
test_name=f"row_count_validation_{table_name}",
result=ValidationResult.FAIL,
details=str(e),
timestamp=datetime.now()
)
def validate_data_integrity(self, table_name: str) -> TestResult:
"""Validate data integrity by comparing primary keys."""
try:
pg_data = self.data_access.get_postgres_data(table_name)
s3_data = self.data_access.get_s3_data(table_name)
if pg_data.empty and s3_data.empty:
return TestResult(
test_name=f"data_integrity_validation_{table_name}",
result=ValidationResult.PASS,
details="Both datasets are empty.",
timestamp=datetime.now()
)
# Filter out deleted records from S3 data
s3_data = s3_data[s3_data[HUDI_DELETION_COLUMN] != "true"]
pg_pk_col = self.table_configs[table_name]['primary_key']
if pg_pk_col in pg_data.columns and HUDI_RECORD_KEY in s3_data.columns:
# Compare primary keys
pg_ids = set(pg_data[pg_pk_col].astype(str))
s3_ids = set(s3_data[HUDI_RECORD_KEY].astype(str))
missing_in_s3 = pg_ids - s3_ids
extra_in_s3 = s3_ids - pg_ids
if len(missing_in_s3) == 0 and len(extra_in_s3) == 0:
result = ValidationResult.PASS
details = "Data integrity check passed. All primary keys match."
else:
result = ValidationResult.FAIL
details = (
f"Data integrity check failed. "
f"Missing in S3: {len(missing_in_s3)} rows, "
f"Extra in S3: {len(extra_in_s3)} rows."
)
else:
result = ValidationResult.FAIL
details = f"PG primary key or Hudi record key not found in the datasets."
return TestResult(
test_name=f"data_integrity_validation_{table_name}",
result=result,
details=details,
timestamp=datetime.now()
)
except Exception as e:
return TestResult(
test_name=f"data_integrity_validation_{table_name}",
result=ValidationResult.FAIL,
details=str(e),
timestamp=datetime.now()
)
The DataValidator class provides two main methods: validate_row_counts and validate_data_integrity. The validate_row_counts method checks that the number of non-deleted rows in the S3-backed Hudi table matches the number of rows in the PostgreSQL source table, logging a pass or fail depending on whether the counts align. The validate_data_integrity method further verifies that the primary keys in both PostgreSQL and S3 match exactly, flagging any missing or extra keys in the S3 data.
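If you want to exercise these two checks on their own before wiring up the full orchestrator, a minimal sketch looks like the following. It assumes the config and data-access modules from the starter template, should be run from inside the validator directory (like main.py later in this guide), and only makes sense after the initial Spark sync has completed.

```python
"""Minimal sketch: run the baseline checks by hand (assumes the starter template's config module)."""
from config import POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS
from data_access import DataAccessManager
from validators import DataValidator

# Key the table configs by name, mirroring what the orchestrator does later in this guide
table_configs = {cfg["table_name"]: cfg for cfg in TABLE_CONFIGS}

data_access = DataAccessManager(POSTGRES_CONFIG, S3_CONFIG)
validator = DataValidator(data_access, table_configs)

for table_name in table_configs:
    # Each call returns a TestResult with a PASS/FAIL result and human-readable details
    print(validator.validate_row_counts(table_name).details)
    print(validator.validate_data_integrity(table_name).details)

data_access.close_pg_connection()  # same cleanup the orchestrator performs
```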
Applying controlled changes to the source database
Once baseline validations pass, you can safely start applying controlled changes to the source database. These changes are intentional modifications such as inserts, updates, or soft deletions that will help verify whether your CDC pipeline is capturing and reflecting changes correctly in the destination.
This phase is crucial because it allows you to test the behavior of the pipeline in a realistic, yet controlled, environment before it handles production traffic. You’ll be able to confirm that the CDC mechanism correctly identifies changes, processes them, and writes them to the data lake in the expected format.
To implement the logic to apply controlled changes to the source database, open the validator/data_access.py file and add the method below to the DataAccessManager class:
def execute_changes(self, table_name: str, changes: List[Dict[str, Any]]):
"""
Make controlled changes to the PostgreSQL database.
Args:
table_name (str): The name of the table to modify.
changes (List[Dict[str, Any]]): A list of dictionaries representing the changes to apply.
[
{"operation": "insert", "data": {...}},
{"operation": "update", "data": {...}, "where_column": "id", "where_value": 1},
{"operation": "delete", "where_column": "id", "where_value": 2}
]
"""
with self.pg_conn.cursor() as cursor:
for change in changes:
sql = None
params = []
try:
operation = change['operation']
if operation == 'insert':
columns = list(change['data'].keys())
values_placeholders = ', '.join(['%s'] * len(columns))
sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({values_placeholders})"
params = list(change['data'].values())
elif operation == 'update':
set_clauses = []
for k, v in change['data'].items():
set_clauses.append(f"{k} = %s")
params.append(v)
set_clause_str = ', '.join(set_clauses)
where_column = change['where_column']
where_value = change['where_value']
sql = f"UPDATE {table_name} SET {set_clause_str} WHERE {where_column} = %s"
params.append(where_value)
elif operation == 'delete':
where_column = change['where_column']
where_value = change['where_value']
sql = f"DELETE FROM {table_name} WHERE {where_column} = %s"
params.append(where_value)
else:
self.logging.warning(f"Unsupported operation: {operation}. Skipping change.")
continue
if sql:
cursor.execute(sql, params)
self.logging.info(f"Executed SQL (template): {sql} with parameters: {params}")
except Exception as e:
self.logging.error(f"Error executing change for operation '{operation}': {e}")
self.pg_conn.rollback()
raise
self.pg_conn.commit()
return
The execute_changes method applies a list of controlled changes to the specified PostgreSQL table. Each change is described as a dictionary that defines the operation type and the data or conditions involved. The method dynamically builds the appropriate SQL for each operation and executes it using a database cursor. It ensures proper logging, handles exceptions gracefully, and commits the changes once all operations are processed.
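As a quick, hypothetical illustration of the change format this method expects, the snippet below applies an insert and an update to the todos table. The column names match the seed data used later in this guide; the values are arbitrary.

```python
# Hypothetical example of applying controlled changes to the todos table
changes = [
    {
        "operation": "insert",
        "data": {"title": "Test todo", "description": "Inserted by the validator", "completed": False},
    },
    {
        "operation": "update",
        "data": {"completed": True},
        "where_column": "id",
        "where_value": 1,
    },
]

# data_access is a connected DataAccessManager instance, as shown earlier
data_access.execute_changes("todos", changes)
```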
Verifying changes in the data lake
Once controlled changes have been applied to the source PostgreSQL database, it’s important to verify that these changes have been accurately captured and reflected in the data lake. This ensures the CDC pipeline is functioning as expected.
To do this, open the validator/validators.py file and add the following methods to the DataValidator class:
def _extract_pk_from_where_clause(self, where_clause: str, pk_col: str) -> Any:
"""Extract primary key value from WHERE clause."""
try:
# Handle common WHERE clause formats
# "id = 1", "id=1", "id = '1'", etc.
pattern = rf"{pk_col}\s*=\s*['\"]?([^'\"\s]+)['\"]?"
match = re.search(pattern, where_clause, re.IGNORECASE)
if match:
value = match.group(1)
# Try to convert to int if possible
try:
return int(value)
except ValueError:
return value
return None
except Exception:
return None
def validate_changes_propagated(self, table_name: str, expected_changes: List[Dict[str, Any]]) -> TestResult:
"""Validate that controlled changes are reflected in the data lake."""
try:
s3_data = self.data_access.get_s3_data(table_name)
pk_col = self.table_configs[table_name]['primary_key']
changes_validated = 0
failed_validations = []
for change in expected_changes:
if change['operation'] == 'insert':
# Check if the inserted record exists in the datalake
inserted_data = change['data']
# Find matching rows by filtering on all inserted fields (excluding the PK)
matching_condition = True
filter_fields = {}
for key, expected_value in inserted_data.items():
if key != pk_col and key in s3_data.columns:
filter_fields[key] = expected_value
matching_condition = matching_condition & (s3_data[key].astype(str) == str(expected_value))
# Only proceed if we have fields to filter on
if filter_fields:
matching_rows = s3_data[matching_condition]
if len(matching_rows) > 0:
active_rows = matching_rows[matching_rows[HUDI_DELETION_COLUMN] != "true"]
if len(active_rows) > 0:
changes_validated += 1
self.logging.info(f"✓ Insert validated - found {len(active_rows)} matching active record(s)")
else:
failed_validations.append(f"Insert found but all matching records are marked as deleted: {filter_fields}")
else:
failed_validations.append(f"Insert not found with matching fields: {filter_fields}")
else:
failed_validations.append(f"No valid fields to match for insert validation: {inserted_data}")
elif change['operation'] == 'update':
# Check if the updated record reflects the changes
updated_data = change['data']
where_clause = f"{change['where_column']} = {change['where_value']}"
# Extract ID from where clause
pk_value = self._extract_pk_from_where_clause(where_clause, pk_col)
if pk_value:
pk_value_str = str(pk_value)
matching_rows = s3_data[s3_data[HUDI_RECORD_KEY].astype(str) == pk_value_str]
if len(matching_rows) > 0:
row = matching_rows.iloc[0]
# Check if it's not marked as deleted
if HUDI_DELETION_COLUMN in row and row[HUDI_DELETION_COLUMN] == "true":
failed_validations.append(f"Update target found but marked as deleted: ID {pk_value}")
else:
# Validate updated fields
field_mismatches = []
for key, expected_value in updated_data.items():
if key in row.index:
actual_value = row[key]
# Convert for comparison
if str(actual_value) != str(expected_value):
field_mismatches.append(f"{key}: expected {expected_value}, got {actual_value}")
else:
field_mismatches.append(f"{key}: field not found in S3 data")
if field_mismatches:
failed_validations.append(f"Update validation failed for ID {pk_value}: {field_mismatches}")
else:
changes_validated += 1
self.logging.info(f"✓ Update validated for ID: {pk_value}")
else:
failed_validations.append(f"Update target not found: {where_clause}")
else:
failed_validations.append(f"Could not extract primary key from where clause: {where_clause}")
elif change['operation'] == 'delete':
# Check if the deleted record is marked as deleted in the datalake
where_clause = f"{change['where_column']} = {change['where_value']}"
# Extract ID from where clause
pk_value = self._extract_pk_from_where_clause(where_clause, pk_col)
if pk_value:
pk_value_str = str(pk_value)
matching_rows = s3_data[s3_data[HUDI_RECORD_KEY].astype(str) == pk_value_str]
if len(matching_rows) > 0:
row = matching_rows.iloc[0]
# Check if it's marked as deleted
if HUDI_DELETION_COLUMN in row and row[HUDI_DELETION_COLUMN] == "true":
changes_validated += 1
self.logging.info(f"✓ Delete validated for ID: {pk_value}")
else:
failed_validations.append(f"Delete target found but not marked as deleted: ID {pk_value}")
else:
failed_validations.append(f"Delete target not found. ID {pk_value}")
else:
failed_validations.append(f"Could not extract primary key from where clause: {where_clause}")
if len(failed_validations) == 0:
result = ValidationResult.PASS
details = f"All {changes_validated} changes validated successfully"
else:
result = ValidationResult.FAIL
details = f"Failed validations: {failed_validations}"
return TestResult(
test_name=f"change_propagation_validation_{table_name}",
result=result,
details=details,
timestamp=datetime.now(),
metrics={"validated_changes": changes_validated, "failed_changes": len(failed_validations)}
)
except Exception as e:
            self.logging.error("An error occurred during change validation: %s", e)
return TestResult(
test_name=f"change_propagation_validation_{table_name}",
result=ValidationResult.FAIL,
details=f"Error during change validation: {str(e)}",
timestamp=datetime.now()
)
This code defines a method validate_changes_propagated that ensures that controlled changes (inserts, updates, and deletes) applied to the PostgreSQL source database have been correctly reflected in the data lake. It accepts a list of expected changes and checks each one against the current data in the data lake.
- For inserts, it verifies that the inserted record exists and is not marked as deleted.
- For updates, it compares the updated fields with the corresponding record in the data lake using the primary key extracted from the WHERE clause.
- For deletes, it checks whether the target record exists and is marked as deleted.
A helper method _extract_pk_from_where_clause is used to extract the primary key from SQL-like WHERE clauses. If all changes are correctly validated, it returns a passing TestResult. Otherwise, it returns detailed failure information for each unsuccessful validation.
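As a small illustration (assuming validator is a DataValidator instance and the primary key column is id), the helper behaves roughly like this:

```python
# Behavior of the WHERE-clause helper (illustrative only; validator is a DataValidator instance)
validator._extract_pk_from_where_clause("id = 5", "id")        # -> 5 (converted to int)
validator._extract_pk_from_where_clause("id='abc-123'", "id")  # -> "abc-123" (falls back to a string)
validator._extract_pk_from_where_clause("title = 'x'", "id")   # -> None (primary key column not matched)
```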
Orchestrating the full validation workflow
Once the individual validation components are in place, the next step is to tie them together into a complete validation process. This involves creating a validation class that wraps all core testing logic, along with a main script that coordinates when and how these methods are executed.
Start by creating a new file named validator/cdc_pipeline_validator.py and adding the code below:
"""Main validator orchestrator"""
import json
import logging
import time
from datetime import datetime
from typing import Dict, List, Any
from config import PIPELINE_SYNC_WAIT_TIME_SECS
from models import TestResult, ValidationResult
from data_access import DataAccessManager
from test_data_generator import TestDataGenerator
from validators import DataValidator
from report_generator import ReportGenerator
class CDCPipelineValidator:
def __init__(self, postgres_config: Dict[str, Any], s3_config: Dict[str, Any], table_configs: List[Dict[str, Any]]):
self.table_configs = {config['table_name']: config for config in table_configs}
self.test_results: List[TestResult] = []
# Set up logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Initialize components
self.data_access = DataAccessManager(postgres_config, s3_config)
self.test_data_generator = TestDataGenerator()
self.validator = DataValidator(self.data_access, self.table_configs)
self.report_generator = ReportGenerator()
def run_baseline_validation(self) -> List[TestResult]:
"""Run baseline validation for all configured tables."""
self.logger.info("Starting baseline validation...")
results = []
for table_name in self.table_configs.keys():
self.logger.info(f"Validating table: {table_name}")
# Row Count Validation
row_count_result = self.validator.validate_row_counts(table_name)
results.append(row_count_result)
# Data Integrity Validation
data_integrity_result = self.validator.validate_data_integrity(table_name)
results.append(data_integrity_result)
self.test_results.extend(results)
return results
def run_change_validation_test(self, table_name: str, test_changes: List[Dict[str, Any]]) -> List[TestResult]:
"""Run complete change validation test."""
results = []
try:
# Make controlled changes
self.logger.info(f"Creating test changes for table: {table_name}")
self.data_access.execute_changes(table_name, test_changes)
# Wait for pipeline sync
self.logger.info(f"Pausing for {PIPELINE_SYNC_WAIT_TIME_SECS} seconds for data to sync...")
time.sleep(PIPELINE_SYNC_WAIT_TIME_SECS)
# Validate changes propagated
change_result = self.validator.validate_changes_propagated(table_name, test_changes)
results.append(change_result)
self.logger.info(f"Change validation completed for {table_name}")
except Exception as e:
error_result = TestResult(
test_name=f"change_validation_test_{table_name}",
result=ValidationResult.FAIL,
details=f"Test failed with error: {str(e)}",
timestamp=datetime.now()
)
results.append(error_result)
self.test_results.extend(results)
return results
def generate_test_changes(self, table_name: str) -> List[Dict[str, Any]]:
"""Generate test changes for a table."""
test_changes = []
primary_key = self.table_configs[table_name]['primary_key']
# Generate an insert change
insert_change = self.test_data_generator.generate_test_data(table_name, 'insert')
if insert_change:
test_changes.append(insert_change)
# Get existing data for update/delete operations
existing_data = self.data_access.get_postgres_data(table_name)
if not existing_data.empty:
# Generate an update change
update_change = self.test_data_generator.generate_test_data(
table_name, 'update', existing_data, primary_key
)
if update_change:
test_changes.append(update_change)
# Generate a delete change
delete_change = self.test_data_generator.generate_test_data(
table_name, 'delete', existing_data, primary_key
)
if delete_change:
test_changes.append(delete_change)
return test_changes
def generate_report(self) -> str:
"""Generate a comprehensive test report."""
return self.report_generator.generate_report(self.test_results)
def save_report(self, filename: str = None) -> str:
"""Save report to file."""
report = self.generate_report()
return self.report_generator.save_report(report, filename)
def cleanup(self):
"""Clean up connections."""
self.data_access.close_pg_connection()
self.logger.info("Connections closed")
This code defines the CDCPipelineValidator class, which serves as the central point for running validations. It initializes all required components, including the data access layer, test data generator, validators, and report generator.
The class provides methods responsible for executing baseline validations, generating and applying test changes, verifying that changes are correctly propagated, compiling the results into a report, and cleaning up any open connections when the process is complete.
Next, create the main orchestration script in validator/main.py and add the code below:
"""Main execution script"""
import json
from config import POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS
from cdc_pipeline_validator import CDCPipelineValidator
def main():
# Initialize validator
validator = CDCPipelineValidator(POSTGRES_CONFIG, S3_CONFIG, TABLE_CONFIGS)
try:
# Run baseline validation
validator.run_baseline_validation()
# Run change validation tests for each table
for table_name in validator.table_configs.keys():
print(f"\n--- Running change validation tests for table: {table_name} ---")
# Generate test changes
test_changes = validator.generate_test_changes(table_name)
if test_changes:
print(f"Running change validation test for {table_name}...")
print(f"Generated data: {json.dumps(test_changes, indent=2)}")
validator.run_change_validation_test(table_name, test_changes)
else:
print(f"No test changes generated for {table_name}. Skipping change validation.")
# Generate and save report
print("\nGenerating final report...")
filename = validator.save_report()
print(f"Report saved to {filename}")
finally:
validator.cleanup()
if __name__ == "__main__":
main()
This script acts as the entry point for the entire validation process. It begins by initializing an instance of the CDCPipelineValidator class with the necessary configuration for PostgreSQL, S3, and the list of tables to test. It runs the baseline validations to ensure the pipeline is in a healthy starting state, then loops through each table to generate and apply test changes.
If any changes are created, it runs the change validation logic to confirm the pipeline correctly replicates those changes. Once all validations are complete, the script generates a report and saves it to a file. Finally, it ensures all open resources are closed properly, regardless of whether the tests passed or failed.
Running the validation locally
With the full validation workflow in place, you can now run everything locally to verify your CDC pipeline end-to-end. This makes it easy to debug issues before integrating into any CI/CD system.
First, create a virtual environment, activate it, and install the required dependencies:
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Run the validator’s main execution script located in validator/main.py by executing the following command:
python3 validator/main.py
Your output should be similar to the following:
INFO:data_access:PostgreSQL connection established.
INFO:data_access:S3 connection established.
INFO:cdc_pipeline_validator:Starting baseline validation...
INFO:cdc_pipeline_validator:Validating table: todos
--- Running change validation tests for table: todos ---
Running change validation test for todos...
Generated data: [
{
"operation": "insert",
"data": {
"title": "Similar time good decade.",
"description": "Trial bit read quickly wish. Early probably our example rise tough analysis.",
"completed": false
}
},
{
"operation": "update",
"data": {
"description": "Beautiful teach high."
},
"where_column": "id",
"where_value": 5
},
{
"operation": "delete",
"where_column": "id",
"where_value": 6
}
]
INFO:cdc_pipeline_validator:Creating test changes for table: todos
INFO:data_access:Executed SQL (template): INSERT INTO todos (title, description, completed) VALUES (%s, %s, %s) with parameters: ['Similar time good decade.', 'Trial bit read quickly wish. Early probably our example rise tough analysis.', False]
INFO:data_access:Executed SQL (template): UPDATE todos SET description = %s WHERE id = %s with parameters: ['Beautiful teach high.', 5]
INFO:data_access:Executed SQL (template): DELETE FROM todos WHERE id = %s with parameters: [6]
INFO:cdc_pipeline_validator:Pausing for 20 seconds for data to sync...
INFO:validators:✓ Insert validated - found 1 matching active record(s)
INFO:validators:✓ Update validated for ID: 5
INFO:validators:✓ Delete validated for ID: 6
INFO:cdc_pipeline_validator:Change validation completed for todos
Generating final report...
Report saved to reports/validation_report_20250618_171219.txt
INFO:data_access:PostgreSQL connection closed.
INFO:cdc_pipeline_validator:Connections closed
This output shows you that the validation framework successfully connected to both PostgreSQL and S3, started the baseline checks, and then ran a full change validation test on the todos table. It created and applied an insert, update, and delete operation as test changes, waited for the CDC pipeline to sync the data, and then confirmed that each change appeared as expected in the destination.
The insert, update, and delete are all validated correctly, and a final report is generated and saved. Once everything is completed, all connections are cleanly closed.
You should also find a report in the reports folder with content similar to the following, showing that all tests passed successfully:
CDC PIPELINE VALIDATION REPORT
===========================
Generated: 2025-06-18 17:12:19.906378
SUMMARY:
--------
Total Tests: 3
Passed: 3
Failed: 0
Success Rate: 100.0%
DETAILED RESULTS:
-----------------
Test: row_count_validation_todos
Status: PASS
Details: Row counts match: 5 rows.
Timestamp: 2025-06-18 17:11:59.811965
Metrics: {'pg_count': 5, 's3_count': 5}
====================================================================================================
Test: data_integrity_validation_todos
Status: PASS
Details: Data integrity check passed. All primary keys match.
Timestamp: 2025-06-18 17:11:59.834062
Metrics: N/A
====================================================================================================
Test: change_propagation_validation_todos
Status: PASS
Details: All 3 changes validated successfully
Timestamp: 2025-06-18 17:12:19.906206
Metrics: {'validated_changes': 3, 'failed_changes': 0}
====================================================================================================
Running the CDC pipeline validation in CI
Now that you can run the validation process locally, the next step is to automate it in your continuous integration (CI) pipeline. This ensures your CDC pipeline remains reliable and helps catch regressions early. In this section, you’ll configure CircleCI to run the full validation workflow automatically whenever changes are made to your data pipeline or validation logic.
Start by creating a new file named .circleci/config.yml in the root of your project and adding the following code:
version: 2.1
jobs:
validate-cdc-pipeline:
machine:
image: ubuntu-2204:current
resource_class: large
steps:
- checkout
- run:
name: Install Python dependencies
command: |
python3 -m pip install --upgrade pip
pip install -r requirements.txt
- run:
name: Start containers
command: docker compose -f pipeline/docker-compose.yml up -d
- run:
name: Wait for services to be ready
command: sleep 30
- run:
name: Install, configure MinIO client, and create bucket
command: |
curl https://dl.min.io/client/mc/release/linux-amd64/mc \
--create-dirs \
-o $HOME/minio-binaries/mc
chmod +x $HOME/minio-binaries/mc
export PATH=$PATH:$HOME/minio-binaries/
mc alias set myminio http://localhost:9000 minioadmin minioadmin
mc mb myminio/cdc-bucket
- run:
name: Register Postgres connector
command: |
chmod +x ./pipeline/scripts/register_postgres_connector.sh
./pipeline/scripts/register_postgres_connector.sh
- run:
name: Run Spark job
command: |
chmod +x ./pipeline/scripts/spark-submit.sh
./pipeline/scripts/spark-submit.sh &
echo $! > /tmp/spark_pid.txt # Store the PID of the background process
background: true
- run:
name: Wait for the initial data to be synced to the datalake
command: sleep 30
- run:
name: Run validator
command: |
python3 validator/main.py
- run:
name: Stop Spark job gracefully
command: |
SPARK_SUBMIT_PID=$(cat /tmp/spark_pid.txt)
if [ -n "$SPARK_SUBMIT_PID" ]; then
echo "Attempting to stop Spark job launched with PID: $SPARK_SUBMIT_PID"
# Kill the process group to ensure all child processes are targeted
kill -TERM -$SPARK_SUBMIT_PID 2>/dev/null || true
sleep 10 # Give it time to shut down
# Check if it's still running
if ps -p $SPARK_SUBMIT_PID > /dev/null; then
echo "Spark job (PID: $SPARK_SUBMIT_PID) still running after TERM, force killing."
kill -KILL -$SPARK_SUBMIT_PID 2>/dev/null || true
else
echo "Spark job (PID: $SPARK_SUBMIT_PID) successfully stopped."
fi
else
echo "Could not find Spark submit PID from /tmp/spark_pid.txt"
fi
when: always
- store_artifacts:
path: reports/
destination: validation-reports
- run:
name: Cleanup containers
command: |
docker compose -f pipeline/docker-compose.yml down
when: always
workflows:
validate:
jobs:
- validate-cdc-pipeline
This CircleCI configuration defines a job called validate-cdc-pipeline that runs the entire CDC pipeline validation process inside the CI environment. It uses a Linux machine and allocates a larger resource class to handle the workload.
The steps begin by checking out the code and installing the necessary Python dependencies. It then starts up essential services using Docker Compose, waits for them to become ready, and sets up MinIO for storing data.
The configuration proceeds to register the Postgres connector and run the Spark job in the background, allowing time for data syncing. Once the setup is complete, it executes the validator script to perform change validation tests. Afterward, it gracefully stops the Spark job, stores the generated validation report as a build artifact, and shuts down all containers to clean up resources.
Push your code to GitHub and create a project in CircleCI.
Next, trigger the pipeline manually; it should build successfully.

You can access the validation report on the Artifacts tab of the job.

You can access the full code for this project on GitHub.
Conclusion
In this guide, you learned how to validate a Change Data Capture (CDC) pipeline by simulating inserts, updates, and deletes, and confirming that changes are correctly reflected in your target system. You also learned how to run the validator locally and integrate it into a CI pipeline using CircleCI, transforming validation from a manual process into an automated part of your development workflow.
To build on this foundation, you can extend the validator to handle more complex scenarios, such as validating schema changes and column-level transformations. By continuously improving your validation strategy, you’ll increase confidence in your data consistency and make your CDC pipeline more robust and production-ready.