Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
361 commits
Select commit Hold shift + click to select a range
affc2c2
fix ut
jiangxinmeng1 Jan 13, 2026
ef81fbd
sca problems
jiangxinmeng1 Jan 13, 2026
29d625b
handle error
jiangxinmeng1 Jan 14, 2026
6f8aba4
resume pause
jiangxinmeng1 Jan 14, 2026
04a8531
update error handle
jiangxinmeng1 Jan 14, 2026
9944ff7
ignore snapshot already existed
jiangxinmeng1 Jan 14, 2026
a657e9b
add ut
jiangxinmeng1 Jan 14, 2026
be0728d
update
jiangxinmeng1 Jan 14, 2026
b6735be
add ut
jiangxinmeng1 Jan 14, 2026
2473a80
update
jiangxinmeng1 Jan 14, 2026
8f1c37d
update
jiangxinmeng1 Jan 14, 2026
85b2d8d
update
jiangxinmeng1 Jan 14, 2026
222dd47
test apply objects
jiangxinmeng1 Jan 14, 2026
d8bcf4c
add ut
jiangxinmeng1 Jan 15, 2026
c8805f9
update bvt
jiangxinmeng1 Jan 15, 2026
5fe17fa
fix
jiangxinmeng1 Jan 15, 2026
13592d8
Merge branch 'main' into publication
jiangxinmeng1 Jan 16, 2026
10fccf0
add state
jiangxinmeng1 Jan 19, 2026
10f0dda
update publication
jiangxinmeng1 Jan 19, 2026
0f2fb15
update bvt
jiangxinmeng1 Jan 19, 2026
07db3bb
fix sca problems
jiangxinmeng1 Jan 19, 2026
4fcb418
fix data race
jiangxinmeng1 Jan 19, 2026
69be131
Merge branch 'main' into publication
jiangxinmeng1 Jan 19, 2026
26e151f
fix sca problems
jiangxinmeng1 Jan 19, 2026
0a00ac6
fix
jiangxinmeng1 Jan 19, 2026
383f702
fix pause ccpr
jiangxinmeng1 Jan 19, 2026
0486906
update
jiangxinmeng1 Jan 19, 2026
2d55061
fix ut
jiangxinmeng1 Jan 19, 2026
1a79344
update sql executor
jiangxinmeng1 Jan 19, 2026
cd863f7
fix close iteration context
jiangxinmeng1 Jan 19, 2026
45e010f
fix ut
jiangxinmeng1 Jan 19, 2026
c9e5228
upgrade
jiangxinmeng1 Jan 19, 2026
11bfbcc
fix
jiangxinmeng1 Jan 19, 2026
437f32d
check cn uuid
jiangxinmeng1 Jan 19, 2026
8d7847e
fix ut
jiangxinmeng1 Jan 20, 2026
441c159
update show snapshots
jiangxinmeng1 Jan 20, 2026
73e2a06
update gc
jiangxinmeng1 Jan 20, 2026
8165b15
fix ut
jiangxinmeng1 Jan 20, 2026
6211caf
update gc
jiangxinmeng1 Jan 20, 2026
7312120
add ut
jiangxinmeng1 Jan 20, 2026
e34276f
add ut
jiangxinmeng1 Jan 20, 2026
abbc46b
get dropped db
jiangxinmeng1 Jan 20, 2026
b767bb6
fix index ddl
jiangxinmeng1 Jan 21, 2026
3787a66
add ut
jiangxinmeng1 Jan 21, 2026
0bf4fd8
rm code
jiangxinmeng1 Jan 21, 2026
d45e621
add ut
jiangxinmeng1 Jan 21, 2026
468313b
add ut
jiangxinmeng1 Jan 21, 2026
2ca386e
upstream helper
jiangxinmeng1 Jan 21, 2026
130d2e1
db to create
jiangxinmeng1 Jan 21, 2026
2faa156
add ut
jiangxinmeng1 Jan 21, 2026
f90a6bf
update retry
jiangxinmeng1 Jan 21, 2026
e2f3b59
add ut
jiangxinmeng1 Jan 21, 2026
218e4fc
sca problems
jiangxinmeng1 Jan 21, 2026
b87bdc3
add ut
jiangxinmeng1 Jan 22, 2026
5ac05e3
fix
jiangxinmeng1 Jan 22, 2026
e59cd00
fix
jiangxinmeng1 Jan 22, 2026
9a662c8
fix sca problems
jiangxinmeng1 Jan 22, 2026
e771e60
update ut
jiangxinmeng1 Jan 22, 2026
498d348
add ut
jiangxinmeng1 Jan 22, 2026
9db4b6e
update log
jiangxinmeng1 Jan 22, 2026
3d3782c
rm log
jiangxinmeng1 Jan 22, 2026
cfe2bbb
add ut
jiangxinmeng1 Jan 22, 2026
eae8e69
update ut
jiangxinmeng1 Jan 23, 2026
3f26154
add ut
jiangxinmeng1 Jan 23, 2026
8564ab6
rm code
jiangxinmeng1 Jan 23, 2026
36d8053
add ut
jiangxinmeng1 Jan 23, 2026
18d05d1
fix
jiangxinmeng1 Jan 23, 2026
073ff28
add ut
jiangxinmeng1 Jan 23, 2026
08f1d6a
add ut
jiangxinmeng1 Jan 23, 2026
1dec875
update ut
jiangxinmeng1 Jan 23, 2026
28edec5
add ut
jiangxinmeng1 Jan 23, 2026
21554e2
fix data race
jiangxinmeng1 Jan 23, 2026
dae267a
add ut
jiangxinmeng1 Jan 23, 2026
5fce7cc
add ut
jiangxinmeng1 Jan 23, 2026
019c653
add ut
jiangxinmeng1 Jan 23, 2026
3f55560
add ut
jiangxinmeng1 Jan 26, 2026
ef21236
fix ut
jiangxinmeng1 Jan 26, 2026
cd55d78
add ut
jiangxinmeng1 Jan 26, 2026
df3787c
fix
jiangxinmeng1 Jan 26, 2026
53eca7f
add ut
jiangxinmeng1 Jan 26, 2026
1f03d93
add ut
jiangxinmeng1 Jan 26, 2026
3a73ebf
add ut
jiangxinmeng1 Jan 26, 2026
d6cb756
skip view
jiangxinmeng1 Jan 27, 2026
c5f1888
fix
jiangxinmeng1 Jan 27, 2026
6693263
update log
jiangxinmeng1 Jan 27, 2026
5aa14e6
update worker
jiangxinmeng1 Jan 28, 2026
374d6f4
fix ut
jiangxinmeng1 Jan 28, 2026
d849cc8
update timeout
jiangxinmeng1 Jan 28, 2026
ffae706
fix context timeout
jiangxinmeng1 Jan 28, 2026
3638eb7
print worker stats
jiangxinmeng1 Jan 28, 2026
84edca6
update task_id type
jiangxinmeng1 Jan 29, 2026
7aa448f
update connect count
jiangxinmeng1 Jan 29, 2026
00cd0dc
add chunk buf pool
jiangxinmeng1 Jan 29, 2026
48a76d5
update config
jiangxinmeng1 Jan 30, 2026
74c3a52
update subscription sql
jiangxinmeng1 Jan 30, 2026
64c147a
update subscription sql
jiangxinmeng1 Jan 30, 2026
383fce4
update resume drop pause
jiangxinmeng1 Jan 30, 2026
4dd9c63
fix
jiangxinmeng1 Jan 30, 2026
04c1555
fix check snapshot flushed
jiangxinmeng1 Jan 30, 2026
84827b1
update create snapshots
jiangxinmeng1 Feb 1, 2026
fcae633
update get snapshot ts
jiangxinmeng1 Feb 1, 2026
86e3e8a
update
jiangxinmeng1 Feb 1, 2026
ce9c5da
fix
jiangxinmeng1 Feb 1, 2026
0d0987a
ccpr_objects
jiangxinmeng1 Feb 1, 2026
55b0b1f
update ut
jiangxinmeng1 Feb 1, 2026
64fad4d
update ut
jiangxinmeng1 Feb 1, 2026
e5e435a
update
jiangxinmeng1 Feb 1, 2026
c685554
add ccpr cache
jiangxinmeng1 Feb 2, 2026
ad53332
fix rollback
jiangxinmeng1 Feb 2, 2026
d9d6877
update aobject map
jiangxinmeng1 Feb 2, 2026
4b577b6
transfer deletes
jiangxinmeng1 Feb 2, 2026
42023a5
update ut
jiangxinmeng1 Feb 2, 2026
4ad17cf
fix ut
jiangxinmeng1 Feb 2, 2026
f9d034a
update write
jiangxinmeng1 Feb 2, 2026
0ce94b9
add worker
jiangxinmeng1 Feb 2, 2026
e414920
fix ut
jiangxinmeng1 Feb 2, 2026
849bd56
update write worker
jiangxinmeng1 Feb 2, 2026
3b869b9
update show ccpr subscription
jiangxinmeng1 Feb 2, 2026
2891499
add ut
jiangxinmeng1 Feb 3, 2026
12f507b
update sql
jiangxinmeng1 Feb 3, 2026
e37d533
update sql
jiangxinmeng1 Feb 3, 2026
1836267
ccpr_table/db
jiangxinmeng1 Feb 4, 2026
fe92723
reject dml
jiangxinmeng1 Feb 4, 2026
0d1a34a
feat(gc): add sync protection mechanism for cross-cluster sync
LeftHandCold Feb 4, 2026
ce0b8d6
update drop ccpr db/table
jiangxinmeng1 Feb 4, 2026
af77f95
update log
jiangxinmeng1 Feb 4, 2026
75ec851
remove mo-tool and add gc-tool
LeftHandCold Feb 4, 2026
c00be97
add bvt
LeftHandCold Feb 4, 2026
b6e3a24
U
LeftHandCold Feb 4, 2026
c7256b3
U
LeftHandCold Feb 4, 2026
0e4820a
create table where create ccpr task
jiangxinmeng1 Feb 4, 2026
2400268
update
LeftHandCold Feb 4, 2026
39530c0
U
LeftHandCold Feb 4, 2026
062615f
Merge branch 'main' into sync_protection
LeftHandCold Feb 4, 2026
0941fa3
U
LeftHandCold Feb 4, 2026
54104a4
U
LeftHandCold Feb 4, 2026
26f0bb4
U
LeftHandCold Feb 4, 2026
9cc05fc
U
LeftHandCold Feb 4, 2026
f9e4053
add watermark
jiangxinmeng1 Feb 4, 2026
e39e7eb
U
LeftHandCold Feb 4, 2026
a0d91c6
update snapshot
jiangxinmeng1 Feb 4, 2026
331fb4c
U
LeftHandCold Feb 4, 2026
dd94dbe
U
LeftHandCold Feb 4, 2026
01774ec
fix
jiangxinmeng1 Feb 4, 2026
1e01577
U
LeftHandCold Feb 4, 2026
0980968
fix
jiangxinmeng1 Feb 4, 2026
90b77ce
update show ccpr subscription
jiangxinmeng1 Feb 4, 2026
3adf478
U
LeftHandCold Feb 4, 2026
3d26b25
U
LeftHandCold Feb 4, 2026
799a9a3
stale read
jiangxinmeng1 Feb 4, 2026
549b9c0
U
LeftHandCold Feb 5, 2026
e72c0e1
update alter table
jiangxinmeng1 Feb 5, 2026
4f91e37
set ccpr task id in txn
jiangxinmeng1 Feb 5, 2026
d067f2c
update ut
jiangxinmeng1 Feb 5, 2026
7900e12
update ut
jiangxinmeng1 Feb 5, 2026
48edeeb
update sql executor
jiangxinmeng1 Feb 5, 2026
235e88b
fix ut
jiangxinmeng1 Feb 5, 2026
dadf31f
fix bvt
jiangxinmeng1 Feb 5, 2026
2e29656
fix sca problems
jiangxinmeng1 Feb 5, 2026
7e99c4e
gc protection
jiangxinmeng1 Feb 6, 2026
f6e1a8e
fix ut
jiangxinmeng1 Feb 6, 2026
21d073d
fix ut
jiangxinmeng1 Feb 6, 2026
5fbcc2a
update ccpr tables
jiangxinmeng1 Feb 6, 2026
47d064a
update ccpr tables
jiangxinmeng1 Feb 6, 2026
308742f
fix bvt
jiangxinmeng1 Feb 6, 2026
be2cc92
fix bvt
jiangxinmeng1 Feb 6, 2026
011d465
fix bvt
jiangxinmeng1 Feb 6, 2026
6eb78e1
fix duplication
jiangxinmeng1 Feb 6, 2026
c9e32c9
ccpr not trigger iscp
jiangxinmeng1 Feb 6, 2026
ae2dff4
fix ut
jiangxinmeng1 Feb 6, 2026
28e28ac
fix
jiangxinmeng1 Feb 9, 2026
dcd9368
fix merge
jiangxinmeng1 Feb 9, 2026
7c6307d
fix merge
jiangxinmeng1 Feb 9, 2026
aabba66
fix ccpr async tables
jiangxinmeng1 Feb 10, 2026
853348c
fix sca problems
jiangxinmeng1 Feb 10, 2026
a87a3d9
add ut
jiangxinmeng1 Feb 10, 2026
30a63ba
Merge branch 'main' into publication
jiangxinmeng1 Feb 10, 2026
11a5ddb
add ut
jiangxinmeng1 Feb 10, 2026
f9c65dc
fix bvt
jiangxinmeng1 Feb 11, 2026
972f51f
add ut
jiangxinmeng1 Feb 11, 2026
1bba4bd
add ut
jiangxinmeng1 Feb 11, 2026
b0b9dc3
remove code
jiangxinmeng1 Feb 11, 2026
69be8a7
add ut
jiangxinmeng1 Feb 11, 2026
98d4154
add ut
jiangxinmeng1 Feb 11, 2026
2280aac
fix ut
jiangxinmeng1 Feb 11, 2026
5676f1e
Merge branch 'main' into publication
jiangxinmeng1 Feb 12, 2026
1e6f063
update ccpr
jiangxinmeng1 Feb 13, 2026
8903270
revert gc protection
jiangxinmeng1 Feb 13, 2026
9570c7b
fix show subscriptions
jiangxinmeng1 Feb 13, 2026
d005ca1
fix alter table
jiangxinmeng1 Feb 24, 2026
8474015
fix index
jiangxinmeng1 Feb 24, 2026
c417461
fix
jiangxinmeng1 Feb 24, 2026
2249ad8
publicate account
jiangxinmeng1 Feb 25, 2026
2ca8455
fix account level
jiangxinmeng1 Feb 25, 2026
7f08364
fix
jiangxinmeng1 Feb 26, 2026
7a29061
fix
jiangxinmeng1 Feb 27, 2026
483c5e6
fix
jiangxinmeng1 Feb 27, 2026
d596d78
fix
jiangxinmeng1 Feb 27, 2026
88a2f6c
fix
jiangxinmeng1 Feb 27, 2026
55f7c9c
fix
jiangxinmeng1 Feb 28, 2026
57c2fc9
fix
jiangxinmeng1 Feb 28, 2026
da4d3d0
fix
jiangxinmeng1 Mar 2, 2026
9a689a9
add aobject tombstone rowoffset map
jiangxinmeng1 Mar 2, 2026
c6e2b35
fix update iteration state, not update context where error happens
jiangxinmeng1 Mar 2, 2026
72e6488
update get object
jiangxinmeng1 Mar 3, 2026
580b928
update
jiangxinmeng1 Mar 3, 2026
a75e165
sink nonappendable tombstone with ioutil.sinker
jiangxinmeng1 Mar 4, 2026
d067251
skip cn transfer
jiangxinmeng1 Mar 4, 2026
8bdd3d3
fix ccpr skip merge
jiangxinmeng1 Mar 5, 2026
aed2dce
update filter object
jiangxinmeng1 Mar 5, 2026
ce9ab8f
update config
jiangxinmeng1 Mar 5, 2026
2024fee
add matric
jiangxinmeng1 Mar 5, 2026
ae1d27c
update config
jiangxinmeng1 Mar 5, 2026
13d7a9b
update config
jiangxinmeng1 Mar 5, 2026
83321b1
memory control
jiangxinmeng1 Mar 6, 2026
704945c
update
jiangxinmeng1 Mar 6, 2026
350e548
object retry
jiangxinmeng1 Mar 9, 2026
12b19d9
fix
jiangxinmeng1 Mar 9, 2026
3e0904a
fix ut
jiangxinmeng1 Mar 11, 2026
1b5e22d
Merge origin/main into publication_2
jiangxinmeng1 Mar 11, 2026
ce0340a
Merge upstream/main into publication_2
jiangxinmeng1 Mar 11, 2026
c2d5724
fix sca problems
jiangxinmeng1 Mar 11, 2026
0f297d0
fix ut
jiangxinmeng1 Mar 11, 2026
b9bf2ab
fix ut
jiangxinmeng1 Mar 11, 2026
38c3a24
update
jiangxinmeng1 Mar 11, 2026
fce438f
update
jiangxinmeng1 Mar 11, 2026
bc28f12
fix ut
jiangxinmeng1 Mar 11, 2026
e21f714
Merge branch 'ccpr_gc' into publication_with_gc
jiangxinmeng1 Mar 11, 2026
f534ac5
add ut
jiangxinmeng1 Mar 11, 2026
abd0d43
Merge branch 'publication' into publication_with_gc
jiangxinmeng1 Mar 11, 2026
ca5c3eb
upadte publication_with_gc
jiangxinmeng1 Mar 12, 2026
b549617
update
jiangxinmeng1 Mar 12, 2026
94257d1
fix
jiangxinmeng1 Mar 12, 2026
16ae634
fix sql builder
jiangxinmeng1 Mar 12, 2026
582eea6
fix
jiangxinmeng1 Mar 12, 2026
15a8714
fix
jiangxinmeng1 Mar 12, 2026
8df868a
retry ttl timeout, fix check ttl
jiangxinmeng1 Mar 12, 2026
695fa65
fix check ttl
jiangxinmeng1 Mar 12, 2026
ccb6e53
update retry
jiangxinmeng1 Mar 12, 2026
e9d6ed2
fix unregister
jiangxinmeng1 Mar 12, 2026
a7e15fe
fix
jiangxinmeng1 Mar 12, 2026
cd76805
fix
jiangxinmeng1 Mar 12, 2026
3502cd5
fix ut
jiangxinmeng1 Mar 12, 2026
e5aa2ad
Merge branch 'main' into publication_with_gc
jiangxinmeng1 Mar 13, 2026
160febe
fix ut
jiangxinmeng1 Mar 13, 2026
127cf22
fix ut
jiangxinmeng1 Mar 13, 2026
f65cb38
fix scp problems
jiangxinmeng1 Mar 13, 2026
ae51e1b
add ut
jiangxinmeng1 Mar 13, 2026
cf9adf9
fix ut
jiangxinmeng1 Mar 13, 2026
a919c90
fix ut
jiangxinmeng1 Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.idea/
.kiro/
.cursor/
*.o
*.a
*.exe
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func init() {
sql = predefine.GenInitISCPTaskSQL()
initSQLs = append(initSQLs, sql)

sql = predefine.GenInitPublicationTaskSQL()
initSQLs = append(initSQLs, sql)

initSQLs = append(initSQLs, trace.InitSQLs...)

initSQLs = append(initSQLs, shardservice.InitSQLs...)
Expand Down
45 changes: 45 additions & 0 deletions pkg/bootstrap/versions/v4_0_0/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_iscp_log_new,
upg_mo_iscp_task,
upg_mo_publication_task,
upg_mo_ccpr_log_new,
upg_mo_ccpr_tables_new,
upg_mo_ccpr_dbs_new,
upg_mo_index_update_new,
upg_create_mo_branch_metadata,
upg_rename_system_stmt_info_4000,
Expand Down Expand Up @@ -60,6 +64,47 @@ var upg_mo_iscp_task = versions.UpgradeEntry{
},
}

var upg_mo_ccpr_log_new = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_CCPR_LOG,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoCcprLogDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_CCPR_LOG)
},
}

var upg_mo_ccpr_tables_new = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_CCPR_TABLES,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoCcprTablesDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_CCPR_TABLES)
},
}

var upg_mo_ccpr_dbs_new = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_CCPR_DBS,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoCcprDbsDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_CCPR_DBS)
},
}

var upg_mo_publication_task = versions.UpgradeEntry{
Schema: catalog.MOTaskDB,
TableName: catalog.MOSysDaemonTask,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: predefine.GenInitPublicationTaskSQL(),
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
ok, err := versions.CheckTableDataExist(txn, accountId, predefine.GenPublicationTaskCheckSQL())
return ok, err
},
}

var upg_mo_index_update_new = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_INDEX_UPDATE,
Expand Down
38 changes: 37 additions & 1 deletion pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

const (
// for schema
PropSchemaExtra = "schema_extra"
PropSchemaExtra = "schema_extra"
PropFromPublication = "from_publication"

Row_ID = objectio.PhysicalAddr_Attr
PrefixPriColName = "__mo_cpkey_"
Expand Down Expand Up @@ -174,11 +175,16 @@ const (
MO_ISCP_LOG = "mo_iscp_log"
MO_STORED_PROCEDURE = "mo_stored_procedure"

MO_CCPR_LOG = "mo_ccpr_log"

MO_INDEX_UPDATE = "mo_index_update"

MO_BRANCH_METADATA = "mo_branch_metadata"
MO_FEATURE_LIMIT = "mo_feature_limit"
MO_FEATURE_REGISTRY = "mo_feature_registry"

MO_CCPR_TABLES = "mo_ccpr_tables"
MO_CCPR_DBS = "mo_ccpr_dbs"
)

func IsSystemTable(id uint64) bool {
Expand Down Expand Up @@ -792,6 +798,36 @@ var (
types.New(types.T_uuid, 0, 0), // segment_id
types.New(types.T_TS, 0, 0), // flush_point
}

// mo_ccpr_tables schema: tableid (pk), taskid, dbname, tablename, account_id
MoCCPRTablesSchema = []string{
"tableid",
"taskid",
"dbname",
"tablename",
"account_id",
}
MoCCPRTablesTypes = []types.Type{
types.New(types.T_uint64, 0, 0), // tableid (primary key)
types.New(types.T_uuid, 0, 0), // taskid
types.New(types.T_varchar, 256, 0), // dbname
types.New(types.T_varchar, 256, 0), // tablename
types.New(types.T_uint32, 0, 0), // account_id
}

// mo_ccpr_dbs schema: dbid (pk), taskid, dbname, account_id
MoCCPRDbsSchema = []string{
"dbid",
"taskid",
"dbname",
"account_id",
}
MoCCPRDbsTypes = []types.Type{
types.New(types.T_uint64, 0, 0), // dbid (primary key)
types.New(types.T_uuid, 0, 0), // taskid
types.New(types.T_varchar, 256, 0), // dbname
types.New(types.T_uint32, 0, 0), // account_id
}
)

var (
Expand Down
12 changes: 12 additions & 0 deletions pkg/cnservice/server_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/task"
"github.com/matrixorigin/matrixone/pkg/proxy"
"github.com/matrixorigin/matrixone/pkg/publication"
moconnector "github.com/matrixorigin/matrixone/pkg/stream/connector"
"github.com/matrixorigin/matrixone/pkg/taskservice"
"github.com/matrixorigin/matrixone/pkg/util"
Expand Down Expand Up @@ -340,6 +341,17 @@ func (s *service) registerExecutorsLocked() {
),
)

s.task.runner.RegisterExecutor(task.TaskCode_PublicationExecutor,
publication.PublicationTaskExecutorFactory(
s.storeEngine,
s._txnClient,
s.task.runner.Attach,
s.cfg.UUID,
common.PublicationAllocator,
nil, // upstreamSQLHelperFactory can be nil for now
s.pu, // pass ParameterUnit from service
),
)
s.task.runner.RegisterExecutor(task.TaskCode_IndexUpdateTaskExecutor,
idxcron.IndexUpdateTaskExecutorFactory(
s.cfg.UUID,
Expand Down
38 changes: 38 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ const (
// ErrSchedulerClosed scheduler has been closed, cannot schedule new jobs
ErrSchedulerClosed uint16 = 20641

// GC sync protection errors
ErrGCIsRunning uint16 = 20642
ErrSyncProtectionNotFound uint16 = 20643
ErrSyncProtectionExists uint16 = 20644
ErrSyncProtectionMaxCount uint16 = 20645
ErrSyncProtectionSoftDelete uint16 = 20646
ErrSyncProtectionInvalid uint16 = 20647
ErrSyncProtectionExpired uint16 = 20648

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
ErrDeadLockDetected uint16 = 20701
Expand Down Expand Up @@ -323,6 +332,8 @@ const (

// Group 15: Vector Search
ErrVectorNeedRetryWithPreMode uint16 = 22301
// Group 16: CCPR
ErrCCPRReadOnly uint16 = 22401

// ErrEnd, the max value of MOErrorCode
ErrEnd uint16 = 65535
Expand Down Expand Up @@ -508,6 +519,15 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrOfflineTxnWrite: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "write offline txn: %s"},
ErrSchedulerClosed: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "scheduler closed"},

// GC sync protection errors
ErrGCIsRunning: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "GC is running, please retry later"},
ErrSyncProtectionNotFound: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection not found: %s"},
ErrSyncProtectionExists: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection already exists: %s"},
ErrSyncProtectionMaxCount: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection max count reached: %d"},
ErrSyncProtectionSoftDelete: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection is soft deleted: %s"},
ErrSyncProtectionInvalid: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "invalid sync protection request"},
ErrSyncProtectionExpired: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection expired: job %s validTS %d < prepareTS %d"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
ErrLockTableBindChanged: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table bind changed"},
Expand Down Expand Up @@ -573,6 +593,9 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
// Group 15: Vector Search
ErrVectorNeedRetryWithPreMode: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "vector search need retry with pre mode"},

// Group 16: CCPR
ErrCCPRReadOnly: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "ccpr shared object is read-only"},

// Group End: max value of MOErrorCode
ErrEnd: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "internal error: end of errcode code"},
}
Expand Down Expand Up @@ -1141,6 +1164,17 @@ func IsRPCClientClosed(err error) bool {
return IsMoErrCode(err, ErrClientClosed)
}

// IsSyncProtectionValidationError checks if error is any sync protection validation error.
// This allows CN to easily distinguish sync protection validation errors from other commit errors.
func IsSyncProtectionValidationError(err error) bool {
if err == nil {
return false
}
return IsMoErrCode(err, ErrSyncProtectionNotFound) ||
IsMoErrCode(err, ErrSyncProtectionSoftDelete) ||
IsMoErrCode(err, ErrSyncProtectionExpired)
}

func NewTxnClosed(ctx context.Context, txnID []byte) *Error {
id := "unknown"
if len(txnID) > 0 {
Expand Down Expand Up @@ -1690,6 +1724,10 @@ func NewErrTooBigPrecision(ctx context.Context, precision int32, funcName string
return newError(ctx, ErrTooBigPrecision, precision, funcName, maxPrecision)
}

func NewCCPRReadOnly(ctx context.Context) *Error {
return newError(ctx, ErrCCPRReadOnly)
}

var contextFunc atomic.Value

// noReportCtx is a cached context that suppresses error reporting.
Expand Down
29 changes: 29 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,32 @@ func NewSchedulerClosedNoCtx() *Error {
func NewVectorNeedRetryWithPreModeNoCtx() *Error {
return newError(Context(), ErrVectorNeedRetryWithPreMode)
}

// GC sync protection errors
func NewGCIsRunningNoCtx() *Error {
return newError(Context(), ErrGCIsRunning)
}

func NewSyncProtectionNotFoundNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionNotFound, jobID)
}

func NewSyncProtectionExistsNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionExists, jobID)
}

func NewSyncProtectionMaxCountNoCtx(maxCount int) *Error {
return newError(Context(), ErrSyncProtectionMaxCount, maxCount)
}

func NewSyncProtectionSoftDeleteNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionSoftDelete, jobID)
}

func NewSyncProtectionInvalidNoCtx() *Error {
return newError(Context(), ErrSyncProtectionInvalid)
}

func NewSyncProtectionExpiredNoCtx(jobID string, validTS, prepareTS int64) *Error {
return newError(Context(), ErrSyncProtectionExpired, jobID, validTS, prepareTS)
}
4 changes: 4 additions & 0 deletions pkg/defines/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ type IvfReaderParam struct{}
// PkCheckByTN whether TN does primary key uniqueness check against transaction's workspace or not.
type PkCheckByTN struct{}

// SkipTransferKey is used to indicate that the delete operation should skip transfer processing.
// Used by CCPR for cross-cluster tombstones.
type SkipTransferKey struct{}

// StartTS is the start timestamp of a statement.
type StartTS struct{}

Expand Down
Loading
Loading