Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
43c60b4
filter multiple tombstone/data, only keep last one
gouhongshen Jan 21, 2026
d357af5
fix: diff sql output and merge cases
gouhongshen Jan 22, 2026
c0333b0
update bvt case
gouhongshen Jan 22, 2026
920df16
improve apply performance, not done
gouhongshen Jan 23, 2026
161b593
refactor branch hashmap
gouhongshen Jan 29, 2026
b50a2ec
alloctor with throttler for data branch
gouhongshen Jan 30, 2026
cfa09c0
restrict the memory peak when handle deletes on lca
gouhongshen Feb 1, 2026
2e9d850
feat(data-branch): add diff output summary and strengthen diff tests
gouhongshen Feb 26, 2026
2193d22
test(bvt): add data branch diff output summary cases
gouhongshen Feb 26, 2026
3ae43b2
refactor(frontend): split data_branch into hashdiff/output/helpers/types
gouhongshen Feb 27, 2026
2a0767b
debug get commit ts
gouhongshen Feb 27, 2026
8687aab
commit ts ok
gouhongshen Mar 2, 2026
77627b1
rebabse main
gouhongshen Mar 2, 2026
42e7c27
chore(data-branch): clean up docs and revert unused disttae changes
gouhongshen Mar 2, 2026
b27b41a
chore: remove temporary AAAA debug logs
gouhongshen Mar 2, 2026
43c1028
Merge branch 'main' into fix/diff-insert-delete
gouhongshen Mar 2, 2026
e6260e7
test: raise coverage for data branch and hashmap paths
gouhongshen Mar 2, 2026
2488c23
test: improve data_branch_output coverage
gouhongshen Mar 3, 2026
e52c2a4
test: cover buildOutputSchema branches
gouhongshen Mar 5, 2026
ccb42d2
test: cover appender and cleanup file helpers
gouhongshen Mar 5, 2026
3f3602b
fix data branch diff branch timestamp semantics
gouhongshen Mar 9, 2026
b67e011
fix ineffectual lca branch assignments
gouhongshen Mar 9, 2026
89ab62c
format data branch dag lookup
gouhongshen Mar 10, 2026
87701cf
fix data branch csv diff result handling
gouhongshen Mar 10, 2026
f4d100f
Merge branch 'main' into fix/diff-insert-delete
gouhongshen Mar 12, 2026
db28c4d
Merge branch 'main' into fix/diff-insert-delete
mergify[bot] Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/common/rscthrottler/resource_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,19 @@ func AcquirePolicyForCNFlushS3(

return defaultAcquirePolicy(throttler, ask)
}

func AcquirePolicyForDataBranch(
throttler *memThrottler,
ask int64,
) (int64, bool) {

total := int64(throttler.actualTotalMemory.Load())
if total > 0 && throttler.limitRate > 0 {
used := throttler.rss.Load() + ask
if float64(used) > float64(total)*throttler.limitRate {
return 0, false
}
}

return defaultAcquirePolicy(throttler, ask)
}
39 changes: 39 additions & 0 deletions pkg/common/rscthrottler/resource_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,42 @@ func BenchmarkThrottler(b *testing.B) {
throttler.Available()
}
}

func TestAcquirePolicyForDataBranch(t *testing.T) {
t.Run("deny when projected usage exceeds rate limit", func(t *testing.T) {
throttler := &memThrottler{limitRate: 0.80}
throttler.actualTotalMemory.Store(100)
throttler.limit.Store(80)
throttler.rss.Store(70)
throttler.reserved.Store(5)

left, ok := AcquirePolicyForDataBranch(throttler, 11)
require.False(t, ok)
require.Equal(t, int64(0), left)
require.Equal(t, int64(5), throttler.reserved.Load())
})

t.Run("allow at boundary and reserve memory", func(t *testing.T) {
throttler := &memThrottler{limitRate: 0.80}
throttler.actualTotalMemory.Store(100)
throttler.limit.Store(80)
throttler.rss.Store(70)

left, ok := AcquirePolicyForDataBranch(throttler, 10)
require.True(t, ok)
require.Equal(t, int64(20), left)
require.Equal(t, int64(10), throttler.reserved.Load())
})

t.Run("fallback to default policy when rate check is disabled", func(t *testing.T) {
throttler := &memThrottler{limitRate: 0}
throttler.actualTotalMemory.Store(100)
throttler.limit.Store(80)
throttler.rss.Store(20)

left, ok := AcquirePolicyForDataBranch(throttler, 10)
require.True(t, ok)
require.Equal(t, int64(70), left)
require.Equal(t, int64(10), throttler.reserved.Load())
})
}
13 changes: 10 additions & 3 deletions pkg/frontend/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,18 @@ func updateBranchMetaTable(
return nil
}

if _, srcTblDef, err = ses.GetTxnCompileCtx().Resolve(
receipt.srcDb, receipt.srcTbl, receipt.snapshot,
); err != nil {
srcCtx := defines.AttachAccountId(ctx, receipt.srcAccount)
tcc := ses.GetTxnCompileCtx()
origCtx := tcc.GetContext()
tcc.SetContext(srcCtx)
defer tcc.SetContext(origCtx)

if _, srcTblDef, err = tcc.Resolve(receipt.srcDb, receipt.srcTbl, nil); err != nil {
return err
}
if srcTblDef == nil {
return moerr.NewNoSuchTable(srcCtx, receipt.srcDb, receipt.srcTbl)
}

dstCtx := defines.AttachAccountId(ctx, receipt.toAccount)

Expand Down
Loading
Loading