[core] Fix LookupMergeFunction to use sequence.field for picking high level records #7221

Open
Liulietong wants to merge 1 commit into apache:master from Liulietong:fix-sequence-field-lookup-merge

@Liulietong
Purpose

Fix #7220

When sequence.field is configured, LookupMergeFunction.pickHighLevel() should select the record with the highest sequence value instead of the lowest level number.

Problem

Previously, pickHighLevel() only compared level numbers:

if (highLevel == null || kv.level() < highLevel.level()) {
    highLevel = kv;
}

This could lead to incorrect results when out-of-order data arrives:

  • L1 has sequence=7 (older)
  • L2 has sequence=8 (newer)
  • L0 has sequence=6 (oldest, out-of-order arrival)

The old logic picks L1 because it has the lower level number (level 1 < level 2), but the correct choice is L2 because it has the higher sequence value (8 > 7).
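The difference between the two selection rules can be sketched in isolation. This is a minimal illustration, not the code from this PR: `Rec`, `pickByLevel`, and `pickBySequence` are hypothetical names, `Rec` is a stand-in rather than Paimon's `KeyValue`, and the sketch assumes that level-0 records are never candidates for the high-level pick, as the example above implies.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PickHighLevelSketch {

    /** Hypothetical stand-in for a key-value record; not Paimon's KeyValue class. */
    static final class Rec {
        final int level;
        final long sequence;

        Rec(int level, long sequence) {
            this.level = level;
            this.sequence = sequence;
        }
    }

    /** Old rule: among records above level 0, pick the lowest level number. */
    static Rec pickByLevel(List<Rec> candidates) {
        Rec highLevel = null;
        for (Rec kv : candidates) {
            if (kv.level <= 0) {
                continue; // assumption: level-0 records are not high-level candidates
            }
            if (highLevel == null || kv.level < highLevel.level) {
                highLevel = kv;
            }
        }
        return highLevel;
    }

    /** Proposed rule: among records above level 0, pick the highest sequence. */
    static Rec pickBySequence(List<Rec> candidates, Comparator<Rec> sequenceComparator) {
        Rec highLevel = null;
        for (Rec kv : candidates) {
            if (kv.level <= 0) {
                continue; // same assumption as above
            }
            if (highLevel == null || sequenceComparator.compare(kv, highLevel) > 0) {
                highLevel = kv;
            }
        }
        return highLevel;
    }

    public static void main(String[] args) {
        // The three records from the example: L1 seq=7, L2 seq=8, L0 seq=6.
        List<Rec> candidates = Arrays.asList(new Rec(1, 7), new Rec(2, 8), new Rec(0, 6));
        Comparator<Rec> bySequence = Comparator.comparingLong(r -> r.sequence);

        System.out.println("by level: L" + pickByLevel(candidates).level);                   // prints "by level: L1"
        System.out.println("by sequence: L" + pickBySequence(candidates, bySequence).level); // prints "by sequence: L2"
    }
}
```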

Changes

  1. Add a sequenceComparator field to LookupMergeFunction
  2. Modify pickHighLevel() to use the sequence comparator when one is available
  3. Modify getResult() to sort records by sequence before adding them to the merge function
  4. Set sequenceComparator only when a user-defined sequence field is configured, preserving the original behavior when sequence.field is not set
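Changes 3 and 4 can be illustrated with a small sketch of an optional comparator. The names here (`Rec`, `mergeLastWins`) are hypothetical, and the last-write-wins merge is a simplification chosen for illustration, not the merge function this PR touches. The point is only that a null comparator leaves the input order untouched, while a non-null one reorders records by sequence before they are merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SequenceSortSketch {

    /** Hypothetical stand-in for a key-value record; not Paimon's KeyValue class. */
    static final class Rec {
        final long sequence;
        final String value;

        Rec(long sequence, String value) {
            this.sequence = sequence;
            this.value = value;
        }
    }

    /**
     * Hypothetical last-write-wins merge: the last record fed in is the result.
     * A null comparator models "sequence.field is not set" and keeps the input order.
     */
    static String mergeLastWins(List<Rec> candidates, Comparator<Rec> sequenceComparator) {
        List<Rec> ordered = new ArrayList<>(candidates);
        if (sequenceComparator != null) {
            ordered.sort(sequenceComparator); // List.sort is stable, so ties keep input order
        }
        String result = null;
        for (Rec r : ordered) {
            result = r.value;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> candidates = Arrays.asList(
                new Rec(8, "newer"), new Rec(7, "older"), new Rec(6, "late"));
        Comparator<Rec> bySequence = Comparator.comparingLong(r -> r.sequence);

        System.out.println(mergeLastWins(candidates, null));       // prints "late"
        System.out.println(mergeLastWins(candidates, bySequence)); // prints "newer"
    }
}
```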

Tests

Added 3 test cases:

  • testSequenceFieldWithMultipleLevels - verifies sequence.field is used correctly
  • testWithoutSequenceFieldPreservesOriginalBehavior - verifies backward compatibility
  • testSequenceFieldWithDescendingSortOrder - verifies descending sort order works
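For the descending case, a rough sketch of the expected behavior follows. It assumes that a descending sort order reverses the comparison, so that the record with the smallest sequence value wins. The helper names `sequenceComparator` and `pickWinner` are hypothetical and do not come from the PR or from Paimon.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortOrderSketch {

    /** Hypothetical helper: a comparator over raw sequence values, reversed when descending. */
    static Comparator<Long> sequenceComparator(boolean ascending) {
        Comparator<Long> natural = Comparator.naturalOrder();
        return ascending ? natural : natural.reversed();
    }

    /** The winner is whichever sequence value is greatest under the given comparator. */
    static long pickWinner(List<Long> sequences, Comparator<Long> comparator) {
        return Collections.max(sequences, comparator);
    }

    public static void main(String[] args) {
        List<Long> sequences = Arrays.asList(7L, 8L, 6L);
        System.out.println(pickWinner(sequences, sequenceComparator(true)));  // prints 8
        System.out.println(pickWinner(sequences, sequenceComparator(false))); // prints 6
    }
}
```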

Impact

Only affects tables that use changelog-producer = lookup with sequence.field configured. No impact on:

  • Normal queries (Batch/Streaming Scan)
  • Lookup Join
  • Tables without sequence.field

… level records

When sequence.field is configured, LookupMergeFunction.pickHighLevel() should
select the record with the highest sequence value instead of the lowest level
number. This ensures correct behavior when out-of-order data arrives.

Previously, pickHighLevel() only compared level numbers, which could lead to
incorrect results when:
- L1 has sequence=7 (older)
- L2 has sequence=8 (newer)
- L0 has sequence=6 (oldest, out-of-order arrival)

The old logic would pick L1 (level 1 < level 2), but the correct behavior
should pick L2 (sequence 8 > 7).

This fix:
1. Adds sequenceComparator field to LookupMergeFunction
2. Modifies pickHighLevel() to use sequence comparator when available
3. Modifies getResult() to sort records by sequence before adding to merge function
4. Only sets sequenceComparator when user-defined sequence field is configured,
   preserving original behavior when sequence.field is not set
5. Adds test cases to verify the fix, backward compatibility, and descending sort order

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Development

Successfully merging this pull request may close these issues.

[Bug] LookupMergeFunction.pickHighLevel() ignores sequence.field when selecting high level record