diff --git a/pkg/lockservice/lock.go b/pkg/lockservice/lock.go index 6c72da5aaa920..7bbb592984374 100644 --- a/pkg/lockservice/lock.go +++ b/pkg/lockservice/lock.go @@ -100,6 +100,29 @@ func (l Lock) addHolder( logHolderAdded(logger, c, l) } +// setMode updates the lock's mode to match the requested mode. Returns the +// updated Lock and whether the mode actually changed. Because Lock is a value +// type, the caller must write the returned Lock back to the store when changed. +// +// This must be called when a waiter is promoted to holder, because addHolder +// operates on a value copy and cannot update the mode in the store. Without +// this, the stored mode becomes stale: +// - Shared lock with Exclusive holder → new Shared requests incorrectly allowed +// - Exclusive lock with Shared holder → new Shared requests incorrectly blocked +func (l Lock) setMode(mode pb.LockMode) (Lock, bool) { + if mode == pb.LockMode_Exclusive && l.isShared() { + l.value &^= flagLockSharedMode + l.value |= flagLockExclusiveMode + return l, true + } + if mode == pb.LockMode_Shared && !l.isShared() { + l.value &^= flagLockExclusiveMode + l.value |= flagLockSharedMode + return l, true + } + return l, false +} + func (l Lock) isEmpty() bool { return l.holders.size() == 0 && (l.waiters == nil || l.waiters.size() == 0) diff --git a/pkg/lockservice/lock_table_local.go b/pkg/lockservice/lock_table_local.go index 37f339a381763..2c5a28747bee3 100644 --- a/pkg/lockservice/lock_table_local.go +++ b/pkg/lockservice/lock_table_local.go @@ -415,6 +415,15 @@ func (l *localLockTable) acquireRowLockLocked(c *lockContext) error { // only new holder can added lock into txn. // newHolder is false means prev op of txn has already added lock into txn if newHolder { + if updated, changed := lock.setMode(c.opts.Mode); changed { + l.mu.store.Add(key, updated) + // Range lock is stored as two entries (start + end) with + // independent Lock.value bytes. 
When we update the mode on + // one end we must also update the paired entry so that + // subsequent isLockModeAllowed checks on either key see the + // correct mode. + l.setModePairedRangeLock(key, updated, c.opts.Mode) + } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{key}, l.logger) if err != nil { return err @@ -649,6 +658,13 @@ func (l *localLockTable) addRangeLockLocked( // only new holder can added lock into txn. // newHolder is false means prev op of txn has already added lock into txn if newHolder { + if updated, changed := conflictWith.setMode(c.opts.Mode); changed { + l.mu.store.Add(conflictKey, updated) + // Range lock is stored as two entries (start + end) with + // independent Lock.value bytes. Update the paired entry + // so both ends reflect the correct mode. + l.setModePairedRangeLock(conflictKey, updated, c.opts.Mode) + } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{conflictKey}, l.logger) if err != nil { return nil, Lock{}, err @@ -766,6 +782,53 @@ func (l *localLockTable) mustGetRangeStart(endKey []byte) []byte { return v } +// setModePairedRangeLock updates the mode of the paired range lock entry. +// A range lock is stored as two entries (range-start and range-end) with +// independent Lock.value bytes. When setMode updates one end, this helper +// finds and updates the other end so both entries have a consistent mode. +// It is a no-op for row locks. +func (l *localLockTable) setModePairedRangeLock(key []byte, lock Lock, mode pb.LockMode) { + if lock.isLockRow() { + return + } + if lock.isLockRangeEnd() { + // Find the paired range-start via Prev. Between range-start and + // range-end there may be row locks from other transactions, so we + // scan backwards until we find a range-start entry. 
+ cur := key + for { + prevKey, prevLock, ok := l.mu.store.Prev(cur) + if !ok { + return + } + if prevLock.isLockRangeStart() { + if updated, changed := prevLock.setMode(mode); changed { + l.mu.store.Add(prevKey, updated) + } + return + } + cur = prevKey + } + } else if lock.isLockRangeStart() { + // Find the paired range-end. Between range-start and range-end + // there may be row locks from other transactions, so we scan + // forward until we find a range-end entry. + l.mu.store.Range( + nextKey(key, nil), + nil, + func(k []byte, v Lock) bool { + if v.isLockRangeEnd() { + if updated, changed := v.setMode(mode); changed { + l.mu.store.Add(k, updated) + } + return false // stop + } + return true // keep scanning + }, + ) + } +} + func nextKey(src, dst []byte) []byte { dst = append(dst, src...) dst = append(dst, 0) diff --git a/pkg/lockservice/lock_table_local_test.go b/pkg/lockservice/lock_table_local_test.go index b3e76c59a034d..25dc6fe5846c8 100644 --- a/pkg/lockservice/lock_table_local_test.go +++ b/pkg/lockservice/lock_table_local_test.go @@ -1198,3 +1198,492 @@ type target struct { Start string `json:"start"` End string `json:"end"` } + +// TestExclusiveHolderMustBlockSharedRequests verifies that when an Exclusive +// waiter is promoted to holder (after all Shared holders release), the lock +// entry's mode is correctly updated from Shared to Exclusive, so subsequent +// Shared requests are blocked. +// +// Without the setMode fix, the lock entry's mode stays Shared after the +// Exclusive waiter is promoted (because Lock is a value type and addHolder +// operates on a copy), allowing new Shared requests to slip through. 
+func TestExclusiveHolderMustBlockSharedRequests(t *testing.T) { + table := uint64(10) + getRunner(false)( + t, + table, + func(ctx context.Context, s *service, lt *localLockTable) { + rows := newTestRows(1) + txn1 := newTestTxnID(1) // Shared holder + txn2 := newTestTxnID(2) // Exclusive waiter → holder + txn3 := newTestTxnID(3) // Shared requester (should be blocked) + + // Step 1: txn1 acquires Shared lock + _, err := s.Lock(ctx, table, rows, txn1, pb.LockOptions{ + Granularity: pb.Granularity_Row, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + }) + require.NoError(t, err) + + // Step 2: txn2 requests Exclusive lock → blocked (waiter) + c2 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn2, pb.LockOptions{ + Mode: pb.LockMode_Exclusive, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c2 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Step 3: txn1 releases Shared lock → txn2 promoted to Exclusive holder + require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{})) + require.NoError(t, <-c2) + + // Step 4: txn3 requests Shared lock → must be blocked + c3 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn3, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c3 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Verify txn3 is still waiting (not immediately granted) + select { + case <-c3: + t.Fatal("Shared request should be blocked by Exclusive holder") + case <-time.After(100 * time.Millisecond): + // Expected: txn3 is blocked + } + + // Cleanup: release txn2 → txn3 can proceed + require.NoError(t, s.Unlock(ctx, txn2, timestamp.Timestamp{})) + require.NoError(t, <-c3) + require.NoError(t, s.Unlock(ctx, txn3, timestamp.Timestamp{})) + }, + ) +} + +// TestSharedAfterExclusiveRelease verifies that when an Exclusive holder +// releases and a Shared waiter is promoted, the lock entry's mode is correctly 
+// updated from Exclusive to Shared, so subsequent Shared requests are allowed. +func TestSharedAfterExclusiveRelease(t *testing.T) { + table := uint64(10) + getRunner(false)( + t, + table, + func(ctx context.Context, s *service, lt *localLockTable) { + rows := newTestRows(1) + txn1 := newTestTxnID(1) // Exclusive holder + txn2 := newTestTxnID(2) // Shared waiter → holder + txn3 := newTestTxnID(3) // Shared requester (should be allowed) + + // Step 1: txn1 acquires Exclusive lock + mustAddTestLock(t, ctx, s, table, txn1, rows, pb.Granularity_Row) + + // Step 2: txn2 requests Shared lock → blocked + c2 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn2, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c2 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Step 3: txn1 releases Exclusive lock → txn2 promoted to Shared holder + require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{})) + require.NoError(t, <-c2) + + // Step 4: txn3 requests Shared lock → must be allowed (not blocked) + c3 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn3, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c3 <- err + }() + + select { + case err := <-c3: + require.NoError(t, err) // Expected: txn3 granted immediately + case <-time.After(3 * time.Second): + t.Fatal("Shared request should be allowed when only Shared holders exist") + } + + // Cleanup + require.NoError(t, s.Unlock(ctx, txn2, timestamp.Timestamp{})) + require.NoError(t, s.Unlock(ctx, txn3, timestamp.Timestamp{})) + }, + ) +} + +// TestRangeLockModeUpgradeUpdatesBothEnds verifies that when a waiter is promoted +// to holder with a different mode (e.g., Shared -> Exclusive), both ends of the +// range lock (range-start and range-end) are updated to the new mode. +// This tests the setModePairedRangeLock helper. 
+func TestRangeLockModeUpgradeUpdatesBothEnds(t *testing.T) {
+	runLockServiceTests(
+		t,
+		[]string{"s1"},
+		func(_ *lockTableAllocator, s []*service) {
+			l := s[0]
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+
+			tableID := uint64(10)
+			rangeStart := []byte{1}
+			rangeEnd := []byte{5}
+			rangeRows := [][]byte{rangeStart, rangeEnd}
+
+			sharedOpt := pb.LockOptions{
+				Granularity: pb.Granularity_Range,
+				Mode:        pb.LockMode_Shared,
+				Policy:      pb.WaitPolicy_Wait,
+			}
+			exclusiveOpt := pb.LockOptions{
+				Granularity: pb.Granularity_Range,
+				Mode:        pb.LockMode_Exclusive,
+				Policy:      pb.WaitPolicy_Wait,
+			}
+
+			// Step 1: txn1 acquires Shared range lock
+			txn1 := newTestTxnID(1)
+			_, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt)
+			require.NoError(t, err)
+
+			// Verify both ends are Shared
+			v, err := l.getLockTable(0, tableID)
+			require.NoError(t, err)
+			lt := v.(*localLockTable)
+
+			lt.mu.RLock()
+			startLock, ok1 := lt.mu.store.Get(rangeStart)
+			endLock, ok2 := lt.mu.store.Get(rangeEnd)
+			lt.mu.RUnlock()
+			require.True(t, ok1, "range-start should exist")
+			require.True(t, ok2, "range-end should exist")
+			require.True(t, startLock.isShared(), "range-start should be Shared initially")
+			require.True(t, endLock.isShared(), "range-end should be Shared initially")
+
+			// Step 2: txn2 requests Exclusive range lock → blocked
+			txn2 := newTestTxnID(2)
+			txn2Done := make(chan error, 1)
+			go func() {
+				_, err := l.Lock(ctx, tableID, rangeRows, txn2, exclusiveOpt)
+				txn2Done <- err // assert on the test goroutine, never FailNow from here
+			}()
+			time.Sleep(100 * time.Millisecond)
+
+			// Step 3: txn1 releases → txn2 promoted to Exclusive holder
+			require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1}))
+			select {
+			case err := <-txn2Done:
+				require.NoError(t, err)
+			case <-time.After(5 * time.Second):
+				t.Fatal("txn2 (Exclusive) did not acquire range lock in time")
+			}
+
+			// Step 4: Verify BOTH ends are now Exclusive (this is what setModePairedRangeLock fixes)
+			lt.mu.RLock()
+			startLock, ok1 = lt.mu.store.Get(rangeStart)
+			endLock, ok2 = lt.mu.store.Get(rangeEnd)
+			lt.mu.RUnlock()
+			require.True(t, ok1, "range-start should exist after promotion")
+			require.True(t, ok2, "range-end should exist after promotion")
+			require.False(t, startLock.isShared(),
+				"range-start should be Exclusive after Exclusive waiter promoted (setModePairedRangeLock)")
+			require.False(t, endLock.isShared(),
+				"range-end should be Exclusive after Exclusive waiter promoted (setModePairedRangeLock)")
+			require.Equal(t, pb.LockMode_Exclusive, startLock.GetLockMode(),
+				"range-start mode should be Exclusive")
+			require.Equal(t, pb.LockMode_Exclusive, endLock.GetLockMode(),
+				"range-end mode should be Exclusive")
+
+			// Step 5: txn3 requests Shared range lock → should be blocked by Exclusive holder
+			txn3 := newTestTxnID(3)
+			txn3Done := make(chan error, 1)
+			go func() {
+				_, err := l.Lock(ctx, tableID, rangeRows, txn3, sharedOpt)
+				txn3Done <- err // assert on the test goroutine, never FailNow from here
+			}()
+
+			select {
+			case err := <-txn3Done:
+				t.Fatalf("txn3 (Shared) should be BLOCKED by txn2 (Exclusive range lock), "+
+					"but Lock returned (err=%v). This means setModePairedRangeLock did not update both ends.", err)
+			case <-time.After(500 * time.Millisecond):
+				// Expected: txn3 is blocked
+			}
+
+			// Cleanup
+			require.NoError(t, l.Unlock(ctx, txn2, timestamp.Timestamp{PhysicalTime: 2}))
+			select {
+			case err := <-txn3Done:
+				require.NoError(t, err)
+			case <-time.After(5 * time.Second):
+				t.Fatal("txn3 did not acquire lock after txn2 released")
+			}
+			require.NoError(t, l.Unlock(ctx, txn3, timestamp.Timestamp{PhysicalTime: 3}))
+		},
+	)
+}
+
+// TestSetModePairedRangeLockDirect directly tests the setModePairedRangeLock helper
+// by manually creating range locks and verifying the paired entry is updated.
+func TestSetModePairedRangeLockDirect(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rangeStart := []byte{1} + rangeEnd := []byte{5} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + + // Create a Shared range lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt) + require.NoError(t, err) + + // Get the lock table and directly test setModePairedRangeLock + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + // Test 1: Update range-start, verify range-end is also updated + lt.mu.Lock() + startLock, _ := lt.mu.store.Get(rangeStart) + require.True(t, startLock.isLockRangeStart(), "should be range-start") + require.True(t, startLock.isShared(), "should be Shared initially") + + // Simulate mode upgrade on range-start + updatedStart, changed := startLock.setMode(pb.LockMode_Exclusive) + require.True(t, changed, "mode should change from Shared to Exclusive") + lt.mu.store.Add(rangeStart, updatedStart) + + // Call setModePairedRangeLock to update range-end + lt.setModePairedRangeLock(rangeStart, updatedStart, pb.LockMode_Exclusive) + + // Verify range-end is now Exclusive + endLock, _ := lt.mu.store.Get(rangeEnd) + require.False(t, endLock.isShared(), + "range-end should be Exclusive after setModePairedRangeLock called on range-start") + lt.mu.Unlock() + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + }, + ) +} + +// TestSetModePairedRangeLockFromRangeEnd tests setModePairedRangeLock when called +// from the range-end entry (backward scan to find range-start). 
+func TestSetModePairedRangeLockFromRangeEnd(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rangeStart := []byte{1} + rangeEnd := []byte{5} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + + // Create a Shared range lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt) + require.NoError(t, err) + + // Get the lock table and directly test setModePairedRangeLock from range-end + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + // Test: Update range-end, verify range-start is also updated + lt.mu.Lock() + endLock, _ := lt.mu.store.Get(rangeEnd) + require.True(t, endLock.isLockRangeEnd(), "should be range-end") + require.True(t, endLock.isShared(), "should be Shared initially") + + // Simulate mode upgrade on range-end + updatedEnd, changed := endLock.setMode(pb.LockMode_Exclusive) + require.True(t, changed, "mode should change from Shared to Exclusive") + lt.mu.store.Add(rangeEnd, updatedEnd) + + // Call setModePairedRangeLock to update range-start (backward scan) + lt.setModePairedRangeLock(rangeEnd, updatedEnd, pb.LockMode_Exclusive) + + // Verify range-start is now Exclusive + startLock, _ := lt.mu.store.Get(rangeStart) + require.False(t, startLock.isShared(), + "range-start should be Exclusive after setModePairedRangeLock called on range-end") + lt.mu.Unlock() + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + }, + ) +} + +// TestSetModePairedRangeLockRowLockNoOp verifies that setModePairedRangeLock +// is a no-op for row locks. 
+func TestSetModePairedRangeLockRowLockNoOp(t *testing.T) {
+	runLockServiceTests(
+		t,
+		[]string{"s1"},
+		func(_ *lockTableAllocator, s []*service) {
+			l := s[0]
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+
+			tableID := uint64(10)
+			rowKey := []byte{3}
+
+			sharedOpt := newTestRowSharedOptions()
+
+			// Create a Shared row lock
+			txn1 := newTestTxnID(1)
+			_, err := l.Lock(ctx, tableID, [][]byte{rowKey}, txn1, sharedOpt)
+			require.NoError(t, err)
+
+			// Get the lock table
+			v, err := l.getLockTable(0, tableID)
+			require.NoError(t, err)
+			lt := v.(*localLockTable)
+
+			// Call setModePairedRangeLock on the row lock, then assert (after releasing mu) that the entry is unchanged.
+			lt.mu.Lock()
+			rowLock, _ := lt.mu.store.Get(rowKey)
+			lt.setModePairedRangeLock(rowKey, rowLock, pb.LockMode_Exclusive)
+			after, ok := lt.mu.store.Get(rowKey)
+			lt.mu.Unlock()
+			require.True(t, ok && rowLock.isLockRow() && after.isLockRow(), "should be row lock")
+			require.Equal(t, rowLock.GetLockMode(), after.GetLockMode(), "row lock mode must be unchanged")
+
+			// Cleanup
+			require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1}))
+		},
+	)
+}
+
+// TestRangeLockWithInterleavedRowLocks verifies that promoting an Exclusive
+// waiter on a range lock updates both range entries while an unrelated row
+// lock sits outside the range (key 0, before range-start) in the same btree.
+func TestRangeLockWithInterleavedRowLocks(t *testing.T) {
+	runLockServiceTests(
+		t,
+		[]string{"s1"},
+		func(_ *lockTableAllocator, s []*service) {
+			l := s[0]
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+
+			tableID := uint64(10)
+			// Use non-overlapping keys: row lock at key 0 (before range), range [1, 10]
+			rowKey := []byte{0}
+			rangeStart := []byte{1}
+			rangeEnd := []byte{10}
+			rangeRows := [][]byte{rangeStart, rangeEnd}
+
+			sharedRangeOpt := pb.LockOptions{
+				Granularity: pb.Granularity_Range,
+				Mode:        pb.LockMode_Shared,
+				Policy:      pb.WaitPolicy_Wait,
+			}
+			exclusiveRangeOpt := pb.LockOptions{
+				Granularity: pb.Granularity_Range,
+				Mode:        pb.LockMode_Exclusive,
+				Policy:      pb.WaitPolicy_Wait,
+			}
+			sharedRowOpt := newTestRowSharedOptions()
+
+			// Step 1: txn1 acquires Shared row lock on key 0 (before the range)
+			txn1 := newTestTxnID(1)
+			_, err := l.Lock(ctx, tableID, [][]byte{rowKey}, txn1, sharedRowOpt)
+			require.NoError(t, err)
+
+			// Step 2: txn2 acquires Shared range lock [1, 10]
+			txn2 := newTestTxnID(2)
+			_, err = l.Lock(ctx, tableID, rangeRows, txn2, sharedRangeOpt)
+			require.NoError(t, err)
+
+			// Verify btree structure: [0:row] [1:range-start] [10:range-end]
+			v, err := l.getLockTable(0, tableID)
+			require.NoError(t, err)
+			lt := v.(*localLockTable)
+
+			lt.mu.RLock()
+			rowLock, ok1 := lt.mu.store.Get(rowKey)
+			startLock, ok2 := lt.mu.store.Get(rangeStart)
+			endLock, ok3 := lt.mu.store.Get(rangeEnd)
+			lt.mu.RUnlock()
+			require.True(t, ok1 && ok2 && ok3, "all three locks should exist")
+			require.True(t, rowLock.isLockRow(), "should be row lock")
+			require.True(t, startLock.isLockRangeStart(), "should be range-start")
+			require.True(t, endLock.isLockRangeEnd(), "should be range-end")
+
+			// Step 3: txn3 requests Exclusive range lock → blocked by txn2's Shared range lock
+			txn3 := newTestTxnID(3)
+			txn3Done := make(chan error, 1)
+			go func() {
+				_, err := l.Lock(ctx, tableID, rangeRows, txn3, exclusiveRangeOpt)
+				txn3Done <- err // assert on the test goroutine, never FailNow from here
+			}()
+			time.Sleep(100 * time.Millisecond)
+
+			// Step 4: txn2 releases range lock → txn3 promoted to Exclusive holder
+			require.NoError(t, l.Unlock(ctx, txn2, timestamp.Timestamp{PhysicalTime: 1}))
+			select {
+			case err := <-txn3Done:
+				require.NoError(t, err)
+			case <-time.After(5 * time.Second):
+				t.Fatal("txn3 (Exclusive) did not acquire range lock in time")
+			}
+
+			// Step 5: Verify both range-start and range-end are Exclusive
+			// The row lock at key 0 should not interfere with setModePairedRangeLock
+			lt.mu.RLock()
+			startLock, _ = lt.mu.store.Get(rangeStart)
+			endLock, _ = lt.mu.store.Get(rangeEnd)
+			lt.mu.RUnlock()
+			require.False(t, startLock.isShared(),
+				"range-start should be Exclusive after promotion")
+			require.False(t, endLock.isShared(),
+				"range-end should be Exclusive after promotion")
+
+			// Cleanup
+			require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 2}))
+			require.NoError(t, l.Unlock(ctx, txn3, timestamp.Timestamp{PhysicalTime: 3}))
+		},
+	)
+}
diff --git a/pkg/lockservice/types.go b/pkg/lockservice/types.go
index b01f014c805d1..3196c5c384464 100644
--- a/pkg/lockservice/types.go
+++ b/pkg/lockservice/types.go
@@ -245,6 +245,12 @@ type LockOptions struct {
 // Lock stores specific lock information. Since there are a large number of lock objects
 // in the LockStorage at runtime, this object has been specially designed to save memory
 // usage.
+//
+// WARNING: Lock is a Go value type. Methods with value receivers (e.g., addHolder)
+// operate on copies — modifications to the `value` field (which encodes the lock mode)
+// will NOT be reflected in the LockStorage. When the lock mode needs to change (e.g.,
+// a waiter is promoted to holder with a different mode), use setMode() and write the
+// returned Lock back to the store via store.Add(key, updated).
type Lock struct { createAt time.Time // all lock info will encode into this field to save memory overhead diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 376d02c8c16cf..44a748bd01b80 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -127,6 +127,40 @@ func (s *Scope) DropDatabase(c *Compile) error { return err } + // After acquiring the exclusive lock on mo_database, refresh the + // transaction's snapshot to the latest applied logtail timestamp. + // + // This fixes a race condition between concurrent CLONE (CREATE TABLE) + // and DROP DATABASE: + // 1. CLONE acquires Shared lock on mo_database, creates table in + // mo_tables, commits, releases Shared lock. + // 2. DROP acquires Exclusive lock on mo_database. The lock service + // runs hasNewVersionInRange on mo_database rows, but CLONE did + // NOT modify mo_database (only mo_tables), so changed=false and + // the snapshot is NOT advanced past CLONE's commit timestamp. + // 3. DROP calls Relations() with the stale snapshot, misses the + // newly created table, and drops the database without deleting + // the table — leaving an orphan record in mo_tables that causes + // an OkExpectedEOB panic during checkpoint replay. + // + // By explicitly advancing the snapshot to the latest commit timestamp + // after acquiring the exclusive lock, we ensure Relations() sees all + // tables committed before the lock was granted. 
+ { + txnOp := c.proc.GetTxnOperator() + if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() { + latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS() + if txnOp.Txn().SnapshotTS.Less(latestCommitTS) { + newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS) + if err != nil { + return err + } + if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil { + return err + } + } + } + } // handle sub if db.IsSubscription(c.proc.Ctx) { if err = dropSubscription(c.proc.Ctx, c, dbName); err != nil { @@ -3960,7 +3994,7 @@ func getLockBatch(proc *process.Process, accountId uint32, names []string) (*bat return bat, nil } -var doLockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { +var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { dbRel, err := getRelFromMoCatalog(c, catalog.MO_DATABASE) if err != nil { return err @@ -3980,39 +4014,6 @@ var doLockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) e return nil } -var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { - if err := doLockMoDatabase(c, dbName, lockMode); err != nil { - return err - } - - // After acquiring an exclusive lock on mo_database, refresh the - // transaction's snapshot to the latest commit timestamp. - // - // The lock service only checks mo_database rows (via hasNewVersionInRange) - // to decide whether to advance the snapshot. Concurrent operations that - // only modify mo_tables (e.g., CREATE TABLE, CLONE) will not trigger a - // snapshot advance. Without this explicit refresh, subsequent reads (e.g., - // Relations()) may use a stale snapshot and miss recently committed tables, - // leading to orphan records in mo_tables and OkExpectedEOB panic on replay. 
- if lockMode == lock.LockMode_Exclusive { - txnOp := c.proc.GetTxnOperator() - if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() { - latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS() - if txnOp.Txn().SnapshotTS.Less(latestCommitTS) { - newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS) - if err != nil { - return err - } - if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil { - return err - } - } - } - } - - return nil -} - var lockMoTable = func( c *Compile, dbName string, diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index 054a0804516a1..169e5cb402ab9 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -896,7 +896,7 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { eng := mock_frontend.NewMockEngine(ctrl) eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() - lockMoDb := gostub.Stub(&doLockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { + lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset() @@ -957,7 +957,7 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { eng := mock_frontend.NewMockEngine(ctrl) eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() - lockMoDb := gostub.Stub(&doLockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { + lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset()