Reconciliation & Offset Surgery
Master the advanced techniques every CDC operator needs: repair out-of-sync sinks with SQL diffs, verify data integrity with checksums, and safely reset offsets using the Kafka Connect REST API.
When reconciliation is necessary
Reconciliation becomes critical when your sink has drifted from the source. This happens in several scenarios:
- Missed events: Connector downtime longer than log retention, causing data gaps.
- Failed transformations: SMTs or sink logic that silently dropped or corrupted records.
- Partial writes: Sink acknowledged success but only persisted some rows due to transient errors.
- Manual interventions: Direct edits to sink data that bypassed the CDC pipeline.
- Split-brain scenarios: Multiple pipelines or manual processes writing to the same sink tables.
Detection first: Set up automated drift detection before attempting reconciliation. Monitor row counts, checksum mismatches, and lag spikes. Reconciliation is expensive—only run it when drift is confirmed.
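As a rough illustration of the row-count part of drift detection, the helper below reports drift only when the gap exceeds a tolerance, because in-flight replication lag makes small transient differences normal. The function name `drift_status` and the 0.1% default tolerance are illustrative assumptions, not values taken from any tool.

```shell
#!/usr/bin/env bash
# Hypothetical helper: classify the gap between source and sink row counts.
# The 0.1% default tolerance is an assumption chosen only for illustration.

# drift_status SRC_COUNT SNK_COUNT [TOLERANCE_PCT]
# Prints "ok" when the absolute difference, as a percentage of the source
# count, is within the tolerance; prints "drift" otherwise.
drift_status() {
  local src="$1" snk="$2" tol="${3:-0.1}"
  awk -v s="$src" -v k="$snk" -v t="$tol" 'BEGIN {
    d = s - k
    if (d < 0) d = -d                      # absolute difference
    if (s == 0) {                          # empty source: any sink row is drift
      print ((d == 0) ? "ok" : "drift")
      exit
    }
    print (((d * 100 / s) <= t) ? "ok" : "drift")
  }'
}
```

For example, `drift_status 100000 99950` prints `ok` (a 0.05% gap), while `drift_status 1000 990` prints `drift` (a 1% gap). A check like this only confirms that counts diverge; it says nothing about which rows differ.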
Prerequisites
Before attempting reconciliation or offset surgery, ensure you have:
- Read-only access to source database for snapshot queries
- Write access to sink database for repairs (with approval and backup)
- Kafka Connect admin access for offset manipulation
- Backup of current offsets stored in version control or object storage
- Downtime window approved by stakeholders (or idempotent sink that tolerates replays)
- Runbook reviewed by at least one other engineer familiar with the pipeline
- Rollback plan documented with specific steps and decision points
Practice in staging first: Every reconciliation and offset surgery should be rehearsed in a non-production environment with realistic data volumes and latency profiles.
SQL diff patterns for source vs. sink
Compare source and sink using deterministic queries that highlight missing, extra, or mismatched rows.
Pattern 1: Row count comparison
Start with the simplest check—do both sides have the same number of rows?
-- Source (PostgreSQL example)
SELECT 'source' AS location, COUNT(*) AS row_count
FROM source_schema.customers;
-- Sink (destination database)
SELECT 'sink' AS location, COUNT(*) AS row_count
FROM sink_schema.customers;
Interpretation: If counts match, proceed to checksum verification. If counts differ, identify which rows are missing or extra using the patterns below.
Pattern 2: Find missing rows in sink
Identify primary keys present in source but absent from sink:
-- PostgreSQL to PostgreSQL example using foreign data wrapper
SELECT src.id, src.email, src.updated_at
FROM source_schema.customers src
LEFT JOIN sink_schema.customers snk ON src.id = snk.id
WHERE snk.id IS NULL
ORDER BY src.updated_at DESC
LIMIT 100;
Cross-database setup: The example above assumes PostgreSQL foreign data wrappers
(postgres_fdw) are configured. For other databases, export both datasets to CSV
or a neutral format, then use comm, diff, Python pandas, or SQL
import into a staging database for comparison.
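A minimal sketch of the export-and-compare route using `sort` and `comm`, assuming each side's primary keys have already been exported to a file with one key per line. The function names and file layout are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: diff exported primary keys with sort + comm.
# Each input file holds one primary key per line (hypothetical exports).

# _key_diff COMM_FLAG FILE_A FILE_B
# Sorts both files bytewise (comm requires sorted input), then runs comm.
_key_diff() {
  local tmp
  tmp="$(mktemp -d)"
  LC_ALL=C sort -u "$2" > "$tmp/a"
  LC_ALL=C sort -u "$3" > "$tmp/b"
  LC_ALL=C comm "$1" "$tmp/a" "$tmp/b"
  rm -rf "$tmp"
}

# Keys present in the source export but absent from the sink export.
keys_missing_in_sink() { _key_diff -23 "$1" "$2"; }

# Keys present in the sink export but absent from the source export.
keys_extra_in_sink() { _key_diff -13 "$1" "$2"; }
```

Usage would look like `keys_missing_in_sink source_ids.txt sink_ids.txt`. Sorting under `LC_ALL=C` keeps the ordering identical for `sort` and `comm`, which avoids spurious differences caused by locale-dependent collation.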
Pattern 3: Find extra rows in sink
Detect rows that exist in sink but not in source (potential duplicates or orphaned records):
SELECT snk.id, snk.email, snk.created_at
FROM sink_schema.customers snk
LEFT JOIN source_schema.customers src ON snk.id = src.id
WHERE src.id IS NULL
ORDER BY snk.created_at DESC
LIMIT 100;
Pattern 4: Find mismatched columns
For rows that exist on both sides, detect column value differences:
-- Compare critical columns (adjust list for your schema).
-- IS DISTINCT FROM treats NULL as a comparable value, so a NULL on one side
-- and a value on the other is reported (plain != evaluates to NULL and hides it).
SELECT
src.id,
CASE WHEN src.email IS DISTINCT FROM snk.email THEN 'email' END AS email_mismatch,
CASE WHEN src.status IS DISTINCT FROM snk.status THEN 'status' END AS status_mismatch,
CASE WHEN src.updated_at IS DISTINCT FROM snk.updated_at THEN 'updated_at' END AS ts_mismatch,
src.updated_at AS src_updated_at,
snk.updated_at AS snk_updated_at
FROM source_schema.customers src
INNER JOIN sink_schema.customers snk ON src.id = snk.id
WHERE src.email IS DISTINCT FROM snk.email
OR src.status IS DISTINCT FROM snk.status
OR src.updated_at IS DISTINCT FROM snk.updated_at
LIMIT 100;
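Timestamp handling: CDC events can carry microsecond-precision timestamps.
If your sink truncates to milliseconds or seconds, wrap both sides in
date_trunc('milliseconds', ...) (or 'second') before comparing to avoid false positives.
A ::timestamptz(3) cast rounds rather than truncates, so it can still differ from a truncating sink by one millisecond.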
Checksum verification methods
When row counts match but you suspect silent corruption, compute deterministic checksums over entire tables or partitions.
Method 1: Per-row checksums
Hash each row and compare aggregates:
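-- PostgreSQL example using md5 over delimited, NULL-safe columns.
-- The '|' delimiter keeps ('ab','c') and ('a','bc') from hashing alike;
-- COALESCE keeps a NULL column from turning the whole hash into NULL.
SELECT
COUNT(*) AS row_count,
SUM(('x' || md5(concat_ws('|', id::text, COALESCE(email, ''), COALESCE(status, ''))))::bit(32)::bigint) AS checksum_sum
FROM source_schema.customers;
Run the same query on both source and sink. If row_count and checksum_sum match,
the hashed columns are almost certainly identical (a 32-bit sum can collide, and columns left out of the hash are not checked). If only row_count matches, use per-row hashes to find divergent rows.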
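To locate the divergent rows, one option is to export an `id` and row hash per line from each side and compare the two files. The sketch below assumes tab-separated exports of the form `id<TAB>row_hash`; the function name and file layout are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: find ids whose per-row hashes differ between two exports.
# Each file has lines of "id<TAB>row_hash" (hypothetical export format).

# divergent_ids SOURCE_TSV SINK_TSV
# Prints ids that appear in both files with different hashes.
divergent_ids() {
  awk -F '\t' '
    # Source file: remember the hash for each id.
    FILENAME == ARGV[1] { src[$1] = $2; next }
    # Sink file: same id, different hash. Appending "" forces a string
    # comparison, so all-digit hashes are never compared as numbers.
    ($1 in src) && (src[$1] "") != ($2 "") { print $1 }
  ' "$1" "$2"
}
```

Ids missing from one side are deliberately ignored here, since the missing-row and extra-row patterns already cover them.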
Method 2: Partition-level checksums
For large tables, compute checksums per time window or partition key:
-- Daily partition checksums
SELECT
DATE_TRUNC('day', updated_at) AS partition_day,
COUNT(*) AS row_count,
SUM(('x' || md5(concat_ws('|', id::text, COALESCE(email, ''), COALESCE(status, ''))))::bit(32)::bigint) AS checksum_sum
FROM source_schema.customers
GROUP BY DATE_TRUNC('day', updated_at)
ORDER BY partition_day DESC;
Performance tip: Checksum queries are expensive. Run them during off-peak hours, or pre-compute checksums in a materialized view that updates incrementally.
Method 3: Third-party reconciliation tools
For production workloads, consider specialized tools:
- Datafold: Cloud-native data diffing with column-level reconciliation reports.
- dbt-audit-helper: Open-source dbt package for comparing datasets.
- AWS DMS Data Validation: Built-in validation for DMS replication tasks.
- Debezium signaling: Trigger incremental snapshots to re-sync specific tables.
Offset surgery: when and how
Offset surgery means directly manipulating the position markers that tell Kafka Connect "where to resume reading." This is dangerous—do it wrong and you'll duplicate or skip events.
Safe scenarios for offset surgery
- Rewind after failed transformation: Reset offsets to replay events through corrected SMTs.
- Skip poison pill: Advance offset past a single malformed message blocking progress.
- Resume from snapshot: After restoring sink from backup, align offsets with backup timestamp.
- Migrate connector: Copy offsets when moving a connector to a new cluster.
Unsafe scenarios (avoid or escalate)
- Editing offsets to hide data loss: If source logs expired, offsets can't fix it.
- Arbitrarily jumping to "now": Creates unbounded data gaps.
- Deleting offsets without understanding state: Forces full snapshot, which may overload source.
Golden rule: Never perform offset surgery without a pre-surgery offset backup and a documented rollback plan. If you can't restore the original state in under 5 minutes, don't start.
Kafka Connect REST API offset operations
Kafka Connect exposes a REST API for offset management. Below are the essential operations.
1. Backup current offsets
Always start by exporting the current offset state:
# List all connectors
curl -s http://localhost:8083/connectors | jq
# Get connector status and tasks
curl -s http://localhost:8083/connectors/my-postgres-source/status | jq
# Backup offsets from the internal Kafka topic
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--timeout-ms 10000 \
--property print.key=true \
> offsets-backup-$(date +%Y%m%d-%H%M%S).json
2. Pause connector
Stop processing before manipulating offsets:
curl -X PUT http://localhost:8083/connectors/my-postgres-source/pause
# Verify connector is paused
curl -s http://localhost:8083/connectors/my-postgres-source/status | jq '.connector.state'
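3. Reset offsets
Discard the connector's stored position to force a fresh start:
# Delete the connector (its source offsets remain in connect-offsets under the old name)
curl -X DELETE http://localhost:8083/connectors/my-postgres-source
# Recreate it under a NEW name in the config file so no stored offsets match,
# with snapshot.mode=initial or schema_only
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @my-postgres-source-config.json
Warning: Deleting a source connector does not delete its offsets. Recreating it under the same name resumes from the stored position.
To reset, use a new connector name or, on Kafka Connect 3.6+, stop the connector and call DELETE /connectors/{name}/offsets.
Ensure your connector config specifies the desired snapshot mode (initial, schema_only, never).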
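On Kafka Connect 3.6 or newer, offsets can also be reset or rewritten through dedicated REST endpoints, provided the connector has first been put into the STOPPED state (the stop endpoint arrived in 3.5). The following is a sketch under those version assumptions. The function names are hypothetical, and the partition and offset field names simply mirror the Debezium PostgreSQL record used in this guide, so check them against `GET /connectors/{name}/offsets` before sending anything.

```shell
#!/usr/bin/env bash
# Sketch for Kafka Connect 3.6+; endpoint availability is version-dependent.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# build_offset_patch SERVER_NAME LSN TS_USEC
# Prints a JSON body for PATCH /connectors/{name}/offsets.
# Field names (server, lsn, txId, ts_usec) are assumptions copied from the
# offset record format shown in this guide; verify them for your connector.
build_offset_patch() {
  printf '{"offsets":[{"partition":{"server":"%s"},"offset":{"lsn":%s,"txId":null,"ts_usec":%s}}]}\n' \
    "$1" "$2" "$3"
}

# reset_offsets CONNECTOR
# Stops the connector, then deletes all of its stored offsets.
reset_offsets() {
  curl -fsS -X PUT "$CONNECT_URL/connectors/$1/stop"
  curl -fsS -X DELETE "$CONNECT_URL/connectors/$1/offsets"
}

# alter_offsets CONNECTOR JSON_BODY
# Stops the connector, then overwrites offsets with the given body.
alter_offsets() {
  curl -fsS -X PUT "$CONNECT_URL/connectors/$1/stop"
  curl -fsS -X PATCH "$CONNECT_URL/connectors/$1/offsets" \
    -H "Content-Type: application/json" \
    -d "$2"
}
```

Stopping is asynchronous, so the offsets call can be rejected if it arrives before the connector has fully stopped. In practice you would poll the status endpoint between the two requests, and resume the connector only after confirming the new offsets with a GET.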
4. Manually edit offsets (advanced)
For surgical precision, produce new offset records to the connect-offsets topic:
# Example: Rewind to a specific LSN for PostgreSQL
# First, inspect current offset structure
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--timeout-ms 5000 \
--property print.key=true \
| grep my-postgres-source
# Produce a new offset record (replace every <placeholder> with actual values).
# Key and value are separated by a single space, so the key must not contain one.
echo '["<connector-name>",{"server":"<server-name>"}] {"lsn":<lsn>,"txId":null,"ts_usec":<ts_usec>}' \
| kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--property "parse.key=true" \
--property "key.separator= "
Extreme caution required: The offset format is connector-specific. MySQL uses binlog file + position, PostgreSQL uses LSN, SQL Server uses change LSN. Malformed offsets will cause the connector to fail on startup or silently skip/duplicate data.
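Because a malformed record is easy to produce by hand, a cheap shape check before writing to the topic can catch the most common slips. The sketch below is a heuristic only: it does not parse JSON or know any connector's real offset schema, and the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: heuristic sanity check for a hand-written offset record.
# This checks shape only; it is not a JSON parser or a schema validator.

# validate_offset_record CONNECTOR KEY VALUE
# Returns 0 when the key looks like ["CONNECTOR",{...}], the value looks like
# a JSON object, and no placeholder appears to have been left unfilled.
validate_offset_record() {
  local name="$1" key="$2" value="$3"
  case "$key" in
    "[\"$name\",{"*"}]") ;;
    *) echo "key must look like [\"$name\",{...}]" >&2; return 1 ;;
  esac
  case "$value" in
    "{"*"}") ;;
    *) echo "value must be a JSON object" >&2; return 1 ;;
  esac
  # Angle brackets or an empty slot after a colon suggest a missed placeholder.
  case "$key$value" in
    *"<"*|*">"*|*":,"*|*":}"*|*':""'*)
      echo "record appears to contain an unfilled placeholder" >&2
      return 1 ;;
  esac
  return 0
}
```

A record that passes this check can still be wrong for your connector, so compare it field by field with an existing record read from the topic before producing it.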
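5. Resume and verify
# Restart the task while the connector is still paused: a paused task keeps its
# in-memory position and only re-reads connect-offsets when it starts
curl -X POST http://localhost:8083/connectors/my-postgres-source/tasks/0/restart
# Resume the connector
curl -X PUT http://localhost:8083/connectors/my-postgres-source/resume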
# Watch the logs for offset confirmation
docker logs -f connect-container-name | grep -i offset
# Monitor lag and throughput
curl -s http://localhost:8083/connectors/my-postgres-source/status | jq
Rollback procedures
If reconciliation or offset surgery goes wrong, you need a rapid, deterministic path back to safety.
Rollback checklist
- Pause all affected connectors immediately. Use the REST API or stop the Connect workers.
- Restore offsets from pre-surgery backup. Produce the backup to connect-offsets or restore from database snapshot.
- Verify offset restoration. Inspect connect-offsets topic to confirm keys and values match backup.
- Roll back sink data if partially applied. If reconciliation wrote corrupted rows, restore from database backup or revert using SQL.
- Resume connectors and monitor closely. Watch lag, error logs, and DLQ for anomalies.
- Document the incident. Capture what failed, why, and what prevented early detection.
Example: Restore offsets from backup
# Pause connector
curl -X PUT http://localhost:8083/connectors/my-postgres-source/pause
# Produce backup offsets back to the topic. The backup is key<TAB>value
# (the kafka-console-consumer default), so the producer must split on a tab too.
kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--property "parse.key=true" \
--property $'key.separator=\t' \
< offsets-backup-20251106-143022.json
# Verify restoration
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--timeout-ms 5000 \
--property print.key=true \
| grep my-postgres-source
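# Restart the task while paused so it re-reads the restored offsets, then resume
curl -X POST http://localhost:8083/connectors/my-postgres-source/tasks/0/restart
curl -X PUT http://localhost:8083/connectors/my-postgres-source/resume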
Time-to-recovery target: Practice rollback drills quarterly. Your team should be able to restore offsets and resume normal operation within 15 minutes.
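Verifying a restore by eye is error-prone when the topic holds many historical records per key. A minimal sketch of an automated check follows, assuming both the backup and a fresh dump are tab-separated `key<TAB>value` files already filtered to the connector in question. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: confirm that a fresh dump of connect-offsets matches the backup.
# Both inputs are key<TAB>value files in the order the records were read.

# latest_offsets FILE
# Reduces a dump to the last value seen per key (the effective offset after
# log compaction), sorted so two dumps can be compared reliably.
latest_offsets() {
  awk -F '\t' '
    { last[$1] = substr($0, length($1) + 2) }   # everything after the first tab
    END { for (k in last) print k "\t" last[k] }
  ' "$1" | LC_ALL=C sort
}

# offsets_match BACKUP_FILE CURRENT_DUMP
# Returns 0 when the latest value for every key agrees in both files.
offsets_match() {
  local a b
  a="$(latest_offsets "$1")"
  b="$(latest_offsets "$2")"
  [ "$a" = "$b" ]
}
```

Comparing only the latest value per key matters because replaying a backup appends records rather than rewriting history, so a line-by-line diff of the raw dumps would always differ.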
Warnings & guardrails
Reconciliation and offset surgery are high-risk operations. Respect these guardrails:
- Never run reconciliation in production during business hours without stakeholder approval and a rollback plan.
- Never delete offsets ad-hoc without understanding the snapshot mode and data volume implications.
- Always validate checksums before declaring reconciliation complete. Mismatched checksums mean silent corruption persists.
- Test rollback procedures in staging before attempting surgery in production.
- Document every offset edit in version control with timestamps, reasons, and approvers.
- Monitor downstream consumers during and after surgery. Offset rewinds can cause duplicate processing.
- Coordinate with database administrators before running expensive diff queries on production databases.
Pre-flight checklist
Before starting any offset surgery or reconciliation, confirm:
- Offsets are backed up and validated
- Downstream consumers can tolerate replays or are paused
- Monitoring dashboards are live and alerting is enabled
- At least one other engineer reviewed the runbook
- Rollback plan is documented with specific commands and decision points
- Stakeholders are notified of the maintenance window
Automation scripts
Codify common offset surgery patterns into scripts to reduce human error.
Script: Safe offset backup and restore
#!/usr/bin/env bash
set -euo pipefail
CONNECTOR_NAME="${1:?Connector name required}"
ACTION="${2:?Action required: backup or restore}"
BACKUP_FILE="offsets-${CONNECTOR_NAME}-$(date +%Y%m%d-%H%M%S-%N).json"
KAFKA_BOOTSTRAP="localhost:9092"
CONNECT_URL="http://localhost:8083"
backup_offsets() {
echo "Backing up offsets for $CONNECTOR_NAME to $BACKUP_FILE"
# Match the exact key prefix ["<name>", so connectors that merely share a name
# prefix are excluded. grep exits 1 when nothing matches, hence "|| true".
kafka-console-consumer \
--bootstrap-server "$KAFKA_BOOTSTRAP" \
--topic connect-offsets \
--from-beginning \
--timeout-ms 10000 \
--property print.key=true \
| grep -F -- "[\"${CONNECTOR_NAME}\"," > "$BACKUP_FILE" || true
if [ -s "$BACKUP_FILE" ]; then
echo "Backup saved to $BACKUP_FILE ($(wc -l < "$BACKUP_FILE") lines)"
else
echo "Warning: No offsets found for connector $CONNECTOR_NAME"
rm -f "$BACKUP_FILE"
exit 1
fi
}
restore_offsets() {
local restore_file="${1:?Backup file required for restore}"
echo "Pausing connector $CONNECTOR_NAME"
curl -s -X PUT "$CONNECT_URL/connectors/$CONNECTOR_NAME/pause"
sleep 3
echo "Restoring offsets from $restore_file"
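# The backup is key<TAB>value, so split on a tab (not a space)
kafka-console-producer \
--bootstrap-server "$KAFKA_BOOTSTRAP" \
--topic connect-offsets \
--property "parse.key=true" \
--property $'key.separator=\t' \
< "$restore_file"
# Restart task 0 while paused so it re-reads the restored offsets on start
# (assumes a single-task connector, tasks.max=1)
curl -s -X POST "$CONNECT_URL/connectors/$CONNECTOR_NAME/tasks/0/restart"
echo "Resuming connector $CONNECTOR_NAME"
curl -s -X PUT "$CONNECT_URL/connectors/$CONNECTOR_NAME/resume"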
echo "Offset restoration complete. Monitor connector status."
}
case "$ACTION" in
backup)
backup_offsets
;;
restore)
shift 2 # Remove CONNECTOR_NAME and ACTION from args
restore_offsets "$@"
;;
*)
echo "Unknown action: $ACTION. Use 'backup' or 'restore'."
exit 1
;;
esac
Script: Row count reconciliation report
#!/usr/bin/env bash
# Generate row count comparison report for source and sink
# Requires: psql
SOURCE_CONN="postgresql://user:pass@source-db:5432/mydb"
SINK_CONN="postgresql://user:pass@sink-db:5432/mydb"
SCHEMA="${1:-public}"
TABLES="${2:-customers,orders,products}"
echo "Reconciliation Report - $(date)"
echo "Schema: $SCHEMA"
echo "Tables: $TABLES"
echo "================================"
IFS=',' read -ra TABLE_ARRAY <<< "$TABLES"
for table in "${TABLE_ARRAY[@]}"; do
# -At prints the bare value (unaligned, tuples only) with no padding or header
src_count=$(psql "$SOURCE_CONN" -At -c "SELECT COUNT(*) FROM $SCHEMA.$table")
snk_count=$(psql "$SINK_CONN" -At -c "SELECT COUNT(*) FROM $SCHEMA.$table")
diff=$((src_count - snk_count))
status="✓ OK"
[ "$diff" -ne 0 ] && status="✗ MISMATCH"
printf "%-20s | Source: %10d | Sink: %10d | Diff: %+6d | %s\n" \
"$table" "$src_count" "$snk_count" "$diff" "$status"
done
Reconciliation readiness scorecard
Assess your team's preparedness for reconciliation and offset surgery operations.
| Ready when… | If not, next step |
|---|---|
| Automated checksum validation runs daily, with alerts on mismatches exceeding threshold. | Implement scheduled checksum queries, store results in a metrics database, and configure alerts. |
| Offsets are backed up hourly, restore drills pass quarterly, and recovery completes within 15 minutes. | Automate offset backups, document restore procedures, and schedule next drill with SRE team. |
| SQL diff patterns documented, tested in staging, and include rollback decision trees. | Write runbooks for each connector type, validate with sample data, and review with database team. |
| Offset changes require two-person approval, pre-flight checklist, and post-surgery monitoring. | Define approval policy, create checklist template, and configure monitoring dashboards. |
| Database team, SRE, and data platform teams participate in reconciliation drills and review runbooks. | Schedule kickoff meeting, assign roles, and establish communication channels for incidents. |
Reconciliation & Offset Surgery Knowledge Check
Test your understanding of data reconciliation and safe offset management.
What is data reconciliation in the context of CDC?
Reconciliation is the process of comparing the source database with downstream sinks to identify and fix data discrepancies—missing records, incorrect values, or out-of-order updates. It's a safety net that catches issues missed by the CDC pipeline due to failures, bugs, or operational errors.
Why might you need to perform 'offset surgery' in a CDC pipeline?
Offset surgery involves manually resetting a connector's stored offsets to recover from failures (e.g., sink lost data, need to replay), skip poisonous messages that cause crashes, or reprocess a specific time range. It's a powerful but risky operation that requires careful planning and validation to avoid data corruption.
What is a SQL diff pattern in CDC reconciliation?
A SQL diff pattern compares datasets between source and sink using queries (e.g., EXCEPT, MINUS, or hash-based comparisons). This identifies rows that exist in one system but not the other, or rows with mismatched values. It's commonly used in audit loops to detect and repair CDC-related data drift.
What is checksum verification, and why is it useful in CDC?
Checksum verification involves computing hash values (MD5, SHA, etc.) on rows or datasets in both source and sink. Matching checksums indicate identical data; mismatches reveal discrepancies. This is much faster than full data comparison, enabling efficient reconciliation of large tables without transferring entire datasets.
How does the Kafka Connect REST API help with offset management?
The Kafka Connect REST API provides endpoints to inspect connector status, view current offsets (GET /connectors/{name}/offsets, Kafka Connect 3.5+), and modify or reset them while the connector is stopped (PATCH and DELETE on the same path, 3.6+). This enables automated or manual offset surgery, such as resetting a connector to replay from a specific LSN/SCN or skipping problematic log positions, without restarting the entire cluster.
Further resources
- Ops: Offsets & Replays — Foundational offset management patterns
- Observability Basics — Monitoring dashboards for drift detection
- Failure Scenario Drills — Practice recovery procedures in safe environments
- Materialization 101 — Idempotent upsert patterns that tolerate replays
- Debezium Signaling — Trigger incremental snapshots for reconciliation
- Kafka Connect REST API — Official documentation for connector management