snowflake
-- Apply the most recent staged event per ID to TARGET_CUSTOMERS.
MERGE INTO TARGET_CUSTOMERS t
USING (
    -- Keep one row per ID: the staged event with the highest OP_TS.
    SELECT
        ID,
        EMAIL,
        OP,
        OP_TS
    FROM STG_CUSTOMERS
    QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) s
    ON s.ID = t.ID
-- Staged delete that is not older than the target row: remove the row.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP = 'd' THEN
    DELETE
-- Staged create/update that is not older than the target row: overwrite it.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP IN ('c', 'u') THEN
    UPDATE SET
        EMAIL = s.EMAIL,
        OP_TS = s.OP_TS
-- Unknown ID: insert unless the latest staged event is a delete.
WHEN NOT MATCHED AND s.OP != 'd' THEN
    INSERT (ID, EMAIL, OP_TS)
    VALUES (s.ID, s.EMAIL, s.OP_TS);
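deduplicating in the USING subquery keeps only the latest event per ID, so the MERGE never sees more than one source row per target row even when staging holds duplicate or multiple events. note that events sharing the same OP_TS for an ID are tie-broken arbitrarily; if the source provides a sequence number or LSN, add it to the ORDER BY.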
bigquery
-- Apply the most recent staged event per ID to dataset.TARGET_CUSTOMERS.
MERGE INTO `dataset.TARGET_CUSTOMERS` AS t
USING (
    -- Keep one row per ID: the staged event with the highest OP_TS.
    SELECT
        ID,
        EMAIL,
        OP,
        OP_TS
    FROM `dataset.STG_CUSTOMERS`
    QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) AS s
    ON s.ID = t.ID
-- Staged delete that is not older than the target row: remove the row.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP = 'd' THEN
    DELETE
-- Staged create/update that is not older than the target row: overwrite it.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP IN ('c', 'u') THEN
    UPDATE SET
        EMAIL = s.EMAIL,
        OP_TS = s.OP_TS
-- Unknown ID: insert unless the latest staged event is a delete.
WHEN NOT MATCHED AND s.OP <> 'd' THEN
    INSERT (ID, EMAIL, OP_TS)
    VALUES (s.ID, s.EMAIL, s.OP_TS);
consider partitioning TARGET_CUSTOMERS by DATE(OP_TS) and clustering by ID for merge performance.
databricks delta
-- Apply the most recent staged event per ID to the target_customers table.
MERGE INTO target_customers AS t
USING (
    -- Keep one row per ID: the staged event with the highest OP_TS.
    SELECT
        ID,
        EMAIL,
        OP,
        OP_TS
    FROM stg_customers
    QUALIFY ROW_NUMBER() OVER (PARTITION BY ID ORDER BY OP_TS DESC) = 1
) AS s
    ON s.ID = t.ID
-- Staged delete that is not older than the target row: remove the row.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP = 'd' THEN
    DELETE
-- Staged create/update that is not older than the target row: overwrite it.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP IN ('c', 'u') THEN
    UPDATE SET
        t.EMAIL = s.EMAIL,
        t.OP_TS = s.OP_TS
-- Unknown ID: insert unless the latest staged event is a delete.
WHEN NOT MATCHED AND s.OP <> 'd' THEN
    INSERT (ID, EMAIL, OP_TS)
    VALUES (s.ID, s.EMAIL, s.OP_TS);
run OPTIMIZE with ZORDER BY (ID) periodically on large tables to keep merges fast.
postgres
-- Ensure a primary key on id: ON CONFLICT (id) below needs a unique constraint
-- to arbitrate on. ADD PRIMARY KEY errors when the table already has one, so
-- the guard keeps this script re-runnable.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_constraint
        WHERE conrelid = 'target_customers'::regclass
          AND contype = 'p'
    ) THEN
        ALTER TABLE target_customers ADD PRIMARY KEY (id);
    END IF;
END
$$;

-- Idempotent upsert with latest-wins.
-- Source: the newest staged event per id, skipping ids whose newest event is a delete.
-- The WHERE on DO UPDATE makes a late (older) event a no-op; without it the
-- stale email would overwrite the current one while op_ts stayed at the newer value.
INSERT INTO target_customers (id, email, op_ts)
SELECT
    s.id,
    s.email,
    s.op_ts
FROM (
    SELECT
        id,
        email,
        op,
        op_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) AS rn
    FROM stg_customers
) AS s
WHERE s.rn = 1
  AND s.op <> 'd'
ON CONFLICT (id) DO UPDATE
SET email = EXCLUDED.email,
    op_ts = EXCLUDED.op_ts
WHERE EXCLUDED.op_ts >= target_customers.op_ts;

-- Deletes. Must run AFTER the upsert above (ideally in the same transaction):
-- the op_ts comparison relies on the upsert having already advanced op_ts for
-- ids that were re-created after their delete event.
DELETE FROM target_customers AS t
USING (
    SELECT
        id,
        MAX(op_ts) AS op_ts
    FROM stg_customers
    WHERE op = 'd'
    GROUP BY id
) AS d
WHERE t.id = d.id
  AND d.op_ts >= t.op_ts;
two-step approach (upsert then delete) is simple and fast; wrap in a transaction.
mysql
-- Latest event per id (delete events included) into a temp table.
-- The upsert and the delete below each filter this table on op.
-- DROP ... IF EXISTS keeps the script re-runnable within one session.
DROP TEMPORARY TABLE IF EXISTS tmp_latest;
CREATE TEMPORARY TABLE tmp_latest AS
SELECT
    id,
    email,
    op,
    op_ts
FROM (
    SELECT
        id,
        email,
        op,
        op_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) AS rn
    FROM stg_customers
) AS x
WHERE rn = 1;

-- Upsert with latest-wins.
-- email is only replaced when the staged event is not older than the stored
-- row; otherwise a late event would write a stale email under a newer op_ts.
-- The email assignment is listed first on purpose: it must compare against the
-- stored op_ts before the second assignment changes it.
INSERT INTO target_customers (id, email, op_ts)
SELECT
    id,
    email,
    op_ts
FROM tmp_latest
WHERE op <> 'd'
ON DUPLICATE KEY UPDATE
    email = IF(VALUES(op_ts) >= target_customers.op_ts, VALUES(email), target_customers.email),
    op_ts = GREATEST(target_customers.op_ts, VALUES(op_ts));

-- Delete rows whose latest staged event is a delete that is not older than the stored row.
DELETE t
FROM target_customers AS t
INNER JOIN tmp_latest AS d
    ON d.id = t.id
WHERE d.op = 'd'
  AND d.op_ts >= t.op_ts;
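ensure a PRIMARY KEY or UNIQUE index on target_customers(id): ON DUPLICATE KEY UPDATE only fires on a unique-key conflict, so with a plain (non-unique) index the upsert would insert duplicate rows instead of updating. the dedupe step uses ROW_NUMBER(), which requires MySQL 8.0+.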
redshift
-- Staging dedupe (late-events safe): keep the newest staged event per id.
CREATE TEMP TABLE stg_dedup DISTKEY(id) SORTKEY(id) AS
SELECT
    id,
    email,
    op,
    op_ts
FROM (
    SELECT
        id,
        email,
        op,
        op_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY op_ts DESC) AS rn
    FROM stg_customers
) AS s
WHERE rn = 1;

-- Deletes first (to avoid extra writes).
-- The target table is referenced by name, with no alias, for Redshift DELETE.
DELETE FROM target_customers
USING stg_dedup
WHERE stg_dedup.op = 'd'
  AND target_customers.id = stg_dedup.id
  AND stg_dedup.op_ts >= target_customers.op_ts;

-- Rows that should actually be written: non-delete events that are either new
-- ids or not older than the stored row. The filtering happens here because the
-- MERGE below is written without per-clause predicates.
CREATE TEMP TABLE stg_upsert DISTKEY(id) SORTKEY(id) AS
SELECT
    d.id,
    d.email,
    d.op_ts
FROM stg_dedup AS d
LEFT JOIN target_customers AS t
    ON t.id = d.id
WHERE d.op <> 'd'
  AND (t.id IS NULL OR d.op_ts >= t.op_ts);

-- Upsert via MERGE: unconditional WHEN MATCHED / WHEN NOT MATCHED clauses,
-- target table referenced by name.
MERGE INTO target_customers
USING stg_upsert AS s
    ON target_customers.id = s.id
WHEN MATCHED THEN
    UPDATE SET
        email = s.email,
        op_ts = s.op_ts
WHEN NOT MATCHED THEN
    INSERT (id, email, op_ts)
    VALUES (s.id, s.email, s.op_ts);
consider VACUUM / ANALYZE schedules on heavy churn tables.
handling hard parts
1) replays after a crash
-- Safe replay: remove target rows whose (id, op_ts) matches a staged event
-- from the last 10 minutes, so the regular pipeline can re-apply them.
DELETE FROM target_customers
WHERE EXISTS (
    SELECT 1
    FROM stg_customers AS recent
    WHERE recent.op_ts >= NOW() - INTERVAL '10 minutes'
      AND recent.id = target_customers.id
      AND recent.op_ts = target_customers.op_ts
);
-- Next: re-run the regular upsert + delete logic.
2) keyless tables
avoid “replica identity full” targets. pick a **natural business key** or synthesize one (hash of stable columns). worst case, land to a history table only.
3) soft deletes vs hard deletes
-- Soft-delete variant (Snowflake example): a WHEN MATCHED clause that flags the
-- row instead of removing it when a delete event is not older than the target.
-- NOTE(review): if a later create/update should revive the row, the UPDATE
-- branch presumably needs to set IS_DELETED = FALSE as well; confirm.
WHEN MATCHED AND s.OP_TS >= t.OP_TS AND s.OP = 'd' THEN
    UPDATE SET
        IS_DELETED = TRUE,
        OP_TS = s.OP_TS
4) schema evolution
additive columns are easiest: default target to NULL and include them in the UPDATE SET. for type changes, land to a compatible staging column and cast during merge.