Advanced

Reconciliation & Offset Surgery

Master the advanced techniques every CDC operator needs: repair out-of-sync sinks with SQL diffs, verify data integrity with checksums, and safely reset offsets using the Kafka Connect REST API.

    When reconciliation is necessary

    Reconciliation becomes critical when your sink has drifted from the source. This happens in several scenarios:

    • Missed events: Connector downtime longer than log retention, causing data gaps.
    • Failed transformations: SMTs or sink logic that silently dropped or corrupted records.
    • Partial writes: Sink acknowledged success but only persisted some rows due to transient errors.
    • Manual interventions: Direct edits to sink data that bypassed the CDC pipeline.
    • Split-brain scenarios: Multiple pipelines or manual processes writing to the same sink tables.

    Detection first: Set up automated drift detection before attempting reconciliation. Monitor row counts, checksum mismatches, and lag spikes. Reconciliation is expensive—only run it when drift is confirmed.
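
    A drift check along these lines can be sketched as a small function that reports which of the three signals (row counts, checksums, lag) fired. This is only a sketch: `TableSnapshot`, `detect_drift`, and the default thresholds are hypothetical names and values, and it assumes you already collect counts, checksums, and lag somewhere.

```python
from dataclasses import dataclass


@dataclass
class TableSnapshot:
    """Row count and checksum for one table on one side (hypothetical shape)."""
    row_count: int
    checksum: int


def detect_drift(source, sink, lag_seconds, max_row_diff=0, max_lag_seconds=300.0):
    """Return the names of the drift signals that fired; empty list means none."""
    signals = []
    if abs(source.row_count - sink.row_count) > max_row_diff:
        signals.append("row_count")
    # Checksums are only comparable when both sides hold the same number of rows.
    if source.row_count == sink.row_count and source.checksum != sink.checksum:
        signals.append("checksum")
    if lag_seconds > max_lag_seconds:
        signals.append("lag")
    return signals


source = TableSnapshot(row_count=1000, checksum=123456)
sink = TableSnapshot(row_count=1000, checksum=654321)
print(detect_drift(source, sink, lag_seconds=12.0))  # ['checksum']
```

    An empty result means no signal fired, so the expensive reconciliation can be skipped for that table.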

    Prerequisites

    Before attempting reconciliation or offset surgery, ensure you have:

    • Read-only access to source database for snapshot queries
    • Write access to sink database for repairs (with approval and backup)
    • Kafka Connect admin access for offset manipulation
    • Backup of current offsets stored in version control or object storage
    • Downtime window approved by stakeholders (or idempotent sink that tolerates replays)
    • Runbook reviewed by at least one other engineer familiar with the pipeline
    • Rollback plan documented with specific steps and decision points

    Practice in staging first: Every reconciliation and offset surgery should be rehearsed in a non-production environment with realistic data volumes and latency profiles.

    SQL diff patterns for source vs. sink

    Compare source and sink using deterministic queries that highlight missing, extra, or mismatched rows.

    Pattern 1: Row count comparison

    Start with the simplest check—do both sides have the same number of rows?

    -- Source (PostgreSQL example)
    SELECT 'source' AS location, COUNT(*) AS row_count
    FROM source_schema.customers;
    
    -- Sink (destination database)
    SELECT 'sink' AS location, COUNT(*) AS row_count
    FROM sink_schema.customers;

    Interpretation: If counts match, proceed to checksum verification. If counts differ, identify which rows are missing or extra using the patterns below.

    Pattern 2: Find missing rows in sink

    Identify primary keys present in source but absent from sink:

    -- PostgreSQL to PostgreSQL example using foreign data wrapper
    SELECT src.id, src.email, src.updated_at
    FROM source_schema.customers src
    LEFT JOIN sink_schema.customers snk ON src.id = snk.id
    WHERE snk.id IS NULL
    ORDER BY src.updated_at DESC
    LIMIT 100;

    Cross-database setup: The example above assumes PostgreSQL foreign data wrappers (postgres_fdw) are configured. For other databases, export both datasets to CSV or a neutral format, then use comm, diff, Python pandas, or SQL import into a staging database for comparison.
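
    When foreign data wrappers are not available, the export-and-compare route can be sketched with the Python standard library alone. The example assumes both sides were exported to CSV with an `id` column; `load_keys` and `diff_keys` are illustrative helper names, and the inline CSV strings stand in for real export files.

```python
import csv
import io


def load_keys(csv_text, key_column="id"):
    """Read a CSV export and return the set of primary-key values as strings."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[key_column] for row in reader}


def diff_keys(source_keys, sink_keys):
    """Return (missing_in_sink, extra_in_sink), each sorted for stable output."""
    missing = sorted(source_keys - sink_keys)
    extra = sorted(sink_keys - source_keys)
    return missing, extra


source_csv = "id,email\n1,a@example.com\n2,b@example.com\n3,c@example.com\n"
sink_csv = "id,email\n1,a@example.com\n3,c@example.com\n9,z@example.com\n"

missing, extra = diff_keys(load_keys(source_csv), load_keys(sink_csv))
print("missing in sink:", missing)  # ['2']
print("extra in sink:", extra)      # ['9']
```

    Holding every key in memory works for moderately sized tables; for very large ones, sorting both exports and streaming through them is the usual alternative.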

    Pattern 3: Find extra rows in sink

    Detect rows that exist in sink but not in source (potential duplicates or orphaned records):

    SELECT snk.id, snk.email, snk.created_at
    FROM sink_schema.customers snk
    LEFT JOIN source_schema.customers src ON snk.id = src.id
    WHERE src.id IS NULL
    ORDER BY snk.created_at DESC
    LIMIT 100;

    Pattern 4: Find mismatched columns

    For rows that exist on both sides, detect column value differences:

    -- Compare critical columns (adjust list for your schema)
    -- IS DISTINCT FROM is NULL-safe; != would silently skip rows where either side is NULL
    SELECT 
      src.id,
      CASE WHEN src.email IS DISTINCT FROM snk.email THEN 'email' END AS email_mismatch,
      CASE WHEN src.status IS DISTINCT FROM snk.status THEN 'status' END AS status_mismatch,
      CASE WHEN src.updated_at IS DISTINCT FROM snk.updated_at THEN 'updated_at' END AS ts_mismatch,
      src.updated_at AS src_updated_at,
      snk.updated_at AS snk_updated_at
    FROM source_schema.customers src
    INNER JOIN sink_schema.customers snk ON src.id = snk.id
    WHERE src.email IS DISTINCT FROM snk.email
       OR src.status IS DISTINCT FROM snk.status
       OR src.updated_at IS DISTINCT FROM snk.updated_at
    LIMIT 100;

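    Timestamp handling: PostgreSQL timestamps carry microsecond precision. If your sink stores only milliseconds or seconds, normalize both sides to the same precision before comparing, or nearly every row will show up as a false positive. Use date_trunc('milliseconds', updated_at) on both sides if the sink truncates; a ::timestamptz(3) cast rounds instead, so it only matches sinks that also round.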
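
    The effect of a precision mismatch is easy to reproduce outside the database. The sketch below assumes a sink that truncates (rather than rounds) to milliseconds; `truncate_to_millis` is an illustrative helper, not part of any library.

```python
from datetime import datetime, timezone


def truncate_to_millis(ts):
    """Drop sub-millisecond digits (truncation, not rounding)."""
    return ts.replace(microsecond=(ts.microsecond // 1000) * 1000)


source_ts = datetime(2025, 11, 6, 14, 30, 22, 123987, tzinfo=timezone.utc)
sink_ts = datetime(2025, 11, 6, 14, 30, 22, 123000, tzinfo=timezone.utc)

print(source_ts == sink_ts)                                          # False
print(truncate_to_millis(source_ts) == truncate_to_millis(sink_ts))  # True
```

    Rounding the source value instead would give .124 here and still mismatch the sink's .123, which is why the normalization has to mirror what the sink actually does.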

    Checksum verification methods

    When row counts match but you suspect silent corruption, compute deterministic checksums over entire tables or partitions.

    Method 1: Per-row checksums

    Hash each row and compare aggregates:

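    -- PostgreSQL example using md5 over delimited columns
    -- COALESCE keeps a NULL column from nulling out the whole hash input;
    -- the '|' separator keeps ('ab', 'c') distinct from ('a', 'bc')
    SELECT 
      COUNT(*) AS row_count,
      SUM(('x' || md5(concat_ws('|', id::text, COALESCE(email, ''), COALESCE(status, ''))))::bit(32)::bigint) AS checksum_sum
    FROM source_schema.customers;

    Run the same query on both source and sink. If row_count and checksum_sum both match, the hashed columns are almost certainly identical. The sum keeps only 32 bits of each hash and ignores any column left out of the list, so treat a match as strong evidence rather than proof. If only row_count matches, use per-row hashes to find divergent rows.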
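
    The per-row step can be sketched in Python for rows that have already been fetched from both sides. This is an illustration under assumptions: rows are plain dicts with an `id` key, columns are joined with a `|` delimiter (a choice of this sketch), and `row_hash`, `table_checksum`, and `divergent_ids` are hypothetical helper names.

```python
import hashlib


def row_hash(row, columns):
    """Hash selected columns of one row; None (NULL) becomes an empty string."""
    joined = "|".join("" if row[c] is None else str(row[c]) for c in columns)
    digest = hashlib.md5(joined.encode("utf-8")).hexdigest()
    return int(digest[:8], 16)  # keep the first 32 bits of the hash


def table_checksum(rows, columns):
    """Return (row_count, checksum_sum) for a list of rows."""
    return len(rows), sum(row_hash(r, columns) for r in rows)


def divergent_ids(source, sink, columns):
    """Ids present on both sides whose per-row hashes differ."""
    sink_hashes = {r["id"]: row_hash(r, columns) for r in sink}
    return sorted(
        r["id"] for r in source
        if r["id"] in sink_hashes and row_hash(r, columns) != sink_hashes[r["id"]]
    )


cols = ["id", "email", "status"]
source_rows = [
    {"id": 1, "email": "a@example.com", "status": "active"},
    {"id": 2, "email": "b@example.com", "status": None},
]
sink_rows = [
    {"id": 1, "email": "a@example.com", "status": "active"},
    {"id": 2, "email": "b@example.com", "status": "inactive"},
]

print(table_checksum(source_rows, cols)[0] == table_checksum(sink_rows, cols)[0])  # True
print(divergent_ids(source_rows, sink_rows, cols))  # [2]
```

    Row counts agree in this example, yet the per-row comparison isolates id 2 as the row to repair.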

    Method 2: Partition-level checksums

    For large tables, compute checksums per time window or partition key:

    -- Daily partition checksums
    SELECT 
      DATE_TRUNC('day', updated_at) AS partition_day,
      COUNT(*) AS row_count,
      SUM(('x' || md5(concat_ws('|', id::text, COALESCE(email, ''), COALESCE(status, ''))))::bit(32)::bigint) AS checksum_sum
    FROM source_schema.customers
    GROUP BY DATE_TRUNC('day', updated_at)
    ORDER BY partition_day DESC;

    Performance tip: Checksum queries are expensive. Run them during off-peak hours, or pre-compute checksums in a materialized view that updates incrementally.
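
    Once per-day results exist for both sides, narrowing the search to the days that disagree is a dictionary comparison. The sketch below recomputes the daily buckets in Python for illustration; `partition_checksums` and `mismatched_partitions` are hypothetical names, and in practice the buckets would come from running the SQL above against source and sink.

```python
import hashlib
from collections import defaultdict
from datetime import datetime


def partition_checksums(rows):
    """Map each updated_at day to (row_count, checksum_sum)."""
    buckets = defaultdict(lambda: [0, 0])
    for row in rows:
        day = row["updated_at"].date()
        text = f'{row["id"]}|{row["email"] or ""}|{row["status"] or ""}'
        buckets[day][0] += 1
        buckets[day][1] += int(hashlib.md5(text.encode("utf-8")).hexdigest()[:8], 16)
    return {day: tuple(values) for day, values in buckets.items()}


def mismatched_partitions(source_rows, sink_rows):
    """Days whose (row_count, checksum_sum) differ between source and sink."""
    src = partition_checksums(source_rows)
    snk = partition_checksums(sink_rows)
    return sorted(day for day in src.keys() | snk.keys() if src.get(day) != snk.get(day))


source_rows = [
    {"id": 1, "email": "a@example.com", "status": "active", "updated_at": datetime(2025, 11, 5, 9, 0)},
    {"id": 2, "email": "b@example.com", "status": "active", "updated_at": datetime(2025, 11, 6, 10, 0)},
    {"id": 3, "email": "c@example.com", "status": None, "updated_at": datetime(2025, 11, 6, 11, 0)},
]
sink_rows = source_rows[:2]  # the sink is missing id 3

print([str(day) for day in mismatched_partitions(source_rows, sink_rows)])  # ['2025-11-06']
```

    Only the flagged days then need the row-level diff queries, which keeps the expensive work proportional to the amount of drift.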

    Method 3: Third-party reconciliation tools

    For production workloads, consider specialized tools:

    • Datafold: Cloud-native data diffing with column-level reconciliation reports.
    • dbt-audit-helper: Open-source dbt package for comparing datasets.
    • AWS DMS Data Validation: Built-in validation for DMS replication tasks.
    • Debezium signaling: Trigger incremental snapshots to re-sync specific tables.

    Offset surgery: when and how

    Offset surgery means directly manipulating the position markers that tell Kafka Connect "where to resume reading." This is dangerous—do it wrong and you'll duplicate or skip events.

    Safe scenarios for offset surgery

    • Rewind after failed transformation: Reset offsets to replay events through corrected SMTs.
    • Skip poison pill: Advance offset past a single malformed message blocking progress.
    • Resume from snapshot: After restoring sink from backup, align offsets with backup timestamp.
    • Migrate connector: Copy offsets when moving a connector to a new cluster.

    Unsafe scenarios (avoid or escalate)

    • Editing offsets to hide data loss: If source logs expired, offsets can't fix it.
    • Arbitrarily jumping to "now": Creates unbounded data gaps.
    • Deleting offsets without understanding state: Forces full snapshot, which may overload source.

    Golden rule: Never perform offset surgery without a pre-surgery offset backup and a documented rollback plan. If you can't restore the original state in under 5 minutes, don't start.

    Kafka Connect REST API offset operations

    Kafka Connect exposes a REST API for offset management. Below are the essential operations.

    1. Backup current offsets

    Always start by exporting the current offset state:

    # List all connectors
    curl -s http://localhost:8083/connectors | jq
    
    # Get connector status and tasks
    curl -s http://localhost:8083/connectors/my-postgres-source/status | jq
    
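    # Backup offsets from the internal Kafka topic (its name is set by
    # offset.storage.topic in the worker config; connect-offsets is assumed here).
    # The output is one key<TAB>value line per record, not a single JSON document.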
    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic connect-offsets \
      --from-beginning \
      --timeout-ms 10000 \
      --property print.key=true \
      > offsets-backup-$(date +%Y%m%d-%H%M%S).json

    2. Pause connector

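    Stop processing before manipulating offsets. Pausing is enough for inspection, but a paused task stays alive and keeps its position in memory, so it will not pick up offsets that were edited while it was paused. Before editing offsets, stop the connector instead (PUT /connectors/my-postgres-source/stop, Kafka Connect 3.5+) or shut down the Connect workers. The pause call looks like this: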

    curl -X PUT http://localhost:8083/connectors/my-postgres-source/pause
    
    # Verify connector is paused
    curl -s http://localhost:8083/connectors/my-postgres-source/status | jq '.connector.state'
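
    Rather than checking the state once by eye, a script can poll until the connector and all of its tasks report the expected state. The following is a sketch under assumptions: `fetch_status` and `wait_for_state` are hypothetical helpers, the status payload is assumed to have the `connector.state` and `tasks[].state` fields shown by the jq filter above, and the status getter is injected so the polling logic can be exercised without a live cluster.

```python
import json
import time
import urllib.request


def fetch_status(connect_url, connector):
    """GET /connectors/{name}/status and return the parsed JSON body."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{connector}/status") as resp:
        return json.load(resp)


def wait_for_state(get_status, wanted="PAUSED", attempts=10, delay=1.0):
    """Poll until the connector and every task report `wanted`; False on timeout."""
    for _ in range(attempts):
        status = get_status()
        states = [status["connector"]["state"]]
        states += [task["state"] for task in status.get("tasks", [])]
        if all(state == wanted for state in states):
            return True
        time.sleep(delay)
    return False
```

    Against a real worker this would be called as `wait_for_state(lambda: fetch_status("http://localhost:8083", "my-postgres-source"))`; a False result means the state change did not complete in time and the procedure should not continue.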

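    3. Reset offsets

    Clear stored offsets to force a fresh start. Deleting a connector does not do this on its own: its records stay in connect-offsets, and a connector recreated under the same name resumes from them.

    # Kafka Connect 3.6+: stop the connector, then reset its offsets
    curl -X PUT http://localhost:8083/connectors/my-postgres-source/stop
    curl -X DELETE http://localhost:8083/connectors/my-postgres-source/offsets
    curl -X PUT http://localhost:8083/connectors/my-postgres-source/resume
    
    # Older versions: delete the connector and recreate it under a NEW name
    # (set "name" in the config file accordingly) with snapshot.mode=initial or schema_only
    curl -X DELETE http://localhost:8083/connectors/my-postgres-source
    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d @my-postgres-source-config.json

    Warning: Resetting offsets discards the connector's position permanently. Ensure your connector config specifies the desired snapshot mode (initial, schema_only, never) through the snapshot.mode property; the modes available depend on the connector type and version.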

    4. Manually edit offsets (advanced)

    For surgical precision, produce new offset records to the connect-offsets topic:

    # Example: Rewind to a specific LSN for PostgreSQL
    # First, inspect current offset structure
    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic connect-offsets \
      --from-beginning \
      --timeout-ms 5000 \
      --property print.key=true \
      | grep my-postgres-source
    
    # Produce a new offset record (REPLACE the <placeholders> with actual values)
    printf '%s\t%s\n' \
      '["<connector-name>",{"server":"<server-name>"}]' \
      '{"lsn":<lsn>,"txId":null,"ts_usec":<ts_usec>}' \
      | kafka-console-producer \
        --bootstrap-server localhost:9092 \
        --topic connect-offsets \
        --property "parse.key=true" \
        --property "key.separator=$(printf '\t')"

    Extreme caution required: The offset format is connector-specific. MySQL uses binlog file + position, PostgreSQL uses LSN, SQL Server uses change LSN. Malformed offsets will cause the connector to fail on startup or silently skip/duplicate data.
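
    For the PostgreSQL case, a hand-typed record is easy to get wrong, so it can help to generate the line programmatically. The sketch below assumes the key/value shape shown in the example above (`server` in the key; `lsn`, `txId`, `ts_usec` in the value) and that the LSN is stored as a 64-bit integer; `lsn_to_int` and `build_offset_record` are hypothetical helpers. Always compare the generated key with the key actually present in the topic, and copy the existing key verbatim if they differ in any byte.

```python
import json


def lsn_to_int(lsn_text):
    """Convert a PostgreSQL LSN such as '0/16B3748' to its 64-bit integer form."""
    high, low = lsn_text.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def build_offset_record(connector, server, lsn_text, ts_usec):
    """Return one 'key<TAB>value' line in the format kafka-console-producer parses."""
    key = json.dumps([connector, {"server": server}], separators=(",", ":"))
    value = json.dumps(
        {"lsn": lsn_to_int(lsn_text), "txId": None, "ts_usec": ts_usec},
        separators=(",", ":"),
    )
    return f"{key}\t{value}"


record = build_offset_record("my-postgres-source", "pg1", "0/16B3748", 1762439422000000)
print(record)
```

    Building the value with `json.dumps` guarantees valid JSON, which rules out the most common cause of a connector failing on startup after a manual edit. It does not verify that the LSN is still available in the source's write-ahead log.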

    5. Resume and verify

    # Resume the connector
    curl -X PUT http://localhost:8083/connectors/my-postgres-source/resume
    
    # Watch the logs for offset confirmation
    docker logs -f connect-container-name | grep -i offset
    
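    # Confirm the connector and all tasks report RUNNING. The status endpoint
    # does not expose lag or throughput; watch those in your metrics dashboards.
    curl -s http://localhost:8083/connectors/my-postgres-source/status | jq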
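
    The status payload can also be checked by a script instead of being read by eye. A minimal sketch, assuming the response has the usual `connector.state` and `tasks[]` fields (with a `trace` field on failed tasks); `summarize_status` is a hypothetical helper name.

```python
def summarize_status(status):
    """Return (healthy, problems) from a Kafka Connect status response."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f'connector is {status["connector"]["state"]}')
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("no tasks reported")
    for task in tasks:
        if task["state"] != "RUNNING":
            # Failed tasks carry a stack trace; keep only its first line.
            first_line = task.get("trace", "").splitlines()[:1]
            detail = f": {first_line[0]}" if first_line else ""
            problems.append(f'task {task["id"]} is {task["state"]}{detail}')
    return (not problems), problems


example = {
    "name": "my-postgres-source",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "FAILED", "trace": "org.example.Error: bad offset\n\tat ..."}],
}
healthy, problems = summarize_status(example)
print(healthy, problems)
```

    A connector can report RUNNING while one of its tasks has failed, so the task list is checked separately from the connector state.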

    Rollback procedures

    If reconciliation or offset surgery goes wrong, you need a rapid, deterministic path back to safety.

    Rollback checklist

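    1. Stop all affected connectors immediately. Use the REST API stop endpoint (Kafka Connect 3.5+) or shut down the Connect workers; a connector that is only paused keeps its position in memory and will ignore restored offsets when resumed.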
    2. Restore offsets from pre-surgery backup. Produce the backup to connect-offsets or restore from database snapshot.
    3. Verify offset restoration. Inspect connect-offsets topic to confirm keys and values match backup.
    4. Roll back sink data if partially applied. If reconciliation wrote corrupted rows, restore from database backup or revert using SQL.
    5. Resume connectors and monitor closely. Watch lag, error logs, and DLQ for anomalies.
    6. Document the incident. Capture what failed, why, and what prevented early detection.
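
    Step 3 of the checklist can be automated by comparing two console-consumer dumps: the pre-surgery backup and a fresh read of the topic. The sketch below assumes both are in the `key<TAB>value` format produced with print.key=true and that offset keys are JSON arrays starting with the connector name; `latest_offsets` and `verify_restore` are hypothetical helpers.

```python
import json


def latest_offsets(dump_lines, connector_name):
    """Return {key_text: value_text} holding the last record seen per key."""
    latest = {}
    for line in dump_lines:
        line = line.rstrip("\n")
        if "\t" not in line:
            continue  # skip blank lines and consumer status messages
        key_text, value_text = line.split("\t", 1)
        try:
            key = json.loads(key_text)
        except json.JSONDecodeError:
            continue
        if isinstance(key, list) and key and key[0] == connector_name:
            latest[key_text] = value_text
    return latest


def verify_restore(backup, current):
    """Return {key: (expected, actual)} for every key that does not match."""
    mismatches = {}
    for key, expected in backup.items():
        actual = current.get(key)
        if actual is None or json.loads(actual) != json.loads(expected):
            mismatches[key] = (expected, actual)
    return mismatches


KEY = '["my-postgres-source",{"server":"pg1"}]'
backup_dump = [f'{KEY}\t{{"lsn":100}}\n', f'{KEY}\t{{"lsn":250}}\n']
topic_dump = backup_dump + [f'{KEY}\t{{"lsn":400}}\n', f'{KEY}\t{{"lsn":250}}\n']

name = "my-postgres-source"
print(verify_restore(latest_offsets(backup_dump, name), latest_offsets(topic_dump, name)))  # {}
```

    An empty result means the newest record for every backed-up key matches the backup; any entry in the result names a key whose restore did not take effect.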

    Example: Restore offsets from backup

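    # Stop the connector (Kafka Connect 3.5+). Pausing is not enough: a paused
    # task keeps its position in memory and ignores the restored offsets on resume.
    # On older versions, shut down the Connect workers instead.
    curl -X PUT http://localhost:8083/connectors/my-postgres-source/stop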
    
    # Produce backup offsets back to the topic
    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic connect-offsets \
      --property "parse.key=true" \
      --property "key.separator=	" \
      < offsets-backup-20251106-143022.json
    
    # Verify restoration
    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic connect-offsets \
      --from-beginning \
      --timeout-ms 5000 \
      --property print.key=true \
      | grep my-postgres-source
    
    # Resume connector
    curl -X PUT http://localhost:8083/connectors/my-postgres-source/resume

    Time-to-recovery target: Practice rollback drills quarterly. Your team should be able to restore offsets and resume normal operation within 15 minutes.

    Warnings & guardrails

    Reconciliation and offset surgery are high-risk operations. Respect these guardrails:

    • Never run reconciliation in production during business hours without stakeholder approval and a rollback plan.
    • Never delete offsets ad-hoc without understanding the snapshot mode and data volume implications.
    • Always validate checksums before declaring reconciliation complete. Mismatched checksums mean silent corruption persists.
    • Test rollback procedures in staging before attempting surgery in production.
    • Document every offset edit in version control with timestamps, reasons, and approvers.
    • Monitor downstream consumers during and after surgery. Offset rewinds can cause duplicate processing.
    • Coordinate with database administrators before running expensive diff queries on production databases.

    Pre-flight checklist

    Before starting any offset surgery or reconciliation, confirm:

    • Offsets are backed up and validated
    • Downstream consumers can tolerate replays or are paused
    • Monitoring dashboards are live and alerting is enabled
    • At least one other engineer reviewed the runbook
    • Rollback plan is documented with specific commands and decision points
    • Stakeholders are notified of the maintenance window

    Automation scripts

    Codify common offset surgery patterns into scripts to reduce human error.

    Script: Safe offset backup and restore

    #!/usr/bin/env bash
    set -euo pipefail
    
    CONNECTOR_NAME="${1:?Connector name required}"
    ACTION="${2:?Action required: backup or restore}"
    BACKUP_FILE="offsets-${CONNECTOR_NAME}-$(date +%Y%m%d-%H%M%S-%N).json"
    KAFKA_BOOTSTRAP="localhost:9092"
    CONNECT_URL="http://localhost:8083"
    
    backup_offsets() {
      echo "Backing up offsets for $CONNECTOR_NAME to $BACKUP_FILE"
      # Match on the offset key prefix ["<name>", so connectors whose names merely
      # contain $CONNECTOR_NAME are excluded.
      # grep returns exit code 1 if no matches, handle gracefully
      kafka-console-consumer \
        --bootstrap-server "$KAFKA_BOOTSTRAP" \
        --topic connect-offsets \
        --from-beginning \
        --timeout-ms 10000 \
        --property print.key=true \
        | grep -F "[\"${CONNECTOR_NAME}\"," > "$BACKUP_FILE" || true
      
      if [ -s "$BACKUP_FILE" ]; then
        echo "Backup saved to $BACKUP_FILE ($(wc -l < "$BACKUP_FILE") lines)"
      else
        echo "Warning: No offsets found for connector $CONNECTOR_NAME"
        rm -f "$BACKUP_FILE"
        exit 1
      fi
    }
    
    restore_offsets() {
      local restore_file="${1:?Backup file required for restore}"
      
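      # Stop rather than pause (Kafka Connect 3.5+): a paused task keeps its
      # in-memory position and would overwrite the restored offsets after resume.
      echo "Stopping connector $CONNECTOR_NAME"
      curl -sf -X PUT "$CONNECT_URL/connectors/$CONNECTOR_NAME/stop"
      sleep 3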
      
      echo "Restoring offsets from $restore_file"
      kafka-console-producer \
        --bootstrap-server "$KAFKA_BOOTSTRAP" \
        --topic connect-offsets \
        --property "parse.key=true" \
        --property "key.separator=	" \
        < "$restore_file"
      
      echo "Resuming connector $CONNECTOR_NAME"
      curl -s -X PUT "$CONNECT_URL/connectors/$CONNECTOR_NAME/resume"
      
      echo "Offset restoration complete. Monitor connector status."
    }
    
    case "$ACTION" in
      backup)
        backup_offsets
        ;;
      restore)
        shift 2  # Remove CONNECTOR_NAME and ACTION from args
        restore_offsets "$@"
        ;;
      *)
        echo "Unknown action: $ACTION. Use 'backup' or 'restore'."
        exit 1
        ;;
    esac

    Script: Row count reconciliation report

    #!/usr/bin/env bash
    # Generate row count comparison report for source and sink
    # Requires: psql
    set -euo pipefail
    
    # Set SOURCE_CONN and SINK_CONN in the environment instead of hardcoding credentials
    SOURCE_CONN="${SOURCE_CONN:-postgresql://user:pass@source-db:5432/mydb}"
    SINK_CONN="${SINK_CONN:-postgresql://user:pass@sink-db:5432/mydb}"
    SCHEMA="${1:-public}"
    TABLES="${2:-customers,orders,products}"
    
    echo "Reconciliation Report - $(date)"
    echo "Schema: $SCHEMA"
    echo "Tables: $TABLES"
    echo "================================"
    
    IFS=',' read -ra TABLE_ARRAY <<< "$TABLES"
    for table in "${TABLE_ARRAY[@]}"; do
      # -A (unaligned) with -t (tuples only) yields the bare number without padding
      src_count=$(psql "$SOURCE_CONN" -At -c "SELECT COUNT(*) FROM $SCHEMA.$table")
      snk_count=$(psql "$SINK_CONN" -At -c "SELECT COUNT(*) FROM $SCHEMA.$table")
      
      diff=$((src_count - snk_count))
      status="✓ OK"
      [ "$diff" -ne 0 ] && status="✗ MISMATCH"
      
      printf "%-20s | Source: %10d | Sink: %10d | Diff: %+6d | %s\n" \
        "$table" "$src_count" "$snk_count" "$diff" "$status"
    done

    Reconciliation readiness scorecard

    Assess your team's preparedness for reconciliation and offset surgery operations.

    Reconciliation operational maturity

    • Ready when: Automated checksum validation runs daily, with alerts on mismatches exceeding threshold.
      If not, next step: Implement scheduled checksum queries, store results in a metrics database, and configure alerts.
    • Ready when: Offsets are backed up hourly, restore drills pass quarterly, and recovery completes within 15 minutes.
      If not, next step: Automate offset backups, document restore procedures, and schedule next drill with SRE team.
    • Ready when: SQL diff patterns documented, tested in staging, and include rollback decision trees.
      If not, next step: Write runbooks for each connector type, validate with sample data, and review with database team.
    • Ready when: Offset changes require two-person approval, pre-flight checklist, and post-surgery monitoring.
      If not, next step: Define approval policy, create checklist template, and configure monitoring dashboards.
    • Ready when: Database team, SRE, and data platform teams participate in reconciliation drills and review runbooks.
      If not, next step: Schedule kickoff meeting, assign roles, and establish communication channels for incidents.

    Reconciliation & Offset Surgery Knowledge Check

    Test your understanding of data reconciliation and safe offset management.

    Q1

    What is data reconciliation in the context of CDC?

    Q2

    Why might you need to perform 'offset surgery' in a CDC pipeline?

    Q3

    What is a SQL diff pattern in CDC reconciliation?

    Q4

    What is checksum verification, and why is it useful in CDC?

    Q5

    How does the Kafka Connect REST API help with offset management?
