shared assumptions (adapt to your schema)

if your source doesn’t emit OP_TS, use commit LSN/GTID/SCN as the ordering surrogate.

snowflake

MERGE INTO TARGET_CUSTOMERS t
USING (
  SELECT ID, EMAIL, OP, OP_TS FROM STG_CUSTOMERS
  QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) s
ON t.ID = s.ID
WHEN MATCHED AND s.OP = 'd' AND s.OP_TS >= t.OP_TS THEN
  DELETE
WHEN MATCHED AND s.OP IN ('c','u') AND s.OP_TS >= t.OP_TS THEN
  UPDATE SET EMAIL = s.EMAIL, OP_TS = s.OP_TS
WHEN NOT MATCHED AND s.OP <> 'd' THEN
  INSERT (ID, EMAIL, OP_TS) VALUES (s.ID, s.EMAIL, s.OP_TS);

dedup in the USING subquery protects against duplicate events in staging.

bigquery

MERGE `dataset.TARGET_CUSTOMERS` t
USING (
  SELECT ID, EMAIL, OP, OP_TS FROM `dataset.STG_CUSTOMERS`
  QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) s
ON t.ID = s.ID
WHEN MATCHED AND s.OP = 'd' AND s.OP_TS >= t.OP_TS THEN
  DELETE
WHEN MATCHED AND s.OP IN ('c','u') AND s.OP_TS >= t.OP_TS THEN
  UPDATE SET EMAIL = s.EMAIL, OP_TS = s.OP_TS
WHEN NOT MATCHED AND s.OP != 'd' THEN
  INSERT (ID, EMAIL, OP_TS) VALUES (s.ID, s.EMAIL, s.OP_TS);

consider partitioning TARGET_CUSTOMERS by DATE(OP_TS) and clustering by ID for merge performance.

databricks delta

MERGE INTO target_customers AS t
USING (
  SELECT ID, EMAIL, OP, OP_TS
  FROM   stg_customers
  QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) AS s
ON t.ID = s.ID
WHEN MATCHED AND s.OP = 'd' AND s.OP_TS >= t.OP_TS THEN DELETE
WHEN MATCHED AND s.OP IN ('c','u') AND s.OP_TS >= t.OP_TS
  THEN UPDATE SET t.EMAIL = s.EMAIL, t.OP_TS = s.OP_TS
WHEN NOT MATCHED AND s.OP <> 'd'
  THEN INSERT (ID, EMAIL, OP_TS) VALUES (s.ID, s.EMAIL, s.OP_TS);

enable OPTIMIZE + ZORDER BY (ID) on large tables to keep merge fast.

postgres

-- ensure pk
ALTER TABLE target_customers ADD PRIMARY KEY (id);

-- idempotent upsert with latest-wins
INSERT INTO target_customers (id, email, op_ts)
SELECT id, email, op_ts
FROM (
  SELECT id, email, op, op_ts,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) AS rn
  FROM stg_customers
) s
WHERE s.rn = 1 AND s.op <> 'd'
ON CONFLICT (id) DO UPDATE
SET email = EXCLUDED.email,
    op_ts = GREATEST(target_customers.op_ts, EXCLUDED.op_ts);

-- deletes
DELETE FROM target_customers t
USING (
  SELECT id, MAX(op_ts) AS op_ts
  FROM stg_customers WHERE op = 'd'
  GROUP BY id
) d
WHERE t.id = d.id AND d.op_ts >= t.op_ts;

two-step approach (upsert then delete) is simple and fast; wrap in a transaction.

mysql

-- latest non-delete per id into a temp table
CREATE TEMPORARY TABLE tmp_latest AS
SELECT id, email, op, op_ts
FROM (
  SELECT id, email, op, op_ts,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) rn
  FROM stg_customers
) x WHERE rn = 1;

-- upsert
INSERT INTO target_customers (id, email, op_ts)
SELECT id, email, op_ts FROM tmp_latest WHERE op <> 'd'
ON DUPLICATE KEY UPDATE
  email = VALUES(email),
  op_ts = GREATEST(target_customers.op_ts, VALUES(op_ts));

-- delete
DELETE t FROM target_customers t
JOIN tmp_latest d ON d.id = t.id AND d.op = 'd' AND d.op_ts >= t.op_ts;

ensure an index/PK on target_customers(id). MySQL 8+ window functions simplify the dedupe step.

redshift

-- staging dedupe (late-events safe)
CREATE TEMP TABLE stg_dedup DISTKEY(id) SORTKEY(id) AS
SELECT id, email, op, op_ts
FROM (
  SELECT id, email, op, op_ts,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) rn
  FROM stg_customers
) s WHERE rn = 1;

-- deletes first (to avoid extra writes)
DELETE FROM target_customers t
USING stg_dedup d
WHERE d.op = 'd' AND t.id = d.id AND d.op_ts >= t.op_ts;

-- upsert via MERGE (supported)
MERGE INTO target_customers t
USING stg_dedup s
ON t.id = s.id
WHEN MATCHED AND s.op <> 'd' AND s.op_ts >= t.op_ts THEN
  UPDATE SET email = s.email, op_ts = s.op_ts
WHEN NOT MATCHED AND s.op <> 'd' THEN
  INSERT (id, email, op_ts) VALUES (s.id, s.email, s.op_ts);

consider VACUUM / ANALYZE schedules on heavy churn tables.

handling hard parts

1) replays after a crash

-- safe replay: re-run the last N minutes of staging
DELETE FROM target_customers
WHERE (id, op_ts) IN (
  SELECT id, op_ts FROM stg_customers WHERE op_ts >= NOW() - INTERVAL '10 minutes'
);
-- re-run the regular upsert + delete logic

2) keyless tables

avoid “replica identity full” targets. pick a **natural business key** or synthesize one (hash of stable columns). worst case, land to a history table only.

3) soft deletes vs hard deletes

-- soft delete variant (Snowflake example)
WHEN MATCHED AND s.OP = 'd' AND s.OP_TS >= t.OP_TS THEN
  UPDATE SET IS_DELETED = TRUE, OP_TS = s.OP_TS

4) schema evolution

additive columns are easiest: default target to NULL and include them in the UPDATE SET. for type changes, land to a compatible staging column and cast during merge.

acceptance checks (copy/paste)

-- duplicates: expect 0 difference
SELECT COUNT(*) AS rows, COUNT(DISTINCT id) AS keys FROM target_customers;

-- latest-wins: target should reflect max OP_TS per id (expect 0 stale)
WITH last AS (SELECT id, MAX(op_ts) AS last_ts FROM stg_customers GROUP BY id)
SELECT COUNT(*) FROM target_customers t JOIN last l USING(id)
WHERE t.op_ts < l.last_ts;

-- delete sanity (expect 0)
SELECT COUNT(*) FROM target_customers t
JOIN (
  SELECT id, MAX(op_ts) op_ts FROM stg_customers WHERE op='d' GROUP BY id
) d USING(id)
WHERE t.op_ts < d.op_ts;