From d671a1a8a4a5ae4943ca6c1e9a77d05fe2108469 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Tue, 3 Mar 2026 18:25:43 +0800 Subject: [PATCH] revert refreshSnapshotAfterLock --- pkg/sql/compile/ddl.go | 60 ++++++++++++++++++---------------- pkg/sql/compile/ddl_test.go | 64 +++++++++++++++++-------------------- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index b8a67f2a645e8..2da67c13217bd 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -24,7 +24,6 @@ import ( "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" - moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime" commonutil "github.com/matrixorigin/matrixone/pkg/common/util" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -115,14 +114,39 @@ func (s *Scope) DropDatabase(c *Compile) error { return err } - // After acquiring the Exclusive lock, refresh the snapshot to the latest - // timestamp so that Relations() can see all tables committed before this - // point. This covers the edge case where a concurrent "data branch create - // table" transaction committed and released its Shared lock before DROP - // started locking (Mode B situation 1), in which case the lock service - // returns no conflict and doLock does not advance the snapshot. - if err = refreshSnapshotAfterLock(c); err != nil { - return err + // After acquiring the exclusive lock on mo_database, refresh the + // transaction's snapshot to the latest applied logtail timestamp. + // + // This fixes a race condition between concurrent CLONE (CREATE TABLE) + // and DROP DATABASE: + // 1. CLONE acquires Shared lock on mo_database, creates table in + // mo_tables, commits, releases Shared lock. + // 2. DROP acquires Exclusive lock on mo_database. The lock service + // runs hasNewVersionInRange on mo_database rows, but CLONE did + // NOT modify mo_database (only mo_tables), so changed=false and + // the snapshot is NOT advanced past CLONE's commit timestamp. + // 3. DROP calls Relations() with the stale snapshot, misses the + // newly created table, and drops the database without deleting + // the table — leaving an orphan record in mo_tables that causes + // an OkExpectedEOB panic during checkpoint replay. + // + // By explicitly advancing the snapshot to the latest commit timestamp + // after acquiring the exclusive lock, we ensure Relations() sees all + // tables committed before the lock was granted. + { + txnOp := c.proc.GetTxnOperator() + if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() { + latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS() + if txnOp.Txn().SnapshotTS.Less(latestCommitTS) { + newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS) + if err != nil { + return err + } + if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil { + return err + } + } + } } // handle sub @@ -3870,24 +3894,6 @@ var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) err return nil } -// refreshSnapshotAfterLock advances the transaction's snapshot to the current -// HLC timestamp. This ensures that subsequent reads (e.g. Relations()) see all -// data committed up to this moment, regardless of whether the lock service -// reported a conflict. -// -// This is necessary because the lock conflict detection only checks the locked -// table (mo_database), but concurrent transactions may have modified related -// tables (mo_tables) without changing mo_database. In such cases, doLock's -// hasNewVersionInRange returns false and the snapshot is not advanced. -var refreshSnapshotAfterLock = func(c *Compile) error { - now, _ := moruntime.ServiceRuntime(c.proc.GetService()).Clock().Now() - ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, now) - if err != nil { - return err - } - return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) -} - var lockMoTable = func( c *Compile, dbName string, diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index 27408b795fb4a..450025e1eaf05 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -792,11 +792,6 @@ func TestIsExperimentalEnabled(t *testing.T) { // mo_database. This prevents the race condition where a concurrent CLONE // (CREATE TABLE) commits between the snapshot and the lock acquisition, // leaving orphan records in mo_tables. -// -// The current implementation uses clock.Now() + WaitLogTailAppliedAt(now) to -// unconditionally advance the snapshot, rather than the v1 approach which used -// GetLatestCommitTS() with a conditional check (which failed due to defer LIFO -// ordering in doWrite causing unlock to execute before updateLastCommitTS). func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { dropDbDef := &plan2.DropDatabase{ IfExists: false, @@ -818,10 +813,12 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { TxnOffset: 0, } + snapshotTS := timestamp.Timestamp{PhysicalTime: 100} + latestCommitTS := timestamp.Timestamp{PhysicalTime: 200} appliedTS := timestamp.Timestamp{PhysicalTime: 200} - // Test: refreshSnapshotAfterLock is called and UpdateSnapshot is invoked. - t.Run("snapshot_always_refreshed_via_clock_now", func(t *testing.T) { + // Test 1: When snapshotTS < latestCommitTS, UpdateSnapshot MUST be called. + t.Run("snapshot_refreshed_when_stale", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -836,8 +833,9 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, + SnapshotTS: snapshotTS, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -851,6 +849,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() + txnCli.EXPECT().GetLatestCommitTS().Return(latestCommitTS).Times(1) + txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), latestCommitTS).Return(appliedTS, nil).Times(1) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp @@ -869,19 +869,6 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { }) defer lockMoDb.Reset() - // Stub refreshSnapshotAfterLock to simulate the clock.Now() path: - // it calls WaitLogTailAppliedAt and UpdateSnapshot unconditionally. - refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { - ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, appliedTS) - if err != nil { - return err - } - return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts) - }) - defer refreshStub.Reset() - - txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), appliedTS).Return(appliedTS, nil).Times(1) - c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) // The test will error at Relations(), but the key assertion is that @@ -889,8 +876,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { assert.Error(t, err) }) - // Test: refreshSnapshotAfterLock error is propagated. - t.Run("snapshot_refresh_error_propagated", func(t *testing.T) { + // Test 2: When snapshotTS >= latestCommitTS, UpdateSnapshot must NOT be called. + t.Run("snapshot_not_refreshed_when_fresh", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -900,13 +887,17 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { proc.Ctx = ctx proc.ReplaceTopCtx(ctx) + freshSnapshotTS := timestamp.Timestamp{PhysicalTime: 300} + staleCommitTS := timestamp.Timestamp{PhysicalTime: 200} + txnOp := mock_frontend.NewMockTxnOperator(ctrl) txnOp.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes() txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes() txnOp.EXPECT().Txn().Return(txn.TxnMeta{ - Mode: txn.TxnMode_Pessimistic, - Isolation: txn.TxnIsolation_RC, + Mode: txn.TxnMode_Pessimistic, + Isolation: txn.TxnIsolation_RC, + SnapshotTS: freshSnapshotTS, }).AnyTimes() txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes() txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes() @@ -915,29 +906,32 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) { txnOp.EXPECT().Snapshot().Return(txn.CNTxnSnapshot{}, nil).AnyTimes() txnOp.EXPECT().Status().Return(txn.TxnStatus_Active).AnyTimes() + // Key assertion: UpdateSnapshot must NOT be called. + txnOp.EXPECT().UpdateSnapshot(gomock.Any(), gomock.Any()).Times(0) + txnCli := mock_frontend.NewMockTxnClient(ctrl) txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes() + txnCli.EXPECT().GetLatestCommitTS().Return(staleCommitTS).Times(1) + // WaitLogTailAppliedAt should NOT be called either. + txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), gomock.Any()).Times(0) proc.Base.TxnClient = txnCli proc.Base.TxnOperator = txnOp + mockDb := mock_frontend.NewMockDatabase(ctrl) + mockDb.EXPECT().IsSubscription(gomock.Any()).Return(false).AnyTimes() + mockDb.EXPECT().Relations(gomock.Any()).Return(nil, moerr.NewInternalErrorNoCtx("stop here")).AnyTimes() + eng := mock_frontend.NewMockEngine(ctrl) - eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mock_frontend.NewMockDatabase(ctrl), nil).AnyTimes() + eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes() lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error { return nil }) defer lockMoDb.Reset() - // Stub refreshSnapshotAfterLock to return an error. - expectedErr := moerr.NewInternalErrorNoCtx("logtail wait failed") - refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error { - return expectedErr - }) - defer refreshStub.Reset() - c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now()) err := s.DropDatabase(c) - assert.ErrorIs(t, err, expectedErr) + assert.Error(t, err) }) }