Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime"
commonutil "github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -115,14 +114,39 @@ func (s *Scope) DropDatabase(c *Compile) error {
return err
}

// After acquiring the Exclusive lock, refresh the snapshot to the latest
// timestamp so that Relations() can see all tables committed before this
// point. This covers the edge case where a concurrent "data branch create
// table" transaction committed and released its Shared lock before DROP
// started locking (Mode B situation 1), in which case the lock service
// returns no conflict and doLock does not advance the snapshot.
if err = refreshSnapshotAfterLock(c); err != nil {
return err
// After acquiring the exclusive lock on mo_database, refresh the
// transaction's snapshot to the latest applied logtail timestamp.
//
// This fixes a race condition between concurrent CLONE (CREATE TABLE)
// and DROP DATABASE:
// 1. CLONE acquires Shared lock on mo_database, creates table in
// mo_tables, commits, releases Shared lock.
// 2. DROP acquires Exclusive lock on mo_database. The lock service
// runs hasNewVersionInRange on mo_database rows, but CLONE did
// NOT modify mo_database (only mo_tables), so changed=false and
// the snapshot is NOT advanced past CLONE's commit timestamp.
// 3. DROP calls Relations() with the stale snapshot, misses the
// newly created table, and drops the database without deleting
// the table — leaving an orphan record in mo_tables that causes
// an OkExpectedEOB panic during checkpoint replay.
//
// By explicitly advancing the snapshot to the latest commit timestamp
// after acquiring the exclusive lock, we ensure Relations() sees all
// tables committed before the lock was granted.
{
txnOp := c.proc.GetTxnOperator()
if txnOp.Txn().IsPessimistic() && txnOp.Txn().IsRCIsolation() {
latestCommitTS := c.proc.Base.TxnClient.GetLatestCommitTS()
if txnOp.Txn().SnapshotTS.Less(latestCommitTS) {
newTS, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, latestCommitTS)
if err != nil {
return err
}
if err := txnOp.UpdateSnapshot(c.proc.Ctx, newTS); err != nil {
return err
}
}
Comment on lines +137 to +148

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. GetLatestCommitTS race window 🐞 Bug ✓ Correctness

DropDatabase now conditionally refreshes snapshot based on TxnClient.GetLatestCommitTS(), but that
value can lag behind a just-committed txn because it’s updated only on ClosedEvent, which fires
after commit unlock. DROP can acquire the exclusive lock during this window and skip snapshot
refresh, reintroducing stale Relations() reads and orphaned catalog rows.
Agent Prompt
### Issue description
`DropDatabase` refreshes the transaction snapshot based on `TxnClient.GetLatestCommitTS()` and may skip the refresh when `latestCommitTS` is stale. `latestCommitTS` is updated via a `ClosedEvent` callback, but transaction commit unlock is deferred to run before `closeLocked()` triggers `ClosedEvent`, creating a window where a concurrent txn can commit+unlock and DROP can acquire the exclusive lock before `latestCommitTS` is advanced.

### Issue Context
This can reintroduce the stale-snapshot catalog race (DROP missing recently created tables and leaving orphan records).

### Fix Focus Areas
- Reintroduce/restore the previous approach that advances snapshot based on a monotonic current timestamp (HLC/clock) and `WaitLogTailAppliedAt(now)` after the exclusive lock is acquired.
- Keep the refresh scoped to the same txn modes intended (pessimistic + RC), but do not depend on `GetLatestCommitTS()` being up-to-date in the immediate post-commit window.

### Fix Focus Areas (code locations)
- pkg/sql/compile/ddl.go[117-150]
- (reference) pkg/txn/client/operator.go[998-1012]
- (reference) pkg/txn/client/client.go[327-333]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

}
}

// handle sub
Expand Down Expand Up @@ -3870,24 +3894,6 @@ var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) err
return nil
}

// refreshSnapshotAfterLock moves the transaction's snapshot forward to the
// current HLC timestamp, guaranteeing that reads issued afterwards (such as
// Relations()) observe every commit applied up to this moment — even when the
// lock service reported no conflict.
//
// The explicit refresh is required because lock-conflict detection inspects
// only the locked table (mo_database). A concurrent transaction may have
// touched a related table (mo_tables) while leaving mo_database untouched;
// hasNewVersionInRange then sees no change inside doLock and the snapshot
// stays stale.
var refreshSnapshotAfterLock = func(c *Compile) error {
	// Read "now" from the service clock; the second return value (update
	// interval) is not needed here.
	now, _ := moruntime.ServiceRuntime(c.proc.GetService()).Clock().Now()

	// Block until the logtail has been applied up to "now", then adopt the
	// resulting timestamp as the new snapshot.
	appliedTS, waitErr := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, now)
	if waitErr != nil {
		return waitErr
	}
	return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, appliedTS)
}

var lockMoTable = func(
c *Compile,
dbName string,
Expand Down
64 changes: 29 additions & 35 deletions pkg/sql/compile/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,6 @@ func TestIsExperimentalEnabled(t *testing.T) {
// mo_database. This prevents the race condition where a concurrent CLONE
// (CREATE TABLE) commits between the snapshot and the lock acquisition,
// leaving orphan records in mo_tables.
//
// The current implementation uses clock.Now() + WaitLogTailAppliedAt(now) to
// unconditionally advance the snapshot, rather than the v1 approach which used
// GetLatestCommitTS() with a conditional check (which failed due to defer LIFO
// ordering in doWrite causing unlock to execute before updateLastCommitTS).
func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
dropDbDef := &plan2.DropDatabase{
IfExists: false,
Expand All @@ -818,10 +813,12 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
TxnOffset: 0,
}

snapshotTS := timestamp.Timestamp{PhysicalTime: 100}
latestCommitTS := timestamp.Timestamp{PhysicalTime: 200}
appliedTS := timestamp.Timestamp{PhysicalTime: 200}

// Test: refreshSnapshotAfterLock is called and UpdateSnapshot is invoked.
t.Run("snapshot_always_refreshed_via_clock_now", func(t *testing.T) {
// Test 1: When snapshotTS < latestCommitTS, UpdateSnapshot MUST be called.
t.Run("snapshot_refreshed_when_stale", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -836,8 +833,9 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes()
txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes()
txnOp.EXPECT().Txn().Return(txn.TxnMeta{
Mode: txn.TxnMode_Pessimistic,
Isolation: txn.TxnIsolation_RC,
Mode: txn.TxnMode_Pessimistic,
Isolation: txn.TxnIsolation_RC,
SnapshotTS: snapshotTS,
}).AnyTimes()
txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes()
txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes()
Expand All @@ -851,6 +849,8 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {

txnCli := mock_frontend.NewMockTxnClient(ctrl)
txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes()
txnCli.EXPECT().GetLatestCommitTS().Return(latestCommitTS).Times(1)
txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), latestCommitTS).Return(appliedTS, nil).Times(1)

proc.Base.TxnClient = txnCli
proc.Base.TxnOperator = txnOp
Expand All @@ -869,28 +869,15 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
})
defer lockMoDb.Reset()

// Stub refreshSnapshotAfterLock to simulate the clock.Now() path:
// it calls WaitLogTailAppliedAt and UpdateSnapshot unconditionally.
refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error {
ts, err := c.proc.Base.TxnClient.WaitLogTailAppliedAt(c.proc.Ctx, appliedTS)
if err != nil {
return err
}
return c.proc.GetTxnOperator().UpdateSnapshot(c.proc.Ctx, ts)
})
defer refreshStub.Reset()

txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), appliedTS).Return(appliedTS, nil).Times(1)

c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now())
err := s.DropDatabase(c)
// The test will error at Relations(), but the key assertion is that
// UpdateSnapshot was called (enforced by Times(1) on the mock).
assert.Error(t, err)
})

// Test: refreshSnapshotAfterLock error is propagated.
t.Run("snapshot_refresh_error_propagated", func(t *testing.T) {
// Test 2: When snapshotTS >= latestCommitTS, UpdateSnapshot must NOT be called.
t.Run("snapshot_not_refreshed_when_fresh", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -900,13 +887,17 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
proc.Ctx = ctx
proc.ReplaceTopCtx(ctx)

freshSnapshotTS := timestamp.Timestamp{PhysicalTime: 300}
staleCommitTS := timestamp.Timestamp{PhysicalTime: 200}

txnOp := mock_frontend.NewMockTxnOperator(ctrl)
txnOp.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes()
txnOp.EXPECT().Rollback(gomock.Any()).Return(nil).AnyTimes()
txnOp.EXPECT().GetWorkspace().Return(&Ws{}).AnyTimes()
txnOp.EXPECT().Txn().Return(txn.TxnMeta{
Mode: txn.TxnMode_Pessimistic,
Isolation: txn.TxnIsolation_RC,
Mode: txn.TxnMode_Pessimistic,
Isolation: txn.TxnIsolation_RC,
SnapshotTS: freshSnapshotTS,
}).AnyTimes()
txnOp.EXPECT().TxnOptions().Return(txn.TxnOptions{}).AnyTimes()
txnOp.EXPECT().NextSequence().Return(uint64(0)).AnyTimes()
Expand All @@ -915,29 +906,32 @@ func TestDropDatabase_SnapshotRefreshAfterExclusiveLock(t *testing.T) {
txnOp.EXPECT().Snapshot().Return(txn.CNTxnSnapshot{}, nil).AnyTimes()
txnOp.EXPECT().Status().Return(txn.TxnStatus_Active).AnyTimes()

// Key assertion: UpdateSnapshot must NOT be called.
txnOp.EXPECT().UpdateSnapshot(gomock.Any(), gomock.Any()).Times(0)

txnCli := mock_frontend.NewMockTxnClient(ctrl)
txnCli.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOp, nil).AnyTimes()
txnCli.EXPECT().GetLatestCommitTS().Return(staleCommitTS).Times(1)
// WaitLogTailAppliedAt should NOT be called either.
txnCli.EXPECT().WaitLogTailAppliedAt(gomock.Any(), gomock.Any()).Times(0)

proc.Base.TxnClient = txnCli
proc.Base.TxnOperator = txnOp

mockDb := mock_frontend.NewMockDatabase(ctrl)
mockDb.EXPECT().IsSubscription(gomock.Any()).Return(false).AnyTimes()
mockDb.EXPECT().Relations(gomock.Any()).Return(nil, moerr.NewInternalErrorNoCtx("stop here")).AnyTimes()

eng := mock_frontend.NewMockEngine(ctrl)
eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mock_frontend.NewMockDatabase(ctrl), nil).AnyTimes()
eng.EXPECT().Database(gomock.Any(), "test_db", gomock.Any()).Return(mockDb, nil).AnyTimes()

lockMoDb := gostub.Stub(&lockMoDatabase, func(_ *Compile, _ string, _ lock.LockMode) error {
return nil
})
defer lockMoDb.Reset()

// Stub refreshSnapshotAfterLock to return an error.
expectedErr := moerr.NewInternalErrorNoCtx("logtail wait failed")
refreshStub := gostub.Stub(&refreshSnapshotAfterLock, func(c *Compile) error {
return expectedErr
})
defer refreshStub.Reset()

c := NewCompile("test", "test", "drop database test_db", "", "", eng, proc, nil, false, nil, time.Now())
err := s.DropDatabase(c)
assert.ErrorIs(t, err, expectedErr)
assert.Error(t, err)
})
}
Loading