Hands-On Lab: Kafka + Debezium + Postgres (with Sinks)
Spin up a fully working CDC stack with Docker Compose, configure Debezium source and JDBC sink connectors, and watch real-time changes flow end to end—from source database through Kafka to destination. No prior streaming ops experience required.
In this lab, you'll build a complete, end-to-end Change Data Capture (CDC) pipeline from scratch. We'll use Docker Compose to orchestrate Postgres databases (source and sink), Kafka, the Debezium Postgres connector, and a JDBC Sink connector to demonstrate real-time change streaming and materialization.
🚀 Quick Start Option: Use the CDC Sandbox
Want to skip the manual setup and start learning immediately? This repository includes a pre-configured CDC sandbox with all services ready to go:
- ✅ Pre-loaded with sample data (products, customers, orders)
- ✅ Postgres and MySQL databases configured for CDC
- ✅ Kafka, Zookeeper, and Debezium Connect ready
- ✅ Kafka UI for visual message inspection
- ✅ One-command startup: `docker compose up -d`
Get started:
```shell
# From the repository root
docker compose up -d
./sandbox/register-postgres-connector.sh

# Open Kafka UI
# macOS
open http://localhost:8080
# Linux
xdg-open http://localhost:8080
# Windows (PowerShell)
start http://localhost:8080
```
📖 Full documentation: CDC Sandbox Guide
If you prefer to understand the setup step-by-step, continue with the instructions below. Otherwise, skip to Verify Your Sink after starting the sandbox.
Prerequisites
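- Docker and Docker Compose: Ensure they are installed and running on your machine. The lab steps use the standalone `docker-compose` command; if you have the Compose V2 plugin instead, type `docker compose` (with a space).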
- A terminal or command prompt: Basic familiarity with shell commands.
- cURL or a similar tool: For interacting with the Kafka Connect REST API.
Lab Setup
Create Project Directory
Start by creating a new folder for your lab project and navigate into it.
```shell
mkdir cdc-lab && cd cdc-lab
```
Create `docker-compose.yml`
Create a file named `docker-compose.yml` and paste the following configuration. This file defines all the services we need: Zookeeper, Kafka, the source Postgres database, Kafka Connect with Debezium, and a second Postgres instance that acts as the sink.

```yaml
# docker-compose.yml
version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # kafka:29092 is for other containers; localhost:9092 is for clients on your host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  postgres:
    # debezium/postgres ships with logical decoding enabled (wal_level=logical)
    image: debezium/postgres:15
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=start_data_engineer
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=inventory

  connect:
    image: debezium/connect:2.7
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: 'kafka:29092'
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status

  postgres-sink:
    # The "data warehouse" sink, exposed on host port 5433
    image: debezium/postgres:15
    hostname: postgres-sink
    container_name: postgres-sink
    ports:
      - "5433:5432"
    environment:
      - POSTGRES_USER=sink_user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=warehouse
```

Security Note: This lab uses hardcoded credentials for simplicity. In production environments, use environment variables, Docker secrets, or a secrets management solution like HashiCorp Vault to protect sensitive information.

Configuration Note: The Connect worker stores connector configurations, offsets, and status in the Kafka topics `connect-configs`, `connect-offsets`, and `connect-status`. These names are set by the `CONFIG_STORAGE_TOPIC`, `OFFSET_STORAGE_TOPIC`, and `STATUS_STORAGE_TOPIC` variables above. When backing up or inspecting offsets for operational purposes, you'll reference the `connect-offsets` topic.
Start the Services
Open your terminal in the project directory and run the following command to start all the containers.
```shell
docker-compose up -d
```

It might take a few minutes for all services to become fully operational.
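Instead of guessing when the stack is ready, you can poll the Kafka Connect REST API until it answers. The sketch below uses a hypothetical helper, `retry_until`, that reruns any command until it succeeds or runs out of attempts. The attempt count and delay are assumptions; tune them for your machine.

```shell
# retry_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: reruns COMMAND until it exits 0, sleeping
# DELAY_SECONDS between failed tries. Returns 0 on success, 1 if every
# attempt failed. COMMAND's own output is discarded.
retry_until() {
  local attempts=$1 delay=$2 i=1
  shift 2
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    echo "Attempt $i/$attempts failed" >&2
    # Only sleep if another attempt is coming.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}
```

For example, `retry_until 30 5 curl -fsS http://localhost:8083/connectors` waits roughly two and a half minutes for Kafka Connect to respond. The `-f` flag makes curl treat HTTP errors as failures, so the loop keeps waiting until the API actually responds successfully.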
Register the Postgres Connector
Once the services are running, we need to tell Debezium to start monitoring our Postgres database. We do this by posting a JSON configuration to the Kafka Connect REST API. Create a file named `register-postgres.json`:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "start_data_engineer",
    "database.password": "password",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput"
  }
}
```

Debezium 2.x uses `topic.prefix`; for 1.x, use `database.server.name` instead.

Now, post this configuration using cURL:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgres.json
```

You should receive an HTTP `201 Created` response, which means Kafka Connect accepted the connector. A 201 does not guarantee that the connector is running, so check `GET localhost:8083/connectors/inventory-connector/status` before moving on.
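An HTTP 201 only confirms registration. The sketch below uses a hypothetical `connector_running` helper. It reads the JSON returned by Kafka Connect's `GET /connectors/<name>/status` endpoint on stdin and succeeds only if the connector and every task report `RUNNING`. It greps for the `"state"` fields instead of parsing JSON, so treat it as a quick check rather than a robust tool.

```shell
# connector_running: read the JSON from Kafka Connect's
# GET /connectors/<name>/status endpoint on stdin and succeed only if the
# connector and all of its tasks report RUNNING. Hypothetical helper that
# greps for "state" fields instead of fully parsing the JSON.
connector_running() {
  local states count
  states=$(grep -oE '"state"[[:space:]]*:[[:space:]]*"[A-Z_]+"' \
    | sed -E 's/.*"([A-Z_]+)"$/\1/')
  # Expect at least two states: the connector itself plus one task.
  count=$(printf '%s\n' "$states" | grep -c .)
  if [ "$count" -lt 2 ]; then
    echo "No tasks reported yet" >&2
    return 1
  fi
  # Anything other than RUNNING (FAILED, PAUSED, UNASSIGNED, ...) fails.
  if printf '%s\n' "$states" | grep -qv '^RUNNING$'; then
    echo "Not everything is RUNNING:" $states >&2
    return 1
  fi
  echo "Connector and $((count - 1)) task(s) are RUNNING"
}
```

Usage: `curl -s localhost:8083/connectors/inventory-connector/status | connector_running`. If you have `jq` installed, `jq '.connector.state, .tasks[].state'` on the same response is a more robust way to read the states.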
Create a Table and Insert Data
Let's connect to the Postgres database and create the `products` table that our connector is configured to watch.
```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "CREATE TABLE products (id SERIAL PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT);"
```

Now, insert a row:

```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "INSERT INTO products (name, description, weight) VALUES ('Laptop', 'A powerful laptop', 1.5);"
```
Observe the Change Event in Kafka
Let's consume from the Kafka topic to see the change event that Debezium generated. Debezium names topics `<topic.prefix>.<schema>.<table>`, so this table's events land in `dbserver1.public.products`.
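```shell
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic dbserver1.public.products --from-beginning --max-messages 1 --property print.key=true
```

You will see the message key (a JSON document holding the row's primary key), followed by a detailed JSON payload representing the `INSERT` operation you just performed. Because of `--max-messages 1`, the consumer exits after the first message. To see more, drop `--max-messages 1` and press `Ctrl+C` when you're done. Alternatively, replace it with `--timeout-ms 10000` so the consumer exits after 10 seconds without new messages.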
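Each Debezium change event carries an `op` field in its payload:

- `c` for inserts
- `u` for updates
- `d` for deletes
- `r` for rows read during a snapshot

To see how the later steps of this lab show up on the topic, the hypothetical `summarize_ops` helper below counts events by operation type from console-consumer output. Like any grep-based sketch, it assumes one JSON event per line and does not fully parse the JSON.

```shell
# summarize_ops: read Debezium JSON change events (one per line) on stdin
# and print how many of each operation type were seen. Hypothetical helper.
summarize_ops() {
  grep -oE '"op"[[:space:]]*:[[:space:]]*"[a-z]"' \
    | sed -E 's/.*"([a-z])"$/\1/' \
    | sort | uniq -c \
    | while read -r count op; do
        case "$op" in
          c) label="create (INSERT)" ;;
          u) label="update (UPDATE)" ;;
          d) label="delete (DELETE)" ;;
          r) label="read (snapshot)" ;;
          *) label="other" ;;
        esac
        echo "$op $label: $count"
      done
}
```

For example, after the update and schema-evolution steps later in the lab, run `docker-compose exec -T kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic dbserver1.public.products --from-beginning --timeout-ms 10000 | summarize_ops`. You would expect two creates and one update. The `-T` flag disables the pseudo-TTY so the output pipes cleanly.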
Configure the JDBC Sink Connector
Now we'll add a downstream sink to write these changes into another Postgres database (our "data warehouse"). First, create the destination table in the sink database:
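```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "CREATE TABLE products (id INTEGER PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT);"
```

Note: The sink table's columns must match the source table columns that will be replicated. Alternatively, you can set `auto.create=true` in the connector config to let the connector create the table automatically. Manual creation gives you more control over data types and constraints.

Plugin Note: The stock `debezium/connect` image bundles Debezium's own connectors but not Confluent's JDBC Sink connector (`io.confluent.connect.jdbc.JdbcSinkConnector`). Before registering the sink, add the Confluent `kafka-connect-jdbc` plugin to the Connect worker's plugin path, for example by building a small custom image on top of `debezium/connect`. Otherwise, the registration request below fails because Kafka Connect cannot find the connector class.

Create a file named `register-sink.json` with the JDBC Sink Connector configuration:

```json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.public.products",
    "connection.url": "jdbc:postgresql://postgres-sink:5432/warehouse",
    "connection.user": "sink_user",
    "connection.password": "password",
    "auto.create": "false",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "table.name.format": "products",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
```

Register the sink connector:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-sink.json
```

Note: The sink connector uses Debezium's `ExtractNewRecordState` transform to unwrap the change-event envelope and keep only the `after` state, which suits simple upserts. Because `delete.handling.mode` is `rewrite`, the transform also adds a `__deleted` field to each change record. With `auto.evolve=true`, the sink therefore adds a matching `__deleted` column to the `products` table, holding `false` for rows that have not been deleted.

This configuration does not yet handle deletes. With `drop.tombstones=false`, a source DELETE also emits a tombstone (null-valued) record. The JDBC sink rejects tombstones unless `delete.enabled=true` is set; that setting requires `pk.mode=record_key`, as configured here.

Production Tip: For production deployments, externalize credentials using environment variables or secure credential stores instead of hardcoding them in the connector configuration.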
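The sink's connector class must be installed on the Connect worker before registration can succeed. You can list the installed plugins with Kafka Connect's `GET /connector-plugins` endpoint, either to confirm beforehand or to diagnose a failed registration. The hypothetical `plugin_installed` helper below checks that response for a given class name. Like the other grep-based sketches, it assumes the REST API's usual `"class":"..."` fields.

```shell
# plugin_installed CLASS: read the JSON array returned by Kafka Connect's
# GET /connector-plugins endpoint on stdin and succeed if CLASS is listed.
# Hypothetical helper; it extracts "class" values with grep, not a JSON parser.
plugin_installed() {
  grep -oE '"class"[[:space:]]*:[[:space:]]*"[^"]+"' \
    | sed -E 's/.*"([^"]+)"$/\1/' \
    | grep -qxF "$1"   # -x: whole line, -F: literal match
}
```

Usage: `curl -s localhost:8083/connector-plugins | plugin_installed io.confluent.connect.jdbc.JdbcSinkConnector && echo "JDBC sink plugin found"`.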
Verify Data Replication
Check that the data has been replicated to the sink database:
```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products;"
```

You should see the same laptop record that was inserted in the source database. The JDBC sink connector consumed the change event from Kafka and inserted it into the destination table. Replication is asynchronous, so if the table is still empty, wait a few seconds and run the query again.
Test the Upsert Pattern
Let's update the product in the source database to see how the sink handles updates:
```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "UPDATE products SET weight = 1.8, description = 'A powerful gaming laptop' WHERE id = 1;"
```

Wait a moment for the change to propagate, then verify the sink reflects the update:

```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products WHERE id = 1;"
```

The sink should show the updated values. Because we configured the sink with `insert.mode=upsert`, it inserts a row whose key is not yet in the table and updates the existing row otherwise. On Postgres, the JDBC sink does this with an `INSERT ... ON CONFLICT ... DO UPDATE` statement. The result is MERGE-style behavior without writing any SQL yourself.
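The hypothetical `upsert_sql` helper below makes the upsert concrete. It prints a statement of the same general shape: `INSERT ... ON CONFLICT (key) DO UPDATE SET column = EXCLUDED.column`. It is only an illustration, not the connector's code; the real sink builds its own prepared statements and quotes identifiers.

```shell
# upsert_sql TABLE KEY_COLUMN [COLUMN...]
# Hypothetical helper: prints a Postgres upsert with positional placeholders
# ($1, $2, ...). Illustrative only: identifiers are neither quoted nor validated.
upsert_sql() {
  local table=$1 key=$2 cols vals sets='' n=1 col
  shift 2
  cols=$key
  vals='$1'
  for col in "$@"; do
    n=$((n + 1))
    cols="$cols, $col"
    vals="$vals, \$$n"
    # On conflict, overwrite each non-key column with the incoming value.
    sets="${sets:+$sets, }$col = EXCLUDED.$col"
  done
  if [ -z "$sets" ]; then
    # Only the key was given, so there is nothing to update on conflict.
    echo "INSERT INTO $table ($cols) VALUES ($vals) ON CONFLICT ($key) DO NOTHING;"
  else
    echo "INSERT INTO $table ($cols) VALUES ($vals) ON CONFLICT ($key) DO UPDATE SET $sets;"
  fi
}
```

For example, `upsert_sql products id name description weight` prints a statement whose `DO UPDATE SET` clause overwrites all three non-key columns. Because the conflict target is the primary key, applying the same change event twice leaves the row in the same final state. That idempotence is why upserts pair well with Kafka's at-least-once delivery.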
Test Schema Evolution
Let's add a new column to the source table and see how the pipeline handles schema changes:
```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "ALTER TABLE products ADD COLUMN price DECIMAL(10,2);"
```

Insert a new product with the price column:

```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "INSERT INTO products (name, description, weight, price) VALUES ('Monitor', '27-inch 4K display', 5.5, 499.99);"
```

The sink connector's `auto.evolve=true` setting will automatically add the new column to the sink table. Verify the schema evolution:

```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "\d products"
```

Then check that both products are present:

```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products;"
```

You should see that the `price` column has been added and both products are present. The laptop's `price` is NULL because that row was written before the column existed. Note that `auto.evolve` only adds missing columns. It does not drop columns removed from the source or change the types of existing columns, so those changes still need a manual migration on the sink.
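The sink only gains a new column once a change event carrying that column arrives, and `auto.evolve` never removes columns. The two schemas can therefore differ right after an `ALTER TABLE`, so after schema changes it's worth confirming that every source column exists in the sink. The hypothetical `missing_columns` helper below compares two column lists, one name per line, such as the output of a `psql -tA` query against `information_schema.columns` on each database. Sink-only extras are ignored.

```shell
# missing_columns SOURCE_LIST SINK_LIST
# Hypothetical helper: each file holds one column name per line. Prints the
# source columns absent from the sink. Returns 0 if none are missing,
# 1 if some are, 2 on error (e.g., an unreadable file).
missing_columns() {
  # -v: keep source lines matching no sink line; -x: whole-line match;
  # -F: fixed strings; -f: read the patterns from the sink list.
  grep -vxF -f "$2" "$1"
  case $? in
    0) return 1 ;;
    1) return 0 ;;
    *) return 2 ;;
  esac
}

# Example (assumes the lab's containers are running):
#   docker-compose exec -T -u postgres postgres psql -U start_data_engineer -d inventory -tAc \
#     "SELECT column_name FROM information_schema.columns WHERE table_name = 'products'" > /tmp/source_cols.txt
#   docker-compose exec -T -u postgres postgres-sink psql -U sink_user -d warehouse -tAc \
#     "SELECT column_name FROM information_schema.columns WHERE table_name = 'products'" > /tmp/sink_cols.txt
#   missing_columns /tmp/source_cols.txt /tmp/sink_cols.txt && echo "Sink has every source column"
```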
Validate End-to-End Consistency
Finally, let's verify that the source and sink databases are in sync. First, check the source database:
```shell
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "SELECT COUNT(*), MAX(id), MIN(id) FROM products;"
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "SELECT * FROM products ORDER BY id;"
```

Then check the sink database:

```shell
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT COUNT(*), MAX(id), MIN(id) FROM products;"
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products ORDER BY id;"
```

Both databases should show the same counts and the same values in their shared columns. Congratulations! You've built a complete end-to-end CDC pipeline with:
- ✅ Real-time change capture from a source database
- ✅ Event streaming through Kafka
- ✅ Automatic materialization in a sink database
- ✅ Upsert pattern handling for updates
- ✅ Schema evolution support
Verify Your Sink
Beyond basic visual inspection, it's essential to validate that your CDC pipeline maintains perfect parity between source and sink. Here are SQL verification snippets that confirm end-to-end consistency:
1. Row Count Check
Verify that both databases have the same number of records:
```shell
# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT COUNT(*) AS source_count FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT COUNT(*) AS sink_count FROM products;"
```
Expected output: Both queries should return the same count. For example:
```
 source_count
--------------
            2
(1 row)

 sink_count
------------
          2
(1 row)
```
If counts differ, investigate missing or duplicate records by comparing primary keys.
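For a script or CI job to act on this check, capture each count with psql's `-tA` (tuples-only, unaligned) flags and compare them programmatically. The hypothetical `check_counts` helper below does the comparison and returns non-zero on a mismatch. It also strips the stray whitespace or carriage returns that terminal output can add.

```shell
# check_counts SOURCE_COUNT SINK_COUNT
# Hypothetical helper: compare two row counts captured with psql -tA.
# Returns 0 if equal, 1 on a mismatch, 2 if either value isn't a number.
check_counts() {
  local src snk
  # Drop whitespace and carriage returns that terminal output can add.
  src=$(printf '%s' "$1" | tr -d '[:space:]')
  snk=$(printf '%s' "$2" | tr -d '[:space:]')
  case "$src:$snk" in
    *[!0-9:]*|:*|*:) echo "Expected two numbers, got '$1' and '$2'" >&2; return 2 ;;
  esac
  if [ "$src" -eq "$snk" ]; then
    echo "OK: source and sink both have $src rows"
  else
    echo "MISMATCH: source has $src rows, sink has $snk rows" >&2
    return 1
  fi
}

# Example (assumes the lab's containers are running):
#   src=$(docker-compose exec -T -u postgres postgres psql -U start_data_engineer -d inventory -tAc "SELECT COUNT(*) FROM products")
#   snk=$(docker-compose exec -T -u postgres postgres-sink psql -U sink_user -d warehouse -tAc "SELECT COUNT(*) FROM products")
#   check_counts "$src" "$snk"
```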
2. Primary Key Diff Check
Identify records that exist in one database but not the other:
```shell
# Export the primary keys from each database. -T disables the pseudo-TTY
# so the files don't pick up carriage returns.
docker-compose exec -T -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "COPY (SELECT id FROM products) TO STDOUT;" > /tmp/source_ids.txt
docker-compose exec -T -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "COPY (SELECT id FROM products) TO STDOUT;" > /tmp/sink_ids.txt

# Compare the sorted ID lists (the <(...) process substitution requires bash)
# IDs in source but not in sink:
comm -23 <(sort /tmp/source_ids.txt) <(sort /tmp/sink_ids.txt)
# IDs in sink but not in source:
comm -13 <(sort /tmp/source_ids.txt) <(sort /tmp/sink_ids.txt)
```
Expected output: Both `comm` commands should print nothing, meaning no IDs are missing in either direction.
Alternative SQL-only approach: If you have a shared database or can use foreign data wrappers:
```sql
-- This example assumes you can query both databases simultaneously
-- (requires dblink or similar cross-database query capability)

-- Records in source not in sink
SELECT s.id
FROM source_schema.products s
LEFT JOIN sink_schema.products k ON s.id = k.id
WHERE k.id IS NULL;

-- Records in sink not in source
SELECT k.id
FROM sink_schema.products k
LEFT JOIN source_schema.products s ON k.id = s.id
WHERE s.id IS NULL;
```
3. MD5 Hash Parity Check
Verify that record contents are identical by comparing hash values. This catches subtle data inconsistencies like truncation or type conversion errors:
```shell
# Source database: compute a hash of each row's column values
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT id, md5(CAST((id, name, description, weight, price) AS text)) AS content_hash
   FROM products
   ORDER BY id;"

# Sink database: compute the same hash
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT id, md5(CAST((id, name, description, weight, price) AS text)) AS content_hash
   FROM products
   ORDER BY id;"
```
Note: Adjust the column list (id, name, description, weight, price) to match your actual table schema. Include all columns that should be compared for consistency.
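Expected output: Each `id` should show the same `content_hash` (a 32-character hexadecimal MD5 digest) in both result sets. The output has this shape, with real digests in place of the placeholders:

```
 id |           content_hash
----+----------------------------------
  1 | <32-character hex digest>
  2 | <32-character hex digest>
(2 rows)
```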
Any hash mismatch indicates a data inconsistency that requires investigation.
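Comparing row-by-row hashes by eye gets tedious as the table grows. One option, sketched below under the same column-list assumption, folds every row hash into a single fingerprint per database using `string_agg(... ORDER BY id)`, leaving just two values to compare. `FINGERPRINT_SQL` and `same_fingerprint` are hypothetical names. The rows must also render to identical text on both sides, so matching column types matter.

```shell
# Hypothetical table-level parity check. The SQL folds each row's md5 into one
# digest, ordered by id so both databases hash rows in the same order.
# Adjust the column list to your schema, as with the per-row check.
FINGERPRINT_SQL="SELECT md5(string_agg(md5(CAST((id, name, description, weight, price) AS text)), '' ORDER BY id)) FROM products;"

# same_fingerprint SOURCE_DIGEST SINK_DIGEST: succeed if the digests match.
same_fingerprint() {
  local a b
  # Strip whitespace and carriage returns that terminal output can add.
  a=$(printf '%s' "$1" | tr -d '[:space:]')
  b=$(printf '%s' "$2" | tr -d '[:space:]')
  # An empty digest (e.g., an empty table or failed query) never counts as a match.
  if [ -n "$a" ] && [ "$a" = "$b" ]; then
    echo "Tables match (fingerprint $a)"
  else
    echo "Fingerprints differ: source=$a sink=$b" >&2
    return 1
  fi
}

# Example (assumes the lab's containers are running):
#   src=$(docker-compose exec -T -u postgres postgres psql -U start_data_engineer -d inventory -tAc "$FINGERPRINT_SQL")
#   snk=$(docker-compose exec -T -u postgres postgres-sink psql -U sink_user -d warehouse -tAc "$FINGERPRINT_SQL")
#   same_fingerprint "$src" "$snk"
```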
4. Aggregate Consistency Check
Compare aggregate statistics to catch subtle data drift:
```shell
# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT
     COUNT(*) AS total_rows,
     COUNT(DISTINCT id) AS unique_ids,
     SUM(weight) AS total_weight,
     AVG(price) AS avg_price,
     MIN(price) AS min_price,
     MAX(price) AS max_price
   FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT
     COUNT(*) AS total_rows,
     COUNT(DISTINCT id) AS unique_ids,
     SUM(weight) AS total_weight,
     AVG(price) AS avg_price,
     MIN(price) AS min_price,
     MAX(price) AS max_price
   FROM products;"
```
Expected output: All aggregates should match exactly:
```
 total_rows | unique_ids | total_weight |      avg_price       | min_price | max_price
------------+------------+--------------+----------------------+-----------+-----------
          2 |          2 |          7.3 | 499.9900000000000000 |    499.99 |    499.99
(1 row)
```
Discrepancies in aggregates often indicate missing records, duplicate inserts, or failed updates.
5. Null Value Consistency
Ensure NULL handling is consistent across source and sink:
```shell
# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT
     COUNT(*) FILTER (WHERE price IS NULL) AS null_prices,
     COUNT(*) FILTER (WHERE description IS NULL) AS null_descriptions
   FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT
     COUNT(*) FILTER (WHERE price IS NULL) AS null_prices,
     COUNT(*) FILTER (WHERE description IS NULL) AS null_descriptions
   FROM products;"
```
Expected output: NULL counts should be identical:
```
 null_prices | null_descriptions
-------------+-------------------
           1 |                 0
(1 row)
```
💡 Verification Best Practices
- Automate checks: Run these queries in a monitoring script or CI/CD pipeline
- Schedule regular validation: Don't wait for user complaints—proactively verify parity
- Test with deletes: Ensure DELETE operations propagate correctly (requires additional sink configuration)
- Monitor lag: Use Kafka consumer lag metrics to detect replication delays
- Handle schema evolution: When columns are added or removed, update your verification queries
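You can act on the "Monitor lag" advice from the command line with Kafka's `kafka-consumer-groups` tool, which ships in the `cp-kafka` container. It describes the sink connector's consumer group, which by default is named `connect-` plus the connector name: `connect-jdbc-sink-connector` in this lab. The hypothetical `total_lag` helper below sums the LAG column of that output, so a monitoring script can alert when lag stays above zero.

```shell
# total_lag: read `kafka-consumer-groups --describe` output on stdin and print
# the summed LAG across partitions. Hypothetical helper; it locates the LAG
# column from the header row instead of assuming a fixed position.
total_lag() {
  awk '
    # A header row names the columns; remember where LAG is.
    { for (i = 1; i <= NF; i++) if ($i == "LAG") { col = i; next } }
    # Sum numeric lag values; "-" (no committed offset yet) is skipped.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Example (assumes the lab's containers are running):
#   docker-compose exec -T kafka kafka-consumer-groups --bootstrap-server kafka:29092 \
#     --describe --group connect-jdbc-sink-connector | total_lag
```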
⚠️ Common Validation Pitfalls
- Timestamp precision: Timestamps may lose microsecond precision depending on database and connector settings
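- Floating-point rounding: FLOAT/DOUBLE PRECISION values can differ in their last digits after type conversion or aggregation, so compare them with an epsilon tolerance rather than exact equality. DECIMAL/NUMERIC values are exact, but they can still change if the sink column uses a different precision or scale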
- Character encoding: Ensure both databases use the same encoding (UTF-8 recommended)
- Time zones: Timestamps should be stored in UTC to avoid daylight saving time issues
- Default values: Check if sink applies different defaults than source for NULL columns
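Shell `[ ]` tests only compare integers, so an epsilon comparison for floating-point values such as `SUM(weight)` needs a little help. The hypothetical `approx_equal` helper below delegates the arithmetic to awk. Its default tolerance of `1e-9` is an assumption; choose one that fits your data.

```shell
# approx_equal A B [EPSILON]: succeed if |A - B| <= EPSILON (default 1e-9).
# Hypothetical helper for comparing floating-point aggregates captured from
# source and sink with psql -tA.
approx_equal() {
  awk -v a="$1" -v b="$2" -v eps="${3:-1e-9}" \
    'BEGIN { d = a - b; if (d < 0) d = -d; exit !(d <= eps) }'
}
```

For example, `approx_equal "$source_weight" "$sink_weight" 1e-6 && echo "weights match"` treats the two sums as equal when they differ by at most one millionth.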
Cleanup
When you're done experimenting, you can tear down all the services:
```shell
docker-compose down -v
```

The `-v` flag removes the volumes, ensuring a clean slate for next time.
Next Steps
Now that you understand the complete CDC flow, consider exploring:
- Materialization patterns for handling deletes and late-arriving data
- Schema evolution strategies for production environments
- Exactly-once semantics for critical pipelines
- Observability best practices for monitoring your CDC pipeline