From b9436d31a666fc0f163d4d17780a0fdbae918d96 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Sun, 1 Mar 2026 22:12:19 +0800 Subject: [PATCH 1/9] fix: data branch create table + DROP DATABASE race condition Two-layer fix for orphan records in mo_tables caused by concurrent data branch create table (CLONE) and DROP DATABASE, which leads to ExpectedEOB panic on checkpoint replay. Fix 1 - Lock Service: Lock mode sync on holder promotion Lock is a Go value type. When addHolder is called during tryHold, modifications to l.value (lock mode byte) don't propagate to the store. Add setMode() method to correctly update the stored lock entry's mode when a waiter is promoted to holder (newHolder=true). This ensures Exclusive locks actually block Shared requests. Files: lock.go, lock_table_local.go, types.go Fix 2 - DDL Layer: Snapshot refresh after exclusive lock (clock.Now) After acquiring exclusive lock on mo_database in DropDatabase, call refreshSnapshotAfterLock which uses clock.Now() + WaitLogTailAppliedAt + UpdateSnapshot to advance the transaction snapshot. This ensures Relations() sees all tables committed before the lock was acquired. Uses clock.Now() instead of GetLatestCommitTS() (v1) because doWrite defer LIFO ordering causes unlock to execute before updateLastCommitTS, making the v1 conditional check always false in the critical scenario. Files: ddl.go, ddl_test.go --- pkg/lockservice/lock.go | 23 ++++ pkg/lockservice/lock_table_local.go | 6 ++ pkg/lockservice/lock_table_local_test.go | 129 +++++++++++++++++++++++ pkg/lockservice/types.go | 6 ++ pkg/sql/compile/ddl.go | 51 +++++---- pkg/sql/compile/ddl_test.go | 64 ++++++----- 6 files changed, 230 insertions(+), 49 deletions(-) diff --git a/pkg/lockservice/lock.go b/pkg/lockservice/lock.go index 6c72da5aaa920..7bbb592984374 100644 --- a/pkg/lockservice/lock.go +++ b/pkg/lockservice/lock.go @@ -100,6 +100,29 @@ func (l Lock) addHolder( logHolderAdded(logger, c, l) } +// setMode updates the lock's mode to match the requested mode. Returns the +// updated Lock and whether the mode actually changed. Because Lock is a value +// type, the caller must write the returned Lock back to the store when changed. +// +// This must be called when a waiter is promoted to holder, because addHolder +// operates on a value copy and cannot update the mode in the store. Without +// this, the stored mode becomes stale: +// - Shared lock with Exclusive holder → new Shared requests incorrectly allowed +// - Exclusive lock with Shared holder → new Shared requests incorrectly blocked +func (l Lock) setMode(mode pb.LockMode) (Lock, bool) { + if mode == pb.LockMode_Exclusive && l.isShared() { + l.value &^= flagLockSharedMode + l.value |= flagLockExclusiveMode + return l, true + } + if mode == pb.LockMode_Shared && !l.isShared() { + l.value &^= flagLockExclusiveMode + l.value |= flagLockSharedMode + return l, true + } + return l, false +} + func (l Lock) isEmpty() bool { return l.holders.size() == 0 && (l.waiters == nil || l.waiters.size() == 0) diff --git a/pkg/lockservice/lock_table_local.go b/pkg/lockservice/lock_table_local.go index 37f339a381763..37ad6a3814f63 100644 --- a/pkg/lockservice/lock_table_local.go +++ b/pkg/lockservice/lock_table_local.go @@ -415,6 +415,9 @@ func (l *localLockTable) acquireRowLockLocked(c *lockContext) error { // only new holder can added lock into txn. // newHolder is false means prev op of txn has already added lock into txn if newHolder { + if updated, changed := lock.setMode(c.opts.Mode); changed { + l.mu.store.Add(key, updated) + } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{key}, l.logger) if err != nil { return err @@ -649,6 +652,9 @@ func (l *localLockTable) addRangeLockLocked( // only new holder can added lock into txn. // newHolder is false means prev op of txn has already added lock into txn if newHolder { + if updated, changed := conflictWith.setMode(c.opts.Mode); changed { + l.mu.store.Add(conflictKey, updated) + } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{conflictKey}, l.logger) if err != nil { return nil, Lock{}, err diff --git a/pkg/lockservice/lock_table_local_test.go b/pkg/lockservice/lock_table_local_test.go index b3e76c59a034d..5b68c01da1e21 100644 --- a/pkg/lockservice/lock_table_local_test.go +++ b/pkg/lockservice/lock_table_local_test.go @@ -1198,3 +1198,132 @@ type target struct { Start string `json:"start"` End string `json:"end"` } + +// TestExclusiveHolderMustBlockSharedRequests verifies that when an Exclusive +// waiter is promoted to holder (after all Shared holders release), the lock +// entry's mode is correctly updated from Shared to Exclusive, so subsequent +// Shared requests are blocked. +// +// Without the setMode fix, the lock entry's mode stays Shared after the +// Exclusive waiter is promoted (because Lock is a value type and addHolder +// operates on a copy), allowing new Shared requests to slip through. +func TestExclusiveHolderMustBlockSharedRequests(t *testing.T) { + table := uint64(10) + getRunner(false)( + t, + table, + func(ctx context.Context, s *service, lt *localLockTable) { + rows := newTestRows(1) + txn1 := newTestTxnID(1) // Shared holder + txn2 := newTestTxnID(2) // Exclusive waiter → holder + txn3 := newTestTxnID(3) // Shared requester (should be blocked) + + // Step 1: txn1 acquires Shared lock + _, err := s.Lock(ctx, table, rows, txn1, pb.LockOptions{ + Granularity: pb.Granularity_Row, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + }) + require.NoError(t, err) + + // Step 2: txn2 requests Exclusive lock → blocked (waiter) + c2 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn2, pb.LockOptions{ + Mode: pb.LockMode_Exclusive, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c2 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Step 3: txn1 releases Shared lock → txn2 promoted to Exclusive holder + require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{})) + require.NoError(t, <-c2) + + // Step 4: txn3 requests Shared lock → must be blocked + c3 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn3, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c3 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Verify txn3 is still waiting (not immediately granted) + select { + case <-c3: + t.Fatal("Shared request should be blocked by Exclusive holder") + case <-time.After(100 * time.Millisecond): + // Expected: txn3 is blocked + } + + // Cleanup: release txn2 → txn3 can proceed + require.NoError(t, s.Unlock(ctx, txn2, timestamp.Timestamp{})) + require.NoError(t, <-c3) + require.NoError(t, s.Unlock(ctx, txn3, timestamp.Timestamp{})) + }, + ) +} + +// TestSharedAfterExclusiveRelease verifies that when an Exclusive holder +// releases and a Shared waiter is promoted, the lock entry's mode is correctly +// updated from Exclusive to Shared, so subsequent Shared requests are allowed. +func TestSharedAfterExclusiveRelease(t *testing.T) { + table := uint64(10) + getRunner(false)( + t, + table, + func(ctx context.Context, s *service, lt *localLockTable) { + rows := newTestRows(1) + txn1 := newTestTxnID(1) // Exclusive holder + txn2 := newTestTxnID(2) // Shared waiter → holder + txn3 := newTestTxnID(3) // Shared requester (should be allowed) + + // Step 1: txn1 acquires Exclusive lock + mustAddTestLock(t, ctx, s, table, txn1, rows, pb.Granularity_Row) + + // Step 2: txn2 requests Shared lock → blocked + c2 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn2, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c2 <- err + }() + waitWaiters(t, s, table, rows[0], 1) + + // Step 3: txn1 releases Exclusive lock → txn2 promoted to Shared holder + require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{})) + require.NoError(t, <-c2) + + // Step 4: txn3 requests Shared lock → must be allowed (not blocked) + c3 := make(chan error, 1) + go func() { + _, err := s.Lock(ctx, table, rows, txn3, pb.LockOptions{ + Mode: pb.LockMode_Shared, + Sharding: pb.Sharding_None, + Policy: pb.WaitPolicy_Wait, + }) + c3 <- err + }() + + select { + case err := <-c3: + require.NoError(t, err) // Expected: txn3 granted immediately + case <-time.After(3 * time.Second): + t.Fatal("Shared request should be allowed when only Shared holders exist") + } + + // Cleanup + require.NoError(t, s.Unlock(ctx, txn2, timestamp.Timestamp{})) + require.NoError(t, s.Unlock(ctx, txn3, timestamp.Timestamp{})) + }, + ) +} diff --git a/pkg/lockservice/types.go b/pkg/lockservice/types.go index b01f014c805d1..3196c5c384464 100644 --- a/pkg/lockservice/types.go +++ b/pkg/lockservice/types.go @@ -245,6 +245,12 @@ type LockOptions struct { // Lock stores specific lock information. Since there are a large number of lock objects // in the LockStorage at runtime, this object has been specially designed to save memory // usage. +// +// WARNING: Lock is a Go value type. Methods with value receivers (e.g., addHolder) +// operate on copies — modifications to the `value` field (which encodes the lock mode) +// will NOT be reflected in the LockStorage. When the lock mode needs to change (e.g., +// a waiter is promoted to holder with a different mode), use setMode() and write the +// returned Lock back to the store via store.Add(key, updated). type Lock struct { createAt time.Time // all lock info will encode into this field to save memory overhead diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 376d02c8c16cf..e19f1ec96f861 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -3985,34 +3985,45 @@ var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) err return err } - // After acquiring an exclusive lock on mo_database, refresh the - // transaction's snapshot to the latest commit timestamp. + // After acquiring an exclusive lock on mo_database, advance the + // transaction's snapshot to the current HLC timestamp. // - // The lock service only checks mo_database rows (via hasNewVersionInRange) - // to decide whether to advance the snapshot. Concurrent operations that - // only modify mo_tables (e.g., CREATE TABLE, CLONE) will not trigger a - // snapshot advance. Without this explicit refresh, subsequent reads (e.g., - // Relations()) may use a stale snapshot and miss recently committed tables, - // leading to orphan records in mo_tables and OkExpectedEOB panic on replay. + // This is necessary because the lock service only checks mo_database rows + // (via hasNewVersionInRange) to decide whether to advance the snapshot. + // Concurrent operations that only modify mo_tables (e.g., CREATE TABLE, + // CLONE) will not trigger a snapshot advance. Without this explicit refresh, + // subsequent reads (e.g., Relations()) may use a stale snapshot and miss + // recently committed tables, leading to orphan records in mo_tables and + // OkExpectedEOB panic on replay. + // + // Note: we use clock.Now() instead of GetLatestCommitTS() because the + // doWrite defer LIFO ordering causes unlock (which releases the Shared lock + // and wakes up the Exclusive waiter) to execute BEFORE closeLocked (which + // triggers updateLastCommitTS). So when DROP is woken up, latestCommitTS + // has not yet been updated, and the condition snapshotTS < latestCommitTS + // would be false, causing the fix to be skipped entirely. if lockMode == lock.LockMode_Exclusive { - txnOp := c.proc.GetTxnOperator() - if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() { - latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS() - if txnOp.Txn().SnapshotTS.Less(latestCommitTS) { - newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS) - if err != nil { - return err - } - if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil { - return err - } - } + if err := refreshSnapshotAfterLock(c); err != nil { + return err } } return nil } +// refreshSnapshotAfterLock advances the transaction's snapshot to the current +// HLC timestamp. This ensures that subsequent reads (e.g. Relations()) see all +// data committed up to this moment, regardless of whether the lock service +// reported a conflict. +var refreshSnapshotAfterLock = func(c *Compile) error { + now, _ := moruntime.ServiceRuntime(c.proc.GetService()).Clock().Now() + ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, now) + if err != nil { + return err + } + return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) +} + var lockMoTable = func( c *Compile, dbName string, diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index 054a0804516a1..f8df8bc15767f 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -824,6 +824,11 @@ func Test_toHours(t *testing.T) { // mo_database. This prevents the race condition where a concurrent CLONE // (CREATE TABLE) commits between the snapshot and the lock acquisition, // leaving orphan records in mo_tables. +// +// The current implementation uses clock.Now() + WaitLogTailAppliedAt(now) to +// unconditionally advance the snapshot, rather than the v1 approach which used +// GetLatestCommitTS() with a conditional check (which failed due to defer LIFO +// ordering in doWrite causing unlock to execute before updateLastCommitTS). func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { dropDbDef := &plan2.DropDatabase{ IfExists: false, @@ -845,12 +850,10 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { TxnOffset: 0, } - snapshotTS := timestamp.Timestamp{PhysicalTime: 100} - latestCommitTS := timestamp.Timestamp{PhysicalTime: 200} appliedTS := timestamp.Timestamp{PhysicalTime: 200} - // Test 1: When snapshotTS < latestCommitTS, UpdateSnapshot MUST be called. - t.Run("snapshot_refreshed_when_stale", func(t *testing.T) { + // Test: refreshSnapshotAfterLock is called and UpdateSnapshot is invoked. + t.Run("snapshot_always_refreshed_via_clock_now", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -865,9 +868,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, - SnapshotTS: snapshotTS, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -881,8 +883,6 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() - txnCli.EXPECT().GetLatestCommitTS().Return(latestCommitTS).Times(1) - txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), latestCommitTS).Return(appliedTS, nil).Times(1) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp @@ -901,6 +901,19 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { }) defer lockMoDb.Reset() + // Stub refreshSnapshotAfterLock to simulate the clock.Now() path: + // it calls WaitLogTailAppliedAt and UpdateSnapshot unconditionally. + refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { + ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, appliedTS) + if err != nil { + return err + } + return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) + }) + defer refreshStub.Reset() + + txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), appliedTS).Return(appliedTS, nil).Times(1) + c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) // The test will error at Relations(), but the key assertion is that @@ -908,8 +921,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { assert.Error(t, err) }) - // Test 2: When snapshotTS >= latestCommitTS, UpdateSnapshot must NOT be called. - t.Run("snapshot_not_refreshed_when_fresh", func(t *testing.T) { + // Test: refreshSnapshotAfterLock error is propagated. + t.Run("snapshot_refresh_error_propagated", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -919,17 +932,13 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { proc.Ctx = ctx proc.ReplaceTopCtx(ctx) - freshSnapshotTS := timestamp.Timestamp{PhysicalTime: 300} - staleCommitTS := timestamp.Timestamp{PhysicalTime: 200} - txnOp := mock_frontend.NewMockTxnOperator(ctrl) txnOp.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, - SnapshotTS: freshSnapshotTS, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -938,32 +947,29 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Snapshot().Return(txn.CNTxnSnapshot{}, nil).AnyTimes() txnOp.EXPECT().Status().Return(txn.TxnStatus_Active).AnyTimes() - // Key assertion: UpdateSnapshot must NOT be called. - txnOp.EXPECT().UpdateSnapshot(gomock.Any(), gomock.Any()).Times(0) - txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() - txnCli.EXPECT().GetLatestCommitTS().Return(staleCommitTS).Times(1) - // WaitLogTailAppliedAt should NOT be called either. - txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), gomock.Any()).Times(0) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp - mockDb := mock_frontend.NewMockDatabase(ctrl) - mockDb.EXPECT().IsSubscription(gomock.Any()).Return(false).AnyTimes() - mockDb.EXPECT().Relations(gomock.Any()).Return(nil, moerr.NewInternalErrorNoCtx("stop here")).AnyTimes() - eng := mock_frontend.NewMockEngine(ctrl) - eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() + eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mock_frontend.NewMockDatabase(ctrl), nil).AnyTimes() lockMoDb := gostub.Stub(&doLockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset() + // Stub refreshSnapshotAfterLock to return an error. + expectedErr := moerr.NewInternalErrorNoCtx("logtail wait failed") + refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { + return expectedErr + }) + defer refreshStub.Reset() + c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) - assert.Error(t, err) + assert.ErrorIs(t, err, expectedErr) }) } From 6d32c86ee4380ccc9e0e3559b0af3ff46703daae Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Sun, 1 Mar 2026 23:46:03 +0800 Subject: [PATCH 2/9] fix: move refreshSnapshotAfterLock from lockMoDatabase to DropDatabase only refreshSnapshotAfterLock was inside lockMoDatabase and triggered for all Exclusive lock acquisitions. CreateDatabase also calls lockMoDatabase with Exclusive mode, so during restore cluster operations the snapshot advance caused CreateDatabase to see already-restored tables, producing: Duplicate entry '(0,mo_catalog,mo_branch_metadata)' for key '__mo_cpkey_col' Move refreshSnapshotAfterLock to DropDatabase directly, since only DropDatabase needs the snapshot advance (to enumerate tables via Relations()). lockMoDatabase is now a simple wrapper around doLockMoDatabase. --- pkg/sql/compile/ddl.go | 41 +++++++++++++---------------------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index e19f1ec96f861..02482906601e7 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -127,6 +127,18 @@ func (s *Scope) DropDatabase(c *Compile) error { return err } + // After acquiring the exclusive lock on mo_database, advance the + // transaction's snapshot so that Relations() sees all tables committed + // before the lock was acquired. Only needed for DropDatabase because it + // enumerates tables via Relations() and must not miss any. + // + // This must NOT be in lockMoDatabase itself, because CreateDatabase also + // calls lockMoDatabase(Exclusive) and advancing the snapshot there can + // cause duplicate-key errors during restore cluster operations. + if err = refreshSnapshotAfterLock(c); err != nil { + return err + } + // handle sub if db.IsSubscription(c.proc.Ctx) { if err = dropSubscription(c.proc.Ctx, c, dbName); err != nil { @@ -3981,34 +3993,7 @@ var doLockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) e } var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { - if err := doLockMoDatabase(c, dbName, lockMode); err != nil { - return err - } - - // After acquiring an exclusive lock on mo_database, advance the - // transaction's snapshot to the current HLC timestamp. - // - // This is necessary because the lock service only checks mo_database rows - // (via hasNewVersionInRange) to decide whether to advance the snapshot. - // Concurrent operations that only modify mo_tables (e.g., CREATE TABLE, - // CLONE) will not trigger a snapshot advance. Without this explicit refresh, - // subsequent reads (e.g., Relations()) may use a stale snapshot and miss - // recently committed tables, leading to orphan records in mo_tables and - // OkExpectedEOB panic on replay. - // - // Note: we use clock.Now() instead of GetLatestCommitTS() because the - // doWrite defer LIFO ordering causes unlock (which releases the Shared lock - // and wakes up the Exclusive waiter) to execute BEFORE closeLocked (which - // triggers updateLastCommitTS). So when DROP is woken up, latestCommitTS - // has not yet been updated, and the condition snapshotTS < latestCommitTS - // would be false, causing the fix to be skipped entirely. - if lockMode == lock.LockMode_Exclusive { - if err := refreshSnapshotAfterLock(c); err != nil { - return err - } - } - - return nil + return doLockMoDatabase(c, dbName, lockMode) } // refreshSnapshotAfterLock advances the transaction's snapshot to the current From 3cf6494813753ef063be3d990c6ffd753476cd1e Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 16:10:05 +0800 Subject: [PATCH 3/9] U --- pkg/sql/compile/ddl.go | 67 +++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 02482906601e7..8e1232d9b8214 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -127,18 +127,40 @@ func (s *Scope) DropDatabase(c *Compile) error { return err } - // After acquiring the exclusive lock on mo_database, advance the - // transaction's snapshot so that Relations() sees all tables committed - // before the lock was acquired. Only needed for DropDatabase because it - // enumerates tables via Relations() and must not miss any. + // After acquiring the exclusive lock on mo_database, refresh the + // transaction's snapshot to the latest applied logtail timestamp. // - // This must NOT be in lockMoDatabase itself, because CreateDatabase also - // calls lockMoDatabase(Exclusive) and advancing the snapshot there can - // cause duplicate-key errors during restore cluster operations. - if err = refreshSnapshotAfterLock(c); err != nil { - return err + // This fixes a race condition between concurrent CLONE (CREATE TABLE) + // and DROP DATABASE: + // 1. CLONE acquires Shared lock on mo_database, creates table in + // mo_tables, commits, releases Shared lock. + // 2. DROP acquires Exclusive lock on mo_database. The lock service + // runs hasNewVersionInRange on mo_database rows, but CLONE did + // NOT modify mo_database (only mo_tables), so changed=false and + // the snapshot is NOT advanced past CLONE's commit timestamp. + // 3. DROP calls Relations() with the stale snapshot, misses the + // newly created table, and drops the database without deleting + // the table — leaving an orphan record in mo_tables that causes + // an OkExpectedEOB panic during checkpoint replay. + // + // By explicitly advancing the snapshot to the latest commit timestamp + // after acquiring the exclusive lock, we ensure Relations() sees all + // tables committed before the lock was granted. + { + txnOp := c.proc.GetTxnOperator() + if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() { + latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS() + if txnOp.Txn().SnapshotTS.Less(latestCommitTS) { + newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS) + if err != nil { + return err + } + if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil { + return err + } + } + } } - // handle sub if db.IsSubscription(c.proc.Ctx) { if err = dropSubscription(c.proc.Ctx, c, dbName); err != nil { @@ -3993,20 +4015,23 @@ var doLockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) e } var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { - return doLockMoDatabase(c, dbName, lockMode) -} - -// refreshSnapshotAfterLock advances the transaction's snapshot to the current -// HLC timestamp. This ensures that subsequent reads (e.g. Relations()) see all -// data committed up to this moment, regardless of whether the lock service -// reported a conflict. -var refreshSnapshotAfterLock = func(c *Compile) error { - now, _ := moruntime.ServiceRuntime(c.proc.GetService()).Clock().Now() - ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, now) + dbRel, err := getRelFromMoCatalog(c, catalog.MO_DATABASE) if err != nil { return err } - return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) + accountID, err := defines.GetAccountId(c.proc.Ctx) + if err != nil { + return err + } + bat, err := getLockBatch(c.proc, accountID, []string{dbName}) + if err != nil { + return err + } + defer bat.GetVector(0).Free(c.proc.Mp()) + if err := lockRows(c.e, c.proc, dbRel, bat, 0, lockMode, lock.Sharding_None, accountID); err != nil { + return err + } + return nil } var lockMoTable = func( From 180916b00be778a0c6500e2aa5f0dc4fe0819333 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 17:18:40 +0800 Subject: [PATCH 4/9] lockservice,txn: fix clone+drop race by syncing range lock mode and reordering doWrite defers Two fixes for the data-branch-create-table + DROP DATABASE race that leaves orphan rows in mo_tables: 1. lockservice: sync mode on both ends of range lock (lock_table_local.go) When a waiter is promoted to holder, setMode updates Lock.value on the conflicting key only. But a range lock is stored as two independent btree entries (range-start and range-end) with separate Lock.value bytes. The paired entry retains stale mode, causing isLockModeAllowed to incorrectly allow or deny Shared requests. Add setModePairedRangeLock helper that scans past interleaved row locks to find and update the paired entry. Called from both acquireRowLockLocked and addRangeLockLocked when newHolder=true. No-op for row locks. 2. txn: reorder defers in doWrite to update latestCommitTS before unlock (operator.go) Original code registered two independent defers in doWrite: defer tc.unlock(ctx) // LIFO: executes first defer func() { closeLocked(); mu.Unlock() }() // executes second This meant unlock released the lock-service lock before closeLocked triggered updateLastCommitTS. In the window between unlock and closeLocked, DROP DATABASE could acquire the Exclusive lock and call GetLatestCommitTS, getting a stale value. The UpdateSnapshot fix (layer 2) would then not trigger because SnapshotTS >= stale latestCommitTS. Merge unlock into the first defer so execution order becomes: closeLocked() -> mu.Unlock() -> unlock(ctx) This ensures latestCommitTS is updated before the lock is released. --- pkg/lockservice/lock_table_local.go | 57 +++++++++++++++++++++++++++++ pkg/txn/client/operator.go | 6 ++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/pkg/lockservice/lock_table_local.go b/pkg/lockservice/lock_table_local.go index 37ad6a3814f63..a2994a8428264 100644 --- a/pkg/lockservice/lock_table_local.go +++ b/pkg/lockservice/lock_table_local.go @@ -417,6 +417,12 @@ func (l *localLockTable) acquireRowLockLocked(c *lockContext) error { if newHolder { if updated, changed := lock.setMode(c.opts.Mode); changed { l.mu.store.Add(key, updated) + // Range lock is stored as two entries (start + end) with + // independent Lock.value bytes. When we update the mode on + // one end we must also update the paired entry so that + // subsequent isLockModeAllowed checks on either key see the + // correct mode. + l.setModePairedRangeLock(key, updated, c.opts.Mode) } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{key}, l.logger) if err != nil { @@ -654,6 +660,10 @@ func (l *localLockTable) addRangeLockLocked( if newHolder { if updated, changed := conflictWith.setMode(c.opts.Mode); changed { l.mu.store.Add(conflictKey, updated) + // Range lock is stored as two entries (start + end) with + // independent Lock.value bytes. Update the paired entry + // so both ends reflect the correct mode. + l.setModePairedRangeLock(conflictKey, updated, c.opts.Mode) } err := c.txn.lockAdded(l.bind.Group, l.bind, [][]byte{conflictKey}, l.logger) if err != nil { @@ -771,6 +781,53 @@ func (l *localLockTable) mustGetRangeStart(endKey []byte) []byte { } return v } +// setModePairedRangeLock updates the mode of the paired range lock entry. +// A range lock is stored as two entries (range-start and range-end) with +// independent Lock.value bytes. When setMode updates one end, this helper +// finds and updates the other end so both entries have a consistent mode. +// It is a no-op for row locks. +func (l *localLockTable) setModePairedRangeLock(key []byte, lock Lock, mode pb.LockMode) { + if lock.isLockRow() { + return + } + if lock.isLockRangeEnd() { + // Find the paired range-start via Prev. Between range-start and + // range-end there may be row locks from other transactions, so we + // scan backwards until we find a range-start entry. + cur := key + for { + prevKey, prevLock, ok := l.mu.store.Prev(cur) + if !ok { + return + } + if prevLock.isLockRangeStart() { + if updated, changed := prevLock.setMode(mode); changed { + l.mu.store.Add(prevKey, updated) + } + return + } + cur = prevKey + } + } else if lock.isLockRangeStart() { + // Find the paired range-end. Between range-start and range-end + // there may be row locks from other transactions, so we scan + // forward until we find a range-end entry. + l.mu.store.Range( + nextKey(key, nil), + nil, + func(k []byte, v Lock) bool { + if v.isLockRangeEnd() { + if updated, changed := v.setMode(mode); changed { + l.mu.store.Add(k, updated) + } + return false // stop + } + return true // keep scanning + }, + ) + } +} + func nextKey(src, dst []byte) []byte { dst = append(dst, src...) diff --git a/pkg/txn/client/operator.go b/pkg/txn/client/operator.go index aaa0cbf2f97e8..bcac966c7aab2 100644 --- a/pkg/txn/client/operator.go +++ b/pkg/txn/client/operator.go @@ -1015,6 +1015,7 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c tc.logger.Fatal("can not write on ready only transaction") } var payload []txn.TxnRequest + var needUnlock bool if commit { if tc.reset.workspace != nil { reqs, err := tc.reset.workspace.Commit(ctx) @@ -1027,6 +1028,9 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c defer func() { tc.closeLocked(ctx) tc.mu.Unlock() + if needUnlock { + tc.unlock(ctx) + } }() if tc.mu.closed { tc.reset.commitErr = moerr.NewTxnClosedNoCtx(tc.reset.txnID) @@ -1035,7 +1039,7 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c if tc.needUnlockLocked() { tc.mu.txn.LockTables = tc.mu.lockTables - defer tc.unlock(ctx) + needUnlock = true } } From df10b8215617bc747e8a76d162bb219caf63a8fc Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 18:03:23 +0800 Subject: [PATCH 5/9] fix SCA --- pkg/lockservice/lock_table_local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lockservice/lock_table_local.go b/pkg/lockservice/lock_table_local.go index a2994a8428264..2c5a28747bee3 100644 --- a/pkg/lockservice/lock_table_local.go +++ b/pkg/lockservice/lock_table_local.go @@ -781,6 +781,7 @@ func (l *localLockTable) mustGetRangeStart(endKey []byte) []byte { } return v } + // setModePairedRangeLock updates the mode of the paired range lock entry. // A range lock is stored as two entries (range-start and range-end) with // independent Lock.value bytes. When setMode updates one end, this helper @@ -828,7 +829,6 @@ func (l *localLockTable) setModePairedRangeLock(key []byte, lock Lock, mode pb.L } } - func nextKey(src, dst []byte) []byte { dst = append(dst, src...) dst = append(dst, 0) From 9877ca30c4310ae7c693dec38a9ab88f00aaa650 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 18:13:10 +0800 Subject: [PATCH 6/9] add ut --- pkg/lockservice/lock_table_local_test.go | 203 +++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/pkg/lockservice/lock_table_local_test.go b/pkg/lockservice/lock_table_local_test.go index 5b68c01da1e21..38868198c7488 100644 --- a/pkg/lockservice/lock_table_local_test.go +++ b/pkg/lockservice/lock_table_local_test.go @@ -1327,3 +1327,206 @@ func TestSharedAfterExclusiveRelease(t *testing.T) { }, ) } + +// TestRangeLockModeUpgradeUpdatesBothEnds verifies that when a waiter is promoted +// to holder with a different mode (e.g., Shared -> Exclusive), both ends of the +// range lock (range-start and range-end) are updated to the new mode. +// This tests the setModePairedRangeLock helper. +func TestRangeLockModeUpgradeUpdatesBothEnds(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rangeStart := []byte{1} + rangeEnd := []byte{5} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + exclusiveOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Exclusive, + Policy: pb.WaitPolicy_Wait, + } + + // Step 1: txn1 acquires Shared range lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt) + require.NoError(t, err) + + // Verify both ends are Shared + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + lt.mu.RLock() + startLock, ok1 := lt.mu.store.Get(rangeStart) + endLock, ok2 := lt.mu.store.Get(rangeEnd) + lt.mu.RUnlock() + require.True(t, ok1, "range-start should exist") + require.True(t, ok2, "range-end should exist") + require.True(t, startLock.isShared(), "range-start should be Shared initially") + require.True(t, endLock.isShared(), "range-end should be Shared initially") + + // Step 2: txn2 requests Exclusive range lock → blocked + txn2 := newTestTxnID(2) + txn2Done := make(chan struct{}, 1) + go func() { + _, err := l.Lock(ctx, tableID, rangeRows, txn2, exclusiveOpt) + require.NoError(t, err) + txn2Done <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + + // Step 3: txn1 releases → txn2 promoted to Exclusive holder + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + select { + case <-txn2Done: + case <-time.After(5 * time.Second): + t.Fatal("txn2 (Exclusive) did not acquire range lock in time") + } + + // Step 4: Verify BOTH ends are now Exclusive (this is what setModePairedRangeLock fixes) + lt.mu.RLock() + startLock, ok1 = lt.mu.store.Get(rangeStart) + endLock, ok2 = lt.mu.store.Get(rangeEnd) + lt.mu.RUnlock() + require.True(t, ok1, "range-start should exist after promotion") + require.True(t, ok2, "range-end should exist after promotion") + require.False(t, startLock.isShared(), + "range-start should be Exclusive after Exclusive waiter promoted (setModePairedRangeLock)") + require.False(t, endLock.isShared(), + "range-end should be Exclusive after Exclusive waiter promoted (setModePairedRangeLock)") + require.Equal(t, pb.LockMode_Exclusive, startLock.GetLockMode(), + "range-start mode should be Exclusive") + require.Equal(t, pb.LockMode_Exclusive, endLock.GetLockMode(), + "range-end mode should be Exclusive") + + // Step 5: txn3 requests Shared range lock → should be blocked by Exclusive holder + txn3 := newTestTxnID(3) + txn3Done := make(chan struct{}, 1) + go func() { + _, err := l.Lock(ctx, tableID, rangeRows, txn3, sharedOpt) + require.NoError(t, err) + txn3Done <- struct{}{} + }() + + select { + case <-txn3Done: + t.Fatal("txn3 (Shared) should be BLOCKED by txn2 (Exclusive range lock), " + + "but it was granted. This means setModePairedRangeLock did not update both ends.") + case <-time.After(500 * time.Millisecond): + // Expected: txn3 is blocked + } + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn2, timestamp.Timestamp{PhysicalTime: 2})) + select { + case <-txn3Done: + case <-time.After(5 * time.Second): + t.Fatal("txn3 did not acquire lock after txn2 released") + } + require.NoError(t, l.Unlock(ctx, txn3, timestamp.Timestamp{PhysicalTime: 3})) + }, + ) +} + +// TestRangeLockWithInterleavedRowLocks verifies that setModePairedRangeLock +// correctly scans past interleaved row locks to find the paired range entry. +// Row locks outside the range can coexist with range locks in the btree. +func TestRangeLockWithInterleavedRowLocks(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + // Use non-overlapping keys: row lock at key 0 (before range), range [1, 10] + rowKey := []byte{0} + rangeStart := []byte{1} + rangeEnd := []byte{10} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedRangeOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + exclusiveRangeOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Exclusive, + Policy: pb.WaitPolicy_Wait, + } + sharedRowOpt := newTestRowSharedOptions() + + // Step 1: txn1 acquires Shared row lock on key 0 (before the range) + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, [][]byte{rowKey}, txn1, sharedRowOpt) + require.NoError(t, err) + + // Step 2: txn2 acquires Shared range lock [1, 10] + txn2 := newTestTxnID(2) + _, err = l.Lock(ctx, tableID, rangeRows, txn2, sharedRangeOpt) + require.NoError(t, err) + + // Verify btree structure: [0:row] [1:range-start] [10:range-end] + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + lt.mu.RLock() + rowLock, ok1 := lt.mu.store.Get(rowKey) + startLock, ok2 := lt.mu.store.Get(rangeStart) + endLock, ok3 := lt.mu.store.Get(rangeEnd) + lt.mu.RUnlock() + require.True(t, ok1 && ok2 && ok3, "all three locks should exist") + require.True(t, rowLock.isLockRow(), "should be row lock") + require.True(t, startLock.isLockRangeStart(), "should be range-start") + require.True(t, endLock.isLockRangeEnd(), "should be range-end") + + // Step 3: txn3 requests Exclusive range lock → blocked by txn2's Shared range lock + txn3 := newTestTxnID(3) + txn3Done := make(chan struct{}, 1) + go func() { + _, err := l.Lock(ctx, tableID, rangeRows, txn3, exclusiveRangeOpt) + require.NoError(t, err) + txn3Done <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + + // Step 4: txn2 releases range lock → txn3 promoted to Exclusive holder + require.NoError(t, l.Unlock(ctx, txn2, timestamp.Timestamp{PhysicalTime: 1})) + select { + case <-txn3Done: + case <-time.After(5 * time.Second): + t.Fatal("txn3 (Exclusive) did not acquire range lock in time") + } + + // Step 5: Verify both range-start and range-end are Exclusive + // The row lock at key 0 should not interfere with setModePairedRangeLock + lt.mu.RLock() + startLock, _ = lt.mu.store.Get(rangeStart) + endLock, _ = lt.mu.store.Get(rangeEnd) + lt.mu.RUnlock() + require.False(t, startLock.isShared(), + "range-start should be Exclusive after promotion") + require.False(t, endLock.isShared(), + "range-end should be Exclusive after promotion") + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 2})) + require.NoError(t, l.Unlock(ctx, txn3, timestamp.Timestamp{PhysicalTime: 3})) + }, + ) +} From 180cc0d8c6cbdb9dca67352ec293b002d5ef508b Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 18:49:37 +0800 Subject: [PATCH 7/9] fix ut --- pkg/sql/compile/ddl_test.go | 68 +++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index f8df8bc15767f..169e5cb402ab9 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -824,11 +824,6 @@ func Test_toHours(t *testing.T) { // mo_database. This prevents the race condition where a concurrent CLONE // (CREATE TABLE) commits between the snapshot and the lock acquisition, // leaving orphan records in mo_tables. -// -// The current implementation uses clock.Now() + WaitLogTailAppliedAt(now) to -// unconditionally advance the snapshot, rather than the v1 approach which used -// GetLatestCommitTS() with a conditional check (which failed due to defer LIFO -// ordering in doWrite causing unlock to execute before updateLastCommitTS). func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { dropDbDef := &plan2.DropDatabase{ IfExists: false, @@ -850,10 +845,12 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { TxnOffset: 0, } + snapshotTS := timestamp.Timestamp{PhysicalTime: 100} + latestCommitTS := timestamp.Timestamp{PhysicalTime: 200} appliedTS := timestamp.Timestamp{PhysicalTime: 200} - // Test: refreshSnapshotAfterLock is called and UpdateSnapshot is invoked. - t.Run("snapshot_always_refreshed_via_clock_now", func(t *testing.T) { + // Test 1: When snapshotTS < latestCommitTS, UpdateSnapshot MUST be called. + t.Run("snapshot_refreshed_when_stale", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -868,8 +865,9 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, + SnapshotTS: snapshotTS, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -883,6 +881,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() + txnCli.EXPECT().GetLatestCommitTS().Return(latestCommitTS).Times(1) + txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), latestCommitTS).Return(appliedTS, nil).Times(1) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp @@ -896,24 +896,11 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { eng := mock_frontend.NewMockEngine(ctrl) eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() - lockMoDb := gostub.Stub(&doLockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { + lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset() - // Stub refreshSnapshotAfterLock to simulate the clock.Now() path: - // it calls WaitLogTailAppliedAt and UpdateSnapshot unconditionally. - refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { - ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, appliedTS) - if err != nil { - return err - } - return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) - }) - defer refreshStub.Reset() - - txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), appliedTS).Return(appliedTS, nil).Times(1) - c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) // The test will error at Relations(), but the key assertion is that @@ -921,8 +908,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { assert.Error(t, err) }) - // Test: refreshSnapshotAfterLock error is propagated. - t.Run("snapshot_refresh_error_propagated", func(t *testing.T) { + // Test 2: When snapshotTS >= latestCommitTS, UpdateSnapshot must NOT be called. + t.Run("snapshot_not_refreshed_when_fresh", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -932,13 +919,17 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { proc.Ctx = ctx proc.ReplaceTopCtx(ctx) + freshSnapshotTS := timestamp.Timestamp{PhysicalTime: 300} + staleCommitTS := timestamp.Timestamp{PhysicalTime: 200} + txnOp := mock_frontend.NewMockTxnOperator(ctrl) txnOp.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, + SnapshotTS: freshSnapshotTS, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -947,29 +938,32 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Snapshot().Return(txn.CNTxnSnapshot{}, nil).AnyTimes() txnOp.EXPECT().Status().Return(txn.TxnStatus_Active).AnyTimes() + // Key assertion: UpdateSnapshot must NOT be called. + txnOp.EXPECT().UpdateSnapshot(gomock.Any(), gomock.Any()).Times(0) + txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() + txnCli.EXPECT().GetLatestCommitTS().Return(staleCommitTS).Times(1) + // WaitLogTailAppliedAt should NOT be called either. + txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), gomock.Any()).Times(0) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp + mockDb := mock_frontend.NewMockDatabase(ctrl) + mockDb.EXPECT().IsSubscription(gomock.Any()).Return(false).AnyTimes() + mockDb.EXPECT().Relations(gomock.Any()).Return(nil, moerr.NewInternalErrorNoCtx("stop here")).AnyTimes() + eng := mock_frontend.NewMockEngine(ctrl) - eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mock_frontend.NewMockDatabase(ctrl), nil).AnyTimes() + eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() - lockMoDb := gostub.Stub(&doLockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { + lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset() - // Stub refreshSnapshotAfterLock to return an error. - expectedErr := moerr.NewInternalErrorNoCtx("logtail wait failed") - refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { - return expectedErr - }) - defer refreshStub.Reset() - c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) - assert.ErrorIs(t, err, expectedErr) + assert.Error(t, err) }) } From b85b8bd2271b70bbac81909078aac955031f87f1 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 4 Mar 2026 19:08:52 +0800 Subject: [PATCH 8/9] add ut --- pkg/lockservice/lock_table_local_test.go | 157 +++++++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/pkg/lockservice/lock_table_local_test.go b/pkg/lockservice/lock_table_local_test.go index 38868198c7488..25dc6fe5846c8 100644 --- a/pkg/lockservice/lock_table_local_test.go +++ b/pkg/lockservice/lock_table_local_test.go @@ -1439,6 +1439,163 @@ func TestRangeLockModeUpgradeUpdatesBothEnds(t *testing.T) { ) } +// TestSetModePairedRangeLockDirect directly tests the setModePairedRangeLock helper +// by manually creating range locks and verifying the paired entry is updated. +func TestSetModePairedRangeLockDirect(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rangeStart := []byte{1} + rangeEnd := []byte{5} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + + // Create a Shared range lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt) + require.NoError(t, err) + + // Get the lock table and directly test setModePairedRangeLock + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + // Test 1: Update range-start, verify range-end is also updated + lt.mu.Lock() + startLock, _ := lt.mu.store.Get(rangeStart) + require.True(t, startLock.isLockRangeStart(), "should be range-start") + require.True(t, startLock.isShared(), "should be Shared initially") + + // Simulate mode upgrade on range-start + updatedStart, changed := startLock.setMode(pb.LockMode_Exclusive) + require.True(t, changed, "mode should change from Shared to Exclusive") + lt.mu.store.Add(rangeStart, updatedStart) + + // Call setModePairedRangeLock to update range-end + lt.setModePairedRangeLock(rangeStart, updatedStart, pb.LockMode_Exclusive) + + // Verify range-end is now Exclusive + endLock, _ := lt.mu.store.Get(rangeEnd) + require.False(t, endLock.isShared(), + "range-end should be Exclusive after setModePairedRangeLock called on range-start") + lt.mu.Unlock() + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + }, + ) +} + +// TestSetModePairedRangeLockFromRangeEnd tests setModePairedRangeLock when called +// from the range-end entry (backward scan to find range-start). +func TestSetModePairedRangeLockFromRangeEnd(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rangeStart := []byte{1} + rangeEnd := []byte{5} + rangeRows := [][]byte{rangeStart, rangeEnd} + + sharedOpt := pb.LockOptions{ + Granularity: pb.Granularity_Range, + Mode: pb.LockMode_Shared, + Policy: pb.WaitPolicy_Wait, + } + + // Create a Shared range lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, rangeRows, txn1, sharedOpt) + require.NoError(t, err) + + // Get the lock table and directly test setModePairedRangeLock from range-end + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + // Test: Update range-end, verify range-start is also updated + lt.mu.Lock() + endLock, _ := lt.mu.store.Get(rangeEnd) + require.True(t, endLock.isLockRangeEnd(), "should be range-end") + require.True(t, endLock.isShared(), "should be Shared initially") + + // Simulate mode upgrade on range-end + updatedEnd, changed := endLock.setMode(pb.LockMode_Exclusive) + require.True(t, changed, "mode should change from Shared to Exclusive") + lt.mu.store.Add(rangeEnd, updatedEnd) + + // Call setModePairedRangeLock to update range-start (backward scan) + lt.setModePairedRangeLock(rangeEnd, updatedEnd, pb.LockMode_Exclusive) + + // Verify range-start is now Exclusive + startLock, _ := lt.mu.store.Get(rangeStart) + require.False(t, startLock.isShared(), + "range-start should be Exclusive after setModePairedRangeLock called on range-end") + lt.mu.Unlock() + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + }, + ) +} + +// TestSetModePairedRangeLockRowLockNoOp verifies that setModePairedRangeLock +// is a no-op for row locks. +func TestSetModePairedRangeLockRowLockNoOp(t *testing.T) { + runLockServiceTests( + t, + []string{"s1"}, + func(_ *lockTableAllocator, s []*service) { + l := s[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + tableID := uint64(10) + rowKey := []byte{3} + + sharedOpt := newTestRowSharedOptions() + + // Create a Shared row lock + txn1 := newTestTxnID(1) + _, err := l.Lock(ctx, tableID, [][]byte{rowKey}, txn1, sharedOpt) + require.NoError(t, err) + + // Get the lock table + v, err := l.getLockTable(0, tableID) + require.NoError(t, err) + lt := v.(*localLockTable) + + // Verify it's a row lock and call setModePairedRangeLock (should be no-op) + lt.mu.Lock() + rowLock, _ := lt.mu.store.Get(rowKey) + require.True(t, rowLock.isLockRow(), "should be row lock") + + // This should be a no-op (early return) + lt.setModePairedRangeLock(rowKey, rowLock, pb.LockMode_Exclusive) + lt.mu.Unlock() + + // Cleanup + require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{PhysicalTime: 1})) + }, + ) +} + // TestRangeLockWithInterleavedRowLocks verifies that setModePairedRangeLock // correctly scans past interleaved row locks to find the paired range entry. // Row locks outside the range can coexist with range locks in the btree. From e9bac46b5b29f1ee547c5fbe2e045e9a39458b90 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Thu, 5 Mar 2026 14:47:37 +0800 Subject: [PATCH 9/9] U --- pkg/sql/compile/ddl.go | 20 -------------------- pkg/txn/client/operator.go | 6 +----- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 8e1232d9b8214..44a748bd01b80 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -3994,26 +3994,6 @@ func getLockBatch(proc *process.Process, accountId uint32, names []string) (*bat return bat, nil } -var doLockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { - dbRel, err := getRelFromMoCatalog(c, catalog.MO_DATABASE) - if err != nil { - return err - } - accountID, err := defines.GetAccountId(c.proc.Ctx) - if err != nil { - return err - } - bat, err := getLockBatch(c.proc, accountID, []string{dbName}) - if err != nil { - return err - } - defer bat.GetVector(0).Free(c.proc.Mp()) - if err := lockRows(c.e, c.proc, dbRel, bat, 0, lockMode, lock.Sharding_None, accountID); err != nil { - return err - } - return nil -} - var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { dbRel, err := getRelFromMoCatalog(c, catalog.MO_DATABASE) if err != nil { diff --git a/pkg/txn/client/operator.go b/pkg/txn/client/operator.go index bcac966c7aab2..aaa0cbf2f97e8 100644 --- a/pkg/txn/client/operator.go +++ b/pkg/txn/client/operator.go @@ -1015,7 +1015,6 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c tc.logger.Fatal("can not write on ready only transaction") } var payload []txn.TxnRequest - var needUnlock bool if commit { if tc.reset.workspace != nil { reqs, err := tc.reset.workspace.Commit(ctx) @@ -1028,9 +1027,6 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c defer func() { tc.closeLocked(ctx) tc.mu.Unlock() - if needUnlock { - tc.unlock(ctx) - } }() if tc.mu.closed { tc.reset.commitErr = moerr.NewTxnClosedNoCtx(tc.reset.txnID) @@ -1039,7 +1035,7 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c if tc.needUnlockLocked() { tc.mu.txn.LockTables = tc.mu.lockTables - needUnlock = true + defer tc.unlock(ctx) } }