MERGE INTO silently drops unmatched target rows when target is unsorted by the ON-key #128

@rampage644

Summary

MERGE INTO ... WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT silently drops a non-deterministic fraction of the target's unmatched rows when the target table is not physically ordered by the join (ON) key. The rows are not deleted via a DELETE action — they simply don't appear in the new snapshot after the COW overwrite. The MERGE result counters (number of rows updated, number of rows inserted) report correct values, so the loss is invisible to the client unless the row count is checked afterwards.

This affects any incremental dbt model that uses MERGE against a target whose physical layout isn't aligned with its unique_key — including the entire dbt-snowplow-web package's incremental manifests.

Environment

  • Embucket Lambda build, current main (this session was running the lambda zip from /Users/ramp/vcs/embucket with iceberg-rust at embucket/iceberg-rust@fix/v2-manifest-field-names)
  • S3 Tables backend (arn:aws:s3tables:us-east-2:767397688925:bucket/snowplow)
  • Lambda 10 GB memory, 9216 MB MEM_POOL_SIZE_MB, ARM64
  • Embucket SQL dialect (Snowflake-compatible)

Minimal repro (deterministic enough to catch on every run)

Source for both tables is the existing atomic_snowplow_manifest.snowplow_web_base_sessions_lifecycle_manifest table (~12.6M rows, 4 columns). Two distinct windows on start_tstamp so target/source overlap by ~50%.

DROP TABLE IF EXISTS demo.atomic.mrg_tgt;
DROP TABLE IF EXISTS demo.atomic.mrg_src;

CREATE TABLE demo.atomic.mrg_tgt AS
SELECT session_identifier, user_identifier, start_tstamp, end_tstamp
FROM demo.atomic_snowplow_manifest.snowplow_web_base_sessions_lifecycle_manifest
WHERE start_tstamp >= CAST('2026-04-14 04:00:00' AS TIMESTAMP_NTZ)
  AND start_tstamp <  CAST('2026-04-14 04:30:00' AS TIMESTAMP_NTZ);
-- 2,101,659 rows in 1 data file

CREATE TABLE demo.atomic.mrg_src AS
SELECT session_identifier, user_identifier, start_tstamp, end_tstamp
FROM demo.atomic_snowplow_manifest.snowplow_web_base_sessions_lifecycle_manifest
WHERE start_tstamp >= CAST('2026-04-14 04:15:00' AS TIMESTAMP_NTZ)
  AND start_tstamp <  CAST('2026-04-14 04:45:00' AS TIMESTAMP_NTZ);
-- 2,103,638 rows in 1 data file
-- 1,051,550 sessions overlap with target; 1,052,088 are new

MERGE INTO demo.atomic.mrg_tgt AS t
USING demo.atomic.mrg_src AS s
ON t.session_identifier = s.session_identifier
WHEN MATCHED THEN UPDATE SET end_tstamp = s.end_tstamp
WHEN NOT MATCHED THEN INSERT (session_identifier, user_identifier, start_tstamp, end_tstamp)
VALUES (s.session_identifier, s.user_identifier, s.start_tstamp, s.end_tstamp);

SELECT COUNT(*) FROM demo.atomic.mrg_tgt;

Expected vs observed

metric                   value
target rows pre          2,101,659
source rows              2,103,638
matched / updated        1,051,550
not matched / inserted   1,052,088
expected target post     3,153,747 (= 2,101,659 + 1,052,088)
observed target post     varies: 2,453,407 / 2,628,529 / 2,803,433 / 2,978,671 across consecutive runs of the same SQL

The MERGE result counters ({number of rows updated: 1051550, number of rows inserted: 1052088}) are always correct — they account for every source row exactly once. But the actual physical target consistently ends up smaller than original_target + inserted. Some unmatched target rows are silently dropped.
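A quick arithmetic check of the counters against the expected post-MERGE count (standalone Rust, values from this report, no Embucket code involved):

```rust
fn main() {
    let target_pre: u64 = 2_101_659;
    let source_rows: u64 = 2_103_638;
    let updated: u64 = 1_051_550;
    let inserted: u64 = 1_052_088;

    // The counters account for every source row exactly once...
    assert_eq!(updated + inserted, source_rows);

    // ...so with no DELETE action, the post-MERGE target should hold
    // every pre-existing row plus the inserted ones.
    let expected_post = target_pre + inserted;
    assert_eq!(expected_post, 3_153_747);

    // Yet the observed post-MERGE count from one run was far smaller.
    let observed_post: u64 = 2_628_529;
    println!("silently dropped: {}", expected_post - observed_post);
}
```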

The Iceberg snapshot summary confirms this is happening at the COW overwrite — a single op=overwrite snapshot is committed with the (incorrect) row count baked into total-records:

snap N    append     total-records=2101659  total-data-files=1
snap N+1  overwrite  total-records=2628529  total-data-files=1

Non-determinism

Same SQL, same input data, same Lambda cold start, four consecutive runs produced losses of:

run   observed    loss
1     2,453,407   700,340
2     2,628,529   525,218
3     2,803,433   350,314
4     2,978,671   175,076

The loss varies between roughly 0% and 33% of the target. Across many runs the average sits near "50% of expected unmatched-target rows preserved".
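The per-run losses in the table are exactly expected_post minus observed, and the worst run's loss is about a third of the pre-MERGE target (a small Rust check over the run values above):

```rust
fn main() {
    let expected_post: u64 = 3_153_747; // 2_101_659 pre + 1_052_088 inserted
    let target_pre: u64 = 2_101_659;

    // (observed post-MERGE count, reported loss) for the four runs.
    let runs: [(u64, u64); 4] = [
        (2_453_407, 700_340),
        (2_628_529, 525_218),
        (2_803_433, 350_314),
        (2_978_671, 175_076),
    ];
    for (observed, loss) in runs {
        assert_eq!(expected_post - observed, loss);
    }

    // Worst run: roughly 33% of the pre-MERGE target's rows vanished.
    assert_eq!(700_340 * 100 / target_pre, 33);
    println!("ok");
}
```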

Bisecting the trigger

variant                                                      loss
Source has zero matches (all inserts, ~100 rows)             0 ✓
Source = exact clone of target (100% match, no inserts)      0 ✓
Single 20-row target, mixed source                           0 ✓
Target/source built via LIMIT 2_000_000 from same manifest   0 ✓
Target/source built via WHERE start_tstamp ∈ window (above)  dropping
Same window-based CTAS but with ORDER BY session_identifier  0 ✓

The bug only fires when:

  1. The source contains both matched and unmatched rows (a pure all-insert or pure all-update MERGE is correct), and
  2. The target's physical row order is uncorrelated with the join key.

Adding ORDER BY session_identifier to either the target or source CTAS eliminates the loss completely.

Diagnosis pointers

The MERGE COW pipeline is in crates/executor/src/datafusion/physical_plan/merge.rs. The MergeCOWFilterStream (around line 505+) tracks per-file matched/unmatched state via:

  • not_matching_files: HashMap<String, String> (unbounded)

  • not_matched_buffer: LruCache<String, Vec<RecordBatch>> with a tiny capacity:

    let buffer_size = (available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
        / THREAD_FILE_RATIO)
        .max(2);

    On Lambda (available_parallelism ≈ 2, THREAD_FILE_RATIO = 4), this works out to buffer_size = 2.
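For concreteness, the quoted expression can be evaluated standalone (a sketch: THREAD_FILE_RATIO = 4 as quoted above; the parallelism is passed in rather than detected so the Lambda case can be pinned down):

```rust
use std::num::NonZeroUsize;

const THREAD_FILE_RATIO: usize = 4;

// Mirrors the buffer_size expression from MergeCOWFilterStream, with the
// detected parallelism as a parameter so it can be tabulated.
fn buffer_size(parallelism: Option<NonZeroUsize>) -> usize {
    (parallelism.map(NonZeroUsize::get).unwrap_or(1) / THREAD_FILE_RATIO).max(2)
}

fn main() {
    // Lambda-like: 2 vCPUs -> 2 / 4 == 0, clamped up to the floor of 2.
    assert_eq!(buffer_size(NonZeroUsize::new(2)), 2);
    // Even a 32-core box only gets 8 file slots.
    assert_eq!(buffer_size(NonZeroUsize::new(32)), 8);
    // Detection failure falls back to 1 core -> floor of 2 again.
    assert_eq!(buffer_size(None), 2);
    println!("ok");
}
```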

For the failing repro the target is a single Iceberg data file, so the per-file LRU bookkeeping degenerates and shouldn't itself drop anything. The corruption therefore appears to come from the per-batch filter logic further down (predicate = file_predicate OR source_exists_array), not from LRU eviction. My current best guess is one of:

  1. Stale file-state in the per-batch filter. The filter on __data_file_path ∈ all_matching_data_files excludes target rows in the current RecordBatch whose file hasn't yet been added to matching_files. Those rows are buffered, but only the file's first appearance is buffered cleanly; a subsequent batch from the same file (still with no match in that batch, after another batch has meanwhile promoted the file) is filtered by file_predicate OR source_exists with a file_predicate computed against a stale view of all_matching_data_files, and its unmatched rows are dropped.
  2. Unstable join output order. The chunked join feeding the filter has a non-stable partition output order, while this code assumes per-batch homogeneity (e.g. that all rows from one source __data_file_path arrive contiguously). Sorting the target by the join key incidentally produces that homogeneity, which is why ORDER BY session_identifier masks the bug.

Both fit the observed "sort-by-key fixes it / non-deterministic without sort / correct counters / silent loss" signature. Happy to instrument further once you confirm which side you'd like investigated first.
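To make the suspected failure shape concrete, here is a deliberately simplified toy (not the actual merge.rs logic; Row, cow_filter, and both batch orderings are invented for illustration). It makes a per-file "can this file stay untouched?" decision on first sighting, and loses the file's already-seen unmatched rows when a later batch proves that decision wrong, so batch arrival order alone determines which rows survive:

```rust
use std::collections::HashMap;

// Toy target row: the data file it lives in, a row id, and whether the
// MERGE join matched it against the source.
#[derive(Clone, Copy)]
struct Row { file: u8, id: u32, matched: bool }

// Invented illustration of the failure shape, not MergeCOWFilterStream.
fn cow_filter(batches: &[Vec<Row>]) -> Vec<u32> {
    let mut decided_matching: HashMap<u8, bool> = HashMap::new();
    let mut rewritten: Vec<u32> = Vec::new(); // rows carried into the new file
    let mut untouched: HashMap<u8, Vec<u32>> = HashMap::new(); // kept by skipping the file

    for batch in batches {
        let file = batch[0].file; // assumes per-batch file homogeneity
        let batch_has_match = batch.iter().any(|r| r.matched);
        if !batch_has_match && !decided_matching.contains_key(&file) {
            // First sighting with no match: plan to keep the file as-is.
            decided_matching.insert(file, false);
            untouched.entry(file).or_default()
                .extend(batch.iter().map(|r| r.id));
        } else {
            // The file turns out to contain matches, so it must be
            // rewritten. The earlier "keep as-is" plan is discarded, and
            // the rows only that plan was preserving vanish with it.
            decided_matching.insert(file, true);
            untouched.remove(&file);
            rewritten.extend(batch.iter().map(|r| r.id));
        }
    }
    let mut out = rewritten;
    out.extend(untouched.into_values().flatten());
    out.sort_unstable();
    out
}

fn main() {
    let r = |id, matched| Row { file: 1, id, matched };

    // A matched row arrives in the file's first batch: nothing is lost.
    let ordered = vec![vec![r(1, true), r(2, false)],
                       vec![r(3, false), r(4, false)]];
    assert_eq!(cow_filter(&ordered), vec![1, 2, 3, 4]);

    // Same four rows, but the file's first batch is all-unmatched: when a
    // later batch promotes the file, rows 2 and 3 are silently dropped.
    let shuffled = vec![vec![r(2, false), r(3, false)],
                        vec![r(1, true), r(4, false)]];
    assert_eq!(cow_filter(&shuffled), vec![1, 4]);
    println!("ok");
}
```

The matched-row count is identical in both runs; only the physical survivor set differs, mirroring the correct-counters / silent-loss signature above.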

Why this matters in practice

This is the root cause of dbt-snowplow-web's snowplow_web_base_sessions_lifecycle_manifest and snowplow_web_user_mapping showing shrinking row counts across rounds in the embucket-example pipeline, which I originally diagnosed as a snapshot-caching issue and then as a Lambda OOM. The actual symptom on real workloads is:

  • dbt run --select snowplow_web reports PASS=31 ERROR=0 cleanly
  • But atomic_derived.snowplow_web_user_mapping shrinks (~600K rows lost in a 6.8M target in a single round)
  • Across many rounds the derived models slowly bleed, with no signal except a row count check

A workaround at the dbt layer (cluster_by=session_identifier on the relevant incremental models, or rewriting the model so the source CTE is sorted) is feasible but invasive. The right fix is in merge.rs.

Workaround for users

Sort either side of the MERGE by the merge key (ORDER BY <merge_key>) before running it. For a dbt incremental model, that means ensuring the model's compiled __dbt_tmp table is ordered by the unique key: typically a cluster_by config or an explicit ORDER BY in the model SQL.

Repro artifacts still in the bucket

  • demo.atomic.mrg_tgt and demo.atomic.mrg_src are still on the snowplow S3 Tables bucket as of this writing — feel free to repro against them or drop them.
