Intermediate

Hands-On Lab: Kafka + Debezium + Postgres (with Sinks)

Spin up a fully working CDC stack with Docker Compose, configure Debezium source and JDBC sink connectors, and watch real-time changes flow end to end—from source database through Kafka to destination. No prior streaming ops experience required.

Tested with: Kafka, Kafka Connect, Debezium, PostgreSQL

In this lab, you'll build a complete, end-to-end Change Data Capture (CDC) pipeline from scratch. We'll use Docker Compose to orchestrate Postgres databases (source and sink), Kafka, the Debezium Postgres connector, and a JDBC Sink connector to demonstrate real-time change streaming and materialization.

🚀 Quick Start Option: Use the CDC Sandbox

Want to skip the manual setup and start learning immediately? This repository includes a pre-configured CDC sandbox with all services ready to go:

Get started:

# From the repository root
docker compose up -d
./sandbox/register-postgres-connector.sh

# Open Kafka UI
# macOS
open http://localhost:8080
# Linux
xdg-open http://localhost:8080
# Windows (PowerShell)
start http://localhost:8080

📖 Full documentation: CDC Sandbox Guide

If you prefer to understand the setup step-by-step, continue with the instructions below. Otherwise, skip to Verify Your Sink after starting the sandbox.

Prerequisites

Lab Setup

  1. Create Project Directory

    Start by creating a new folder for your lab project and navigate into it.

    mkdir cdc-lab && cd cdc-lab
  2. Create `docker-compose.yml`

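    Create a file named docker-compose.yml and paste the following configuration. This file defines all the services we need: Zookeeper, Kafka, the source Postgres database (postgres), Debezium/Kafka Connect (connect), and a second Postgres database (postgres-sink) that acts as the sink.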

    # docker-compose.yml
    version: '3.7'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.7.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: confluentinc/cp-kafka:7.7.0
        hostname: kafka
        container_name: kafka
        ports:
          - "9092:9092"
        depends_on:
          - zookeeper
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    
      postgres:
        image: debezium/postgres:15
        hostname: postgres
        container_name: postgres
        ports:
          - "5432:5432"
        environment:
          - POSTGRES_USER=start_data_engineer
          - POSTGRES_PASSWORD=password
          - POSTGRES_DB=inventory
    
      connect:
        image: debezium/connect:2.7
        hostname: connect
        container_name: connect
        ports:
          - "8083:8083"
        depends_on:
          - kafka
          - postgres
        environment:
          BOOTSTRAP_SERVERS: 'kafka:29092'
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: connect-configs
          OFFSET_STORAGE_TOPIC: connect-offsets
          STATUS_STORAGE_TOPIC: connect-status
    
      postgres-sink:
        image: debezium/postgres:15
        hostname: postgres-sink
        container_name: postgres-sink
        ports:
          - "5433:5432"
        environment:
          - POSTGRES_USER=sink_user
          - POSTGRES_PASSWORD=password
          - POSTGRES_DB=warehouse
                        

    Security Note: This lab uses hardcoded credentials for simplicity. In production environments, use environment variables, Docker secrets, or a secrets management solution like HashiCorp Vault to protect sensitive information.

    Configuration Note: The Connect worker stores connector configurations, offsets, and status in the Kafka topics connect-configs, connect-offsets, and connect-status. These names are set explicitly by the CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and STATUS_STORAGE_TOPIC variables in the compose file (they match the names in Apache Kafka's sample connect-distributed.properties). When backing up or inspecting source offsets for operational purposes, you'll read the connect-offsets topic.

  3. Start the Services

    Open your terminal in the project directory and run the following command to start all the containers.

    docker-compose up -d

    It might take a few minutes for all services to become fully operational.

  4. Register the Postgres Connector

    Once the services are running, we need to tell Debezium to start monitoring our Postgres database. We do this by posting a JSON configuration to the Kafka Connect REST API. Create a file named register-postgres.json:

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "start_data_engineer",
        "database.password": "password",
        "database.dbname": "inventory",
        "topic.prefix": "dbserver1",
        "table.include.list": "public.products",
        "plugin.name": "pgoutput"
      }
    }

    Debezium 2.x uses topic.prefix; for 1.x use database.server.name.

    Now, post this configuration using cURL:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgres.json

    You should receive an HTTP 201 response, indicating success.
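    A 201 response only means Kafka Connect accepted the configuration; the connector or its task can still fail afterwards (for example, if it cannot reach the database). Connect reports the live state at GET /connectors/<name>/status. The sketch below assumes the lab's REST endpoint at http://localhost:8083 and the standard status response (a connector object plus a tasks list, each carrying a state field). The helper names fetch_status and is_fully_running are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: the lab's Connect REST endpoint


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_fully_running(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with zero tasks is not doing any work yet.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


# Usage (requires the lab stack to be running):
#   is_fully_running(fetch_status("inventory-connector"))
```

    Only fetch_status needs the running stack; is_fully_running works on any status JSON, so the same check can gate later steps (for example, before registering the sink).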

  5. Create a Table and Insert Data

    Let's connect to the Postgres database and create the `products` table that our connector is configured to watch.

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "CREATE TABLE products (id SERIAL PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT);"

    Now, insert a row:

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "INSERT INTO products (name, description, weight) VALUES ('Laptop', 'A powerful laptop', 1.5);"
  6. Observe the Change Event in Kafka

    Let's consume from the Kafka topic to see the change event that Debezium generated. The topic name is based on the `topic.prefix` and the table name (`dbserver1.public.products`).

    docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic dbserver1.public.products --from-beginning --max-messages 1 --property print.key=true

    You will see a detailed JSON payload representing the `INSERT` operation you just performed. The consumer exits after one message. To see more, remove --max-messages 1 (and press Ctrl+C to stop the consumer), or replace it with --timeout-ms 10000, which makes the consumer exit once no new message has arrived for 10 seconds.
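    To see what that payload contains, here is a minimal sketch that reduces a change event to its operation and row images. It assumes the Connect image's JSON converter with schemas enabled, which wraps each event as {"schema": ..., "payload": ...}; the payload is the Debezium envelope with before, after, source, op, and ts_ms. The sample event is hand-written and abbreviated (the schema section and most source fields are omitted), not captured output, and summarize_change_event is a hypothetical helper.

```python
import json

# Debezium op codes: c = create, u = update, d = delete, r = read (snapshot)
OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}


def summarize_change_event(raw_value: str) -> dict:
    """Reduce a Debezium change event (JSON text) to its op and row images."""
    event = json.loads(raw_value)
    if event is None:  # tombstone: the message value is null
        return {"op": "TOMBSTONE", "table": None, "before": None, "after": None}
    # With schemas enabled the envelope sits under "payload"; otherwise at the top level.
    envelope = event.get("payload", event)
    return {
        "op": OPS.get(envelope["op"], envelope["op"]),
        "table": envelope.get("source", {}).get("table"),
        "before": envelope.get("before"),
        "after": envelope.get("after"),
    }


# Hand-written, abbreviated sample (not captured output); the schema section is omitted.
sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1, "name": "Laptop", "description": "A powerful laptop", "weight": 1.5},
        "source": {"connector": "postgresql", "table": "products"},
        "op": "c",
        "ts_ms": 1700000000000,
    }
})
print(summarize_change_event(sample)["op"])  # prints INSERT
```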

  7. Configure the JDBC Sink Connector

    Now we'll add a downstream sink to write these changes into another Postgres database (our "data warehouse"). First, create the destination table in the sink database:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "CREATE TABLE products (id INTEGER PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT);"

    Note: The sink table schema must match the source table columns that will be replicated. Alternatively, you can set auto.create=true in the connector config to let it create the table automatically, though manual creation gives you more control over data types and constraints.

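    Create a file named register-sink.json with the JDBC Sink Connector configuration. This configuration uses Confluent's JDBC sink connector (io.confluent.connect.jdbc.JdbcSinkConnector), which is not bundled with the debezium/connect image: install the plugin into the Connect worker's plugin path and restart the worker first, otherwise Connect rejects the registration because it cannot find the connector class.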

    {
      "name": "jdbc-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "dbserver1.public.products",
        "connection.url": "jdbc:postgresql://postgres-sink:5432/warehouse",
        "connection.user": "sink_user",
        "connection.password": "password",
        "auto.create": "false",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "table.name.format": "products",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"
      }
    }

    Register the sink connector:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-sink.json

    Note: The sink connector uses Debezium's ExtractNewRecordState transform to unwrap the event envelope and keep only the after state (the row as it looks after the change), which is the flat shape the JDBC sink needs for simple upserts. Because delete.handling.mode is rewrite, the transform also adds a __deleted field ("true" for deletes, "false" otherwise), so auto.evolve adds a __deleted column to the sink table and deleted rows remain there flagged as deleted. drop.tombstones is set to true because Debezium follows each delete with a tombstone (a record with a null value), and the JDBC sink rejects null values unless delete.enabled=true.

    Production Tip: For production deployments, externalize credentials using environment variables or secure credential stores instead of hardcoding them in the connector configuration.
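    To make the unwrap step concrete, the following is a simplified model of what ExtractNewRecordState does with delete.handling.mode=rewrite. It is not Debezium's implementation and ignores schemas and the transform's other options: it keeps only the row image, uses the before image for deletes, adds a __deleted field ("true" or "false"), and forwards or drops tombstones according to drop.tombstones. The function name unwrap is hypothetical.

```python
from typing import List, Optional


def unwrap(envelope: Optional[dict], drop_tombstones: bool = False) -> List[Optional[dict]]:
    """Simplified model of ExtractNewRecordState (delete.handling.mode=rewrite).

    Returns the list of values forwarded to the sink for one Kafka record:
    [] when the record is dropped, [None] for a forwarded tombstone,
    or [row] for a flattened row image.
    """
    if envelope is None:  # tombstone (null value) that Debezium emits after a delete
        return [] if drop_tombstones else [None]
    if envelope["op"] == "d":
        # Deletes keep the last known row image, flagged as deleted.
        return [{**envelope["before"], "__deleted": "true"}]
    # c (insert), u (update), r (snapshot read): forward the new row image.
    return [{**envelope["after"], "__deleted": "false"}]


insert_event = {"op": "c", "before": None, "after": {"id": 1, "name": "Laptop"}}
print(unwrap(insert_event))  # prints [{'id': 1, 'name': 'Laptop', '__deleted': 'false'}]
```

    One consequence worth checking in your own setup: with Postgres's default REPLICA IDENTITY, the before image of a delete typically carries only the primary-key columns, so a rewritten delete upserted into the sink would overwrite the other columns with NULL unless the source table uses REPLICA IDENTITY FULL.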

  8. Verify Data Replication

    Check that the data has been replicated to the sink database:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products;"

    You should see the same laptop record that was inserted in the source database. The JDBC sink connector consumed the change event from Kafka and inserted it into the destination table.

  9. Test the Upsert Pattern

    Let's update the product in the source database to see how the sink handles updates:

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "UPDATE products SET weight = 1.8, description = 'A powerful gaming laptop' WHERE id = 1;"

    Wait a moment for the change to propagate, then verify the sink reflects the update:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products WHERE id = 1;"

    The sink should show the updated values. Because we configured the sink with insert.mode=upsert, it updates the row if the primary key already exists and inserts it otherwise. For PostgreSQL the connector issues INSERT ... ON CONFLICT (id) DO UPDATE statements, which has the same effect as a MERGE.
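    The statement shape behind insert.mode=upsert can be tried without the lab. This sketch uses Python's built-in sqlite3 module as a stand-in for Postgres (SQLite 3.24 and later accept the same INSERT ... ON CONFLICT ... DO UPDATE form); it is not the connector's generated SQL verbatim, and apply_events is a hypothetical helper. It also shows why upserts suit Kafka's at-least-once delivery: applying the same update twice still leaves a single row.

```python
import sqlite3

UPSERT = """
INSERT INTO products (id, name, description, weight)
VALUES (:id, :name, :description, :weight)
ON CONFLICT (id) DO UPDATE SET
    name = excluded.name,
    description = excluded.description,
    weight = excluded.weight
"""


def apply_events(conn: sqlite3.Connection, rows: list) -> None:
    """Apply flattened change rows (as produced by the unwrap step) as upserts."""
    with conn:  # commit once all rows are applied
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, description TEXT, weight REAL)"
)
insert = {"id": 1, "name": "Laptop", "description": "A powerful laptop", "weight": 1.5}
update = {"id": 1, "name": "Laptop", "description": "A powerful gaming laptop", "weight": 1.8}
# The update is applied twice to simulate a redelivery after a sink restart.
apply_events(conn, [insert, update, update])
print(conn.execute("SELECT * FROM products").fetchall())
# prints [(1, 'Laptop', 'A powerful gaming laptop', 1.8)]
```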

  10. Test Schema Evolution

    Let's add a new column to the source table and see how the pipeline handles schema changes:

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "ALTER TABLE products ADD COLUMN price DECIMAL(10,2);"

    Insert a new product with the price column:

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "INSERT INTO products (name, description, weight, price) VALUES ('Monitor', '27-inch 4K display', 5.5, 499.99);"

    The sink connector's auto.evolve=true setting adds the new column to the sink table when the first record containing the price field arrives. Verify the schema evolution:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "\d products"

    Then check that both products are present:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products;"

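    You should see that the price column has been added and both products are present (the Laptop row has a NULL price, as it does in the source). Note that auto.evolve only adds columns: it never drops or alters existing sink columns, so dropped or retyped source columns must be handled manually.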
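    A minimal model of the auto.evolve decision follows. It compares the fields of an incoming record with the sink table's columns and emits ADD COLUMN statements only for the missing ones; columns the record lacks are left alone. The type mapping and the evolve_statements helper are hypothetical simplifications; the connector's real mapping comes from its PostgreSQL dialect.

```python
# Hypothetical type mapping for illustration only; the JDBC connector's real mapping
# comes from its PostgreSQL dialect and also covers logical types such as Decimal.
PG_TYPES = {"int32": "INTEGER", "int64": "BIGINT", "float64": "DOUBLE PRECISION",
            "string": "TEXT", "decimal": "DECIMAL", "boolean": "BOOLEAN"}


def evolve_statements(table: str, table_columns: set, record_fields: dict) -> list:
    """Return ALTER TABLE statements that add record fields missing from the table.

    record_fields maps field name -> simplified schema type name. Columns present
    in the table but absent from the record are never dropped.
    """
    statements = []
    for name, field_type in record_fields.items():
        if name not in table_columns:
            statements.append(f"ALTER TABLE {table} ADD COLUMN {name} {PG_TYPES[field_type]}")
    return statements


existing = {"id", "name", "description", "weight"}
fields = {"id": "int32", "name": "string", "description": "string",
          "weight": "float64", "price": "decimal"}
print(evolve_statements("products", existing, fields))
# prints ['ALTER TABLE products ADD COLUMN price DECIMAL']
```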

  11. Validate End-to-End Consistency

    Finally, let's verify that the source and sink databases are in sync. First, check the source database:

    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "SELECT COUNT(*), MAX(id), MIN(id) FROM products;"
    docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c "SELECT * FROM products ORDER BY id;"

    Then check the sink database:

    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT COUNT(*), MAX(id), MIN(id) FROM products;"
    docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c "SELECT * FROM products ORDER BY id;"

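    Both databases should show the same counts and rows (the sink's SELECT * also shows the __deleted column that the unwrap transform's rewrite mode adds). Congratulations! You've built a complete end-to-end CDC pipeline with: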

    • ✅ Real-time change capture from a source database
    • ✅ Event streaming through Kafka
    • ✅ Automatic materialization in a sink database
    • ✅ Upsert pattern handling for updates
    • ✅ Schema evolution support

Verify Your Sink

Beyond basic visual inspection, it's essential to validate that your CDC pipeline maintains perfect parity between source and sink. Here are SQL verification snippets that confirm end-to-end consistency:

1. Row Count Check

Verify that both databases have the same number of records:

# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT COUNT(*) as source_count FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT COUNT(*) as sink_count FROM products;"

Expected output: Both queries should return the same count. For example:

 source_count 
--------------
            2
(1 row)

 sink_count 
------------
          2
(1 row)

If counts differ, investigate missing or duplicate records by comparing primary keys.

2. Primary Key Diff Check

Identify records that exist in one database but not the other:

# Export the IDs from each side (-T disables TTY allocation so the redirected files contain clean output)
docker-compose exec -T -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "COPY (SELECT id FROM products) TO STDOUT;" > /tmp/source_ids.txt

docker-compose exec -T -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "COPY (SELECT id FROM products) TO STDOUT;" > /tmp/sink_ids.txt

# Compare the files (process substitution requires bash or zsh)
comm -23 <(sort /tmp/source_ids.txt) <(sort /tmp/sink_ids.txt)
# Output shows IDs in source but not in sink

comm -13 <(sort /tmp/source_ids.txt) <(sort /tmp/sink_ids.txt)
# Output shows IDs in sink but not in source

Expected output: Both commands should produce no output, indicating perfect parity:

# No missing records in either direction
(empty output)
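The same comparison can be done without sorting temp files. A minimal sketch, assuming each side's IDs are available as text with one ID per line (for example, the contents of the two files written by the COPY commands above); parse_ids and diff_ids are hypothetical helpers. Parsing the IDs as integers sidesteps comm's requirement for lexically sorted input and tolerates the \r line endings that a TTY-attached docker-compose exec can add.

```python
def parse_ids(text: str) -> set:
    """Parse one integer ID per line, ignoring blank lines and stray carriage returns."""
    return {int(line.strip()) for line in text.splitlines() if line.strip()}


def diff_ids(source_text: str, sink_text: str) -> tuple:
    """Return (IDs missing from the sink, IDs present only in the sink), each sorted."""
    source, sink = parse_ids(source_text), parse_ids(sink_text)
    return sorted(source - sink), sorted(sink - source)


# Usage with the files written by the COPY commands:
#   with open("/tmp/source_ids.txt") as s, open("/tmp/sink_ids.txt") as k:
#       missing, extra = diff_ids(s.read(), k.read())
print(diff_ids("1\n2\n10\n", "1\r\n10\r\n"))  # prints ([2], [])
```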

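Alternative SQL-only approach: With the postgres_fdw extension (which should be available in the official Postgres images that debezium/postgres builds on), the source database can query the sink's table directly. Run the following in the source database, for example via docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory. The names sink_srv and sink_schema are arbitrary:

-- One-time setup: expose the sink's products table as sink_schema.products
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER sink_srv FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host 'postgres-sink', port '5432', dbname 'warehouse');
CREATE USER MAPPING FOR CURRENT_USER SERVER sink_srv
  OPTIONS (user 'sink_user', password 'password');
CREATE SCHEMA IF NOT EXISTS sink_schema;
IMPORT FOREIGN SCHEMA public LIMIT TO (products)
  FROM SERVER sink_srv INTO sink_schema;

-- Records in source not in sink
SELECT s.id 
FROM public.products s 
LEFT JOIN sink_schema.products k ON s.id = k.id 
WHERE k.id IS NULL;

-- Records in sink not in source
SELECT k.id 
FROM sink_schema.products k 
LEFT JOIN public.products s ON k.id = s.id 
WHERE s.id IS NULL;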

3. MD5 Hash Parity Check

Verify that record contents are identical by comparing hash values. This catches subtle data inconsistencies like truncation or type conversion errors:

# Source database: compute hash of all column values
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT id, md5(CAST((id, name, description, weight, price) AS text)) as content_hash 
   FROM products 
   ORDER BY id;"

# Sink database: compute same hash
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT id, md5(CAST((id, name, description, weight, price) AS text)) as content_hash 
   FROM products 
   ORDER BY id;"

Note: Adjust the column list (id, name, description, weight, price) to match your actual table schema. Include all columns that should be compared for consistency.

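Expected output: Each id should show the same content_hash (a 32-character hexadecimal MD5 digest) in both result sets. The output has this shape (digests replaced by placeholders):

 id |           content_hash           
----+----------------------------------
  1 | <32 hex digits, same on both sides>
  2 | <32 hex digits, same on both sides>
(2 rows)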

Any hash mismatch indicates a data inconsistency that requires investigation.
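Comparing columns of 32-character digests by eye does not scale. If you rerun both queries with psql's -A (unaligned) and -t (tuples only) flags, each row prints as id|hash, and a short script can list the mismatches. A sketch with hypothetical helper names; the short strings in the example stand in for real MD5 digests.

```python
def parse_hashes(text: str) -> dict:
    """Parse `psql -At` output lines of the form 'id|hash' into {id: hash}."""
    result = {}
    for line in text.splitlines():
        line = line.strip()
        if line:
            row_id, digest = line.split("|", 1)
            result[int(row_id)] = digest
    return result


def hash_mismatches(source_text: str, sink_text: str) -> list:
    """IDs whose hashes differ or that exist on only one side, sorted."""
    source, sink = parse_hashes(source_text), parse_hashes(sink_text)
    return sorted(i for i in source.keys() | sink.keys() if source.get(i) != sink.get(i))


# Placeholder strings stand in for real digests.
print(hash_mismatches("1|aaa\n2|bbb\n", "1|aaa\n2|ccc\n"))  # prints [2]
```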

4. Aggregate Consistency Check

Compare aggregate statistics to catch subtle data drift:

# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT 
     COUNT(*) as total_rows,
     COUNT(DISTINCT id) as unique_ids,
     SUM(weight) as total_weight,
     AVG(price) as avg_price,
     MIN(price) as min_price,
     MAX(price) as max_price
   FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT 
     COUNT(*) as total_rows,
     COUNT(DISTINCT id) as unique_ids,
     SUM(weight) as total_weight,
     AVG(price) as avg_price,
     MIN(price) as min_price,
     MAX(price) as max_price
   FROM products;"

Expected output (lab data after steps 9 and 10): all aggregates should match on both sides:

 total_rows | unique_ids | total_weight |      avg_price       | min_price | max_price 
------------+------------+--------------+----------------------+-----------+-----------
          2 |          2 |          7.3 | 499.9900000000000000 |    499.99 |    499.99
(1 row)

Discrepancies in aggregates often indicate missing records, duplicate inserts, or failed updates.
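Exact equality is right for counts and NUMERIC aggregates, but a SUM or AVG over FLOAT columns can differ in the last digits if the two databases add rows in a different order. The sketch below compares two result rows exactly for most keys and with math.isclose for keys you mark as floating-point. The dictionaries are hand-typed to resemble the output above, not live query results, and aggregate_diffs is a hypothetical helper.

```python
import math
from decimal import Decimal


def aggregate_diffs(source: dict, sink: dict, float_keys=(), rel_tol=1e-9) -> list:
    """Return the aggregate names whose values differ between source and sink."""
    diffs = []
    for key in sorted(source.keys() | sink.keys()):
        a, b = source.get(key), sink.get(key)
        if key in float_keys and a is not None and b is not None:
            same = math.isclose(a, b, rel_tol=rel_tol)  # tolerant float comparison
        else:
            same = a == b  # exact for counts and NUMERIC (Decimal) values
        if not same:
            diffs.append(key)
    return diffs


source = {"total_rows": 2, "unique_ids": 2, "total_weight": 7.3, "min_price": Decimal("499.99")}
sink = {"total_rows": 2, "unique_ids": 2, "total_weight": 7.300000000000001, "min_price": Decimal("499.99")}
print(aggregate_diffs(source, sink, float_keys={"total_weight"}))  # prints []
print(aggregate_diffs(source, sink))  # prints ['total_weight']
```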

5. Null Value Consistency

Ensure NULL handling is consistent across source and sink:

# Source database
docker-compose exec -u postgres postgres psql -U start_data_engineer -d inventory -c \
  "SELECT 
     COUNT(*) FILTER (WHERE price IS NULL) as null_prices,
     COUNT(*) FILTER (WHERE description IS NULL) as null_descriptions
   FROM products;"

# Sink database
docker-compose exec -u postgres postgres-sink psql -U sink_user -d warehouse -c \
  "SELECT 
     COUNT(*) FILTER (WHERE price IS NULL) as null_prices,
     COUNT(*) FILTER (WHERE description IS NULL) as null_descriptions
   FROM products;"

Expected output: NULL counts should be identical:

 null_prices | null_descriptions 
-------------+-------------------
           1 |                 0
(1 row)

💡 Verification Best Practices

  • Automate checks: Run these queries in a monitoring script or CI/CD pipeline
  • Schedule regular validation: Don't wait for user complaints—proactively verify parity
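  • Test with deletes: Ensure DELETE operations propagate correctly. This requires additional sink configuration: with delete.handling.mode=rewrite, deletes reach the sink as rows flagged __deleted = 'true', and physically removing rows needs the JDBC sink's delete.enabled=true with tombstones passed through to it
  • Monitor lag: Use Kafka consumer lag metrics to detect replication delays. A sink connector consumes as the group connect-<connector name>, so docker-compose exec kafka kafka-consumer-groups --bootstrap-server kafka:29092 --describe --group connect-jdbc-sink-connector shows the sink's per-partition lag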
  • Handle schema evolution: When columns are added or removed, update your verification queries
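To automate the checks, a small runner can shell out to the same docker-compose exec ... psql commands used throughout this section and compare the outputs. This sketch adds -T (no TTY) and psql's -At so the captured output is clean. It assumes the lab's service names, users, and databases, and that docker-compose is on PATH with the stack running when run_check is called; the helper names are hypothetical.

```python
import subprocess

# The lab's two databases: (compose service, database user, database name)
SOURCE = ("postgres", "start_data_engineer", "inventory")
SINK = ("postgres-sink", "sink_user", "warehouse")


def psql_cmd(target: tuple, sql: str) -> list:
    """Build a docker-compose command that runs one query with unaligned, tuples-only output."""
    service, user, db = target
    return ["docker-compose", "exec", "-T", "-u", "postgres", service,
            "psql", "-U", user, "-d", db, "-At", "-c", sql]


def query(target: tuple, sql: str) -> str:
    """Run the query and return its trimmed stdout (raises CalledProcessError on failure)."""
    result = subprocess.run(psql_cmd(target, sql), capture_output=True, text=True, check=True)
    return result.stdout.strip()


def run_check(sql: str) -> bool:
    """True when the query returns identical output on source and sink."""
    return query(SOURCE, sql) == query(SINK, sql)


CHECKS = {
    "row_count": "SELECT COUNT(*) FROM products",
    "null_prices": "SELECT COUNT(*) FROM products WHERE price IS NULL",
}
# Usage (requires the lab stack to be running):
#   failed = [name for name, sql in CHECKS.items() if not run_check(sql)]
#   raise SystemExit(1 if failed else 0)
```

Exiting non-zero when any check fails lets a CI job or cron wrapper flag drift without parsing the output.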

⚠️ Common Validation Pitfalls

  • Timestamp precision: Timestamps may lose microsecond precision depending on database and connector settings
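  • Floating-point rounding: FLOAT/DOUBLE PRECISION values can differ in their last digits after type conversion or when aggregated in a different order, so compare them with a tolerance (epsilon); DECIMAL/NUMERIC values are exact but can lose precision if Debezium's decimal.handling.mode is set to double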
  • Character encoding: Ensure both databases use the same encoding (UTF-8 recommended)
  • Time zones: Timestamps should be stored in UTC to avoid daylight saving time issues
  • Default values: Check if sink applies different defaults than source for NULL columns
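The timestamp and time-zone pitfalls can be handled in the comparison itself: convert both values to UTC and truncate them to the coarsest precision that either side preserves. A sketch assuming timezone-aware datetime values and millisecond precision on the lossy side (adjust precision_us to what your column types and connector settings actually keep); same_instant is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def same_instant(a: datetime, b: datetime, precision_us: int = 1000) -> bool:
    """Compare two aware datetimes in UTC after truncating to precision_us microseconds."""
    def normalize(dt: datetime) -> datetime:
        dt = dt.astimezone(timezone.utc)
        return dt.replace(microsecond=dt.microsecond - dt.microsecond % precision_us)
    return normalize(a) == normalize(b)


src = datetime(2024, 3, 10, 12, 0, 0, 123456, tzinfo=timezone.utc)
# The same instant, expressed at UTC-5 and truncated to milliseconds.
snk = datetime(2024, 3, 10, 7, 0, 0, 123000, tzinfo=timezone(timedelta(hours=-5)))
print(same_instant(src, snk))  # prints True
print(src == snk)  # prints False: the raw values differ in microseconds
```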

Cleanup

When you're done experimenting, you can tear down all the services:

docker-compose down -v

The -v flag removes the volumes, ensuring a clean slate for next time.

Next Steps

Now that you understand the complete CDC flow, consider exploring:
