diff --git a/client/benchmark.c b/client/benchmark.c index cb9c3ae..7da45d3 100644 --- a/client/benchmark.c +++ b/client/benchmark.c @@ -933,7 +933,8 @@ static void zc_acquire_cb(uint64_t rid, priskv_status status, void *result) zctx->pctx->job->mm->memcpy(zctx->value, (void *)region->addr, region->length); zctx->token = region->token; zctx->pctx->job->last_stage = "RELEASE"; - priskv_release_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_release_cb); + priskv_release_async(zctx->pctx->client, &zctx->token, false /* unpin_on_release */, (uint64_t)zctx, + zc_release_cb); } static void zc_seal_cb(uint64_t rid, priskv_status status, void *result) @@ -962,7 +963,8 @@ static void zc_alloc_cb(uint64_t rid, priskv_status status, void *result) zctx->pctx->job->mm->memcpy((void *)region->addr, zctx->value, copy_len); zctx->token = region->token; zctx->pctx->job->last_stage = "SEAL"; - priskv_seal_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_seal_cb); + priskv_seal_async(zctx->pctx->client, &zctx->token, false /* pin_on_seal */, (uint64_t)zctx, + zc_seal_cb); } /* ZeroCopy DROP callback is no longer used (published keys use DELETE semantics) */ @@ -994,8 +996,8 @@ static void priskv_drv_get(void *ctx, const char *key, void *value, uint32_t val zctx->value = value; zctx->value_len = value_len; priskv_ctx->job->last_stage = "ACQUIRE"; - priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, (uint64_t)zctx, - zc_acquire_cb); + priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false /* pin_on_acquire */, + (uint64_t)zctx, zc_acquire_cb); return; } /* Remove duplicate ZeroCopy GET branch */ diff --git a/client/client.c b/client/client.c index 1ef175c..628b8b2 100644 --- a/client/client.c +++ b/client/client.c @@ -32,6 +32,7 @@ #include #include #include +#include #include "priskv.h" #include "priskv-log.h" @@ -154,6 +155,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc) { char *key, *value, *opt, *opt_val, *str_end; uint64_t expire_time_ms = 0; + bool pin_on_seal = false; size_t valuelen; priskv_sgl sgl; priskv_status status; @@ -194,6 +196,9 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc) if (!strcmp(opt, "EX")) { expire_time_ms *= 1000; } + } else if (!strcmp(opt, "PIN")) { + /* Redis-style: allow 'PIN' to indicate pin-on-seal */ + pin_on_seal = true; } else { printf("%s\n", invalid_args_msg); return; @@ -217,7 +222,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc) printf("ALLOC_SET status(%d): %s, addr %p, length %u, token 0x%lx\n", status, priskv_status_str(status), (void *)region.addr, region.length, region.token); memcpy((void *)region.addr, value, (size_t)region.length); - status = priskv_seal(ctx->client, ®ion.token); + status = priskv_seal(ctx->client, ®ion.token, pin_on_seal); if (status != PRISKV_STATUS_OK) { printf("Failed to SEAL, status(%d): %s\n", status, priskv_status_str(status)); return; @@ -269,7 +274,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire) if (acquire) { priskv_memory_region region = {0}; printf("ACQUIRE key=%s\n", key); - status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, ®ion); + /* Do not pin on acquire by default from CLI */ + status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, ®ion); if (status != PRISKV_STATUS_OK) { printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status)); return; @@ -279,7 +285,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire) printf("ACQUIRE GET status(%d): %s\n", status, priskv_status_str(status)); printf("ACQUIRE GET value[%u]=%s\n", region.length, (char *)ctx->buf); - status = priskv_release(ctx->client, ®ion.token); + /* Do not unpin on release by default from CLI */ + status = priskv_release(ctx->client, ®ion.token, false); if (status != PRISKV_STATUS_OK) { printf("Failed to RELEASE, status(%d): %s\n", status, priskv_status_str(status)); return; @@ -384,6 +391,7 @@ static void seal_token_handler(client_context *ctx, char *args) char *tokstr, *str_end; uint64_t token = 0; priskv_status status; + bool pin_on_seal = false; tokstr = strtok_r(args, " ", &args); if (!tokstr) { @@ -400,7 +408,30 @@ static void seal_token_handler(client_context *ctx, char *args) return; } } - status = priskv_seal(ctx->client, &token); + /* Parse optional flags: 'PIN [TTL ]' */ + while (args && strlen(args) > 0) { + char *flag = strtok_r(args, " ", &args); + if (!strcmp(flag, "PIN")) { + pin_on_seal = true; + } else if (!strcmp(flag, "TTL")) { + /* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */ + char *ttl = strtok_r(args, " ", &args); + if (!ttl || !strlen(ttl)) { + printf("%s\n", invalid_args_msg); + return; + } + } else { + printf("%s\n", invalid_args_msg); + return; + } + } + + printf("SEAL token=%" PRIu64 " [PIN=%d]\n", token, pin_on_seal); + /* TODO(wangyi): Support per-command TTL for pin-on-seal (e.g., 'PIN TTL N') + * - Parse TTL value and pass through protocol once pin_ttl_ms is supported. + * - Default to server-side TTL when not provided. + */ + status = priskv_seal(ctx->client, &token, pin_on_seal); printf("SEAL status(%d): %s\n", status, priskv_status_str(status)); } @@ -409,6 +440,7 @@ static void acquire_only_handler(client_context *ctx, char *args) char *key; priskv_status status; priskv_memory_region region = {0}; + bool pin_on_acquire = false; key = strtok_r(args, " ", &args); if (!key) { @@ -416,8 +448,30 @@ static void acquire_only_handler(client_context *ctx, char *args) return; } - printf("ACQUIRE key=%s\n", key); - status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, ®ion); + /* Parse optional flags: 'PIN [TTL ]' */ + while (args && strlen(args) > 0) { + char *flag = strtok_r(args, " ", &args); + if (!strcmp(flag, "PIN")) { + pin_on_acquire = true; + } else if (!strcmp(flag, "TTL")) { + /* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */ + char *ttl = strtok_r(args, " ", &args); + if (!ttl || !strlen(ttl)) { + printf("%s\n", invalid_args_msg); + return; + } + } else { + printf("%s\n", invalid_args_msg); + return; + } + } + + /* Align output field name with CLI flag semantics */ + printf("ACQUIRE key=%s [PIN=%d]\n", key, pin_on_acquire); + /* TODO(wangyi): Support per-command TTL for pin-on-acquire (e.g., 'PIN TTL N') + * - Parse TTL value and pass through protocol once pin_ttl_ms is supported. + */ + status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, ®ion); printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status, priskv_status_str(status), (void *)region.addr, region.length, region.token); if (status == PRISKV_STATUS_OK) { @@ -434,6 +488,7 @@ static void release_token_handler(client_context *ctx, char *args) char *tokstr, *str_end; uint64_t token = 0; priskv_status status; + bool unpin_on_release = false; tokstr = strtok_r(args, " ", &args); if (!tokstr) { @@ -450,7 +505,22 @@ static void release_token_handler(client_context *ctx, char *args) return; } } - status = priskv_release(ctx->client, &token); + /* Parse optional flags: 'UNPIN' */ + while (args && strlen(args) > 0) { + char *flag = strtok_r(args, " ", &args); + if (!strcmp(flag, "UNPIN")) { + unpin_on_release = true; + } else { + printf("%s\n", invalid_args_msg); + return; + } + } + + printf("RELEASE token=%" PRIu64" [UNPIN=%d]\n", token, unpin_on_release); + /* TODO(wangyi): Diagnostics for UNPIN_NOT_CLOSED and TTL interactions + * - Consider printing hints when UNPIN_NOT_CLOSED occurs to aid debugging. + */ + status = priskv_release(ctx->client, &token, unpin_on_release); printf("RELEASE status(%d): %s\n", status, priskv_status_str(status)); } @@ -475,6 +545,7 @@ static void drop_token_handler(client_context *ctx, char *args) return; } } + printf("DROP token=% \n" PRIu64, token); status = priskv_drop(ctx->client, &token); printf("DROP status(%d): %s\n", status, priskv_status_str(status)); } @@ -649,14 +720,14 @@ static priskv_command commands[] = { {"set", set_handler, "set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"}, {"alloc_set", alloc_set_handler, - "alloc set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"}, + "alloc_set KEY VALUE [ EX seconds | PX milliseconds ] [ PIN ]\tzero-copy set with optional pin on seal\n"}, {"get", get_handler, "get KEY\t\t\t\t\t\tget key:value from priskv\n"}, {"acquire_get", acquire_get_handler, "acquire get KEY\t\t\t\t\t\tget key:value from priskv\n"}, {"alloc", alloc_only_handler, "alloc KEY BYTES [ EX seconds | PX milliseconds ]\t\tallocate region and print token\n"}, - {"seal", seal_token_handler, "seal TOKEN|last\t\t\t\t\tseal previously alloc'ed token\n"}, - {"acquire", acquire_only_handler, "acquire KEY\t\t\t\t\t\tacquire region and print token\n"}, - {"release", release_token_handler, "release TOKEN|last\t\t\t\t\trelease previously acquired token\n"}, + {"seal", seal_token_handler, "seal TOKEN|last [ PIN [ TTL milliseconds ] ]\t\tseal previously alloc'ed token\n"}, + {"acquire", acquire_only_handler, "acquire KEY [ PIN [ TTL milliseconds ] ]\t\tacquire region and print token\n"}, + {"release", release_token_handler, "release TOKEN|last [ UNPIN ]\t\trelease previously acquired token\n"}, {"drop", drop_token_handler, "drop TOKEN|last\t\t\t\t\tDrop unpublished ALLOC token\n"}, {"test", test_handler, "test KEY\t\t\t\t\t\ttest if the key exists in priskv\n"}, {"delete", delete_handler, "delete KEY\t\t\t\t\t\tdelete the key from priskv\n"}, diff --git a/client/priskv.h b/client/priskv.h index b29f578..21b4afe 100644 --- a/client/priskv.h +++ b/client/priskv.h @@ -33,6 +33,8 @@ extern "C" #endif #include +#include +#include "priskv-protocol.h" /* Include protocol request flag definitions */ typedef struct priskv_client priskv_client; typedef struct priskv_memory priskv_memory; @@ -105,6 +107,9 @@ typedef enum priskv_status { /* no such token */ PRISKV_STATUS_NO_SUCH_TOKEN, + /* unpin requested when pin_count == 0 on latest version */ + PRISKV_STATUS_UNPIN_NOT_CLOSED, + /* invalid SGL. the number of SGL within a command must not exceed @max_sgl */ PRISKV_STATUS_INVALID_SGL, @@ -169,6 +174,9 @@ static inline const char *priskv_status_str(priskv_status status) case PRISKV_STATUS_NO_SUCH_TOKEN: return "No such token"; + case PRISKV_STATUS_UNPIN_NOT_CLOSED: + return "Unpin operation not closed"; + case PRISKV_STATUS_INVALID_SGL: return "Invalid SGL"; @@ -262,16 +270,17 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le uint64_t timeout, uint64_t request_id, priskv_generic_cb cb); /* Seal memory region alloced for write (by token pointer, reuse key field) */ -int priskv_seal_async(priskv_client *client, const uint64_t *token, uint64_t request_id, - priskv_generic_cb cb); +int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal, + uint64_t request_id, priskv_generic_cb cb); /* Acquire memory region for zero copy read */ int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout, - uint64_t request_id, priskv_generic_cb cb); + bool pin_on_acquire, uint64_t request_id, priskv_generic_cb cb); /* Release memory region (by token pointer, reuse key field) */ -int priskv_release_async(priskv_client *client, const uint64_t *token, uint64_t request_id, - priskv_generic_cb cb); +int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release, + uint64_t request_id, priskv_generic_cb cb); + /* Drop memory region (by token pointer, reuse key field) */ int priskv_drop_async(priskv_client *client, const uint64_t *token, uint64_t request_id, @@ -325,11 +334,11 @@ uint64_t priskv_capacity(priskv_client *client); int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, uint64_t timeout, priskv_memory_region *region); -int priskv_seal(priskv_client *client, const uint64_t *token); +int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal); int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout, - priskv_memory_region *region); -int priskv_release(priskv_client *client, const uint64_t *token); + bool pin_on_acquire, priskv_memory_region *region); +int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release); int priskv_drop(priskv_client *client, const uint64_t *token); diff --git a/client/sync.c b/client/sync.c index b246450..1f8c3fe 100644 --- a/client/sync.c +++ b/client/sync.c @@ -177,20 +177,22 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, return req_sync.status; } -int priskv_seal(priskv_client *client, const uint64_t *token) +int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal) { priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false}; - priskv_seal_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb); + priskv_seal_async(client, token, pin_on_seal, (uint64_t)&req_sync, + priskv_common_sync_cb); priskv_sync_wait(client, &req_sync.done); return req_sync.status; } int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout, - priskv_memory_region *region) + bool pin_on_acquire, priskv_memory_region *region) { priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false}; - priskv_acquire_async(client, key, timeout, (uint64_t)&req_sync, priskv_zero_copy_req_sync_cb); + priskv_acquire_async(client, key, timeout, pin_on_acquire, (uint64_t)&req_sync, + priskv_zero_copy_req_sync_cb); priskv_sync_wait(client, &req_sync.done); if (req_sync.status == PRISKV_STATUS_OK && region) { @@ -201,10 +203,11 @@ int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout, return req_sync.status; } -int priskv_release(priskv_client *client, const uint64_t *token) +int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release) { priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false}; - priskv_release_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb); + priskv_release_async(client, token, unpin_on_release, (uint64_t)&req_sync, + priskv_common_sync_cb); priskv_sync_wait(client, &req_sync.done); return req_sync.status; diff --git a/client/transport/rdma.c b/client/transport/rdma.c index 9f4bffc..d13c2b9 100644 --- a/client/transport/rdma.c +++ b/client/transport/rdma.c @@ -1256,6 +1256,7 @@ static int priskv_rdma_req_send(void *arg) req->command = htobe16(rdma_req->cmd); req->nsgl = htobe16(rdma_req->nsgl); req->timeout = htobe64(rdma_req->timeout); + req->flags = htobe32(rdma_req->req_flags); req->key_length = htobe16(rdma_req->keylen); struct timeval client_metadata_send_time; diff --git a/client/transport/transport.c b/client/transport/transport.c index 29108e9..3238d37 100644 --- a/client/transport/transport.c +++ b/client/transport/transport.c @@ -287,7 +287,8 @@ static inline priskv_transport_conn *priskv_select_conn(priskv_client *client) static void priskv_send_command(priskv_client *client, uint64_t request_id, const char *key, uint32_t alloc_length, priskv_sgl *sgl, uint16_t nsgl, - uint64_t timeout, priskv_req_command cmd, priskv_generic_cb cb) + uint64_t timeout, priskv_req_command cmd, uint32_t req_flags, + priskv_generic_cb cb) { priskv_transport_conn *conn = priskv_select_conn(client); priskv_connect_param *param = &conn->param; @@ -330,7 +331,7 @@ static void priskv_send_command(priskv_client *client, uint64_t request_id, cons cb(request_id, PRISKV_STATUS_NO_MEM, NULL); return; } - + req->req_flags = req_flags; client->ops->submit_req(req); } @@ -343,7 +344,7 @@ int priskv_get_async(priskv_client *client, const char *key, priskv_sgl *sgl, ui } priskv_send_command(client, request_id, key, 0 /* alloc_length */, sgl, nsgl, 0, - PRISKV_COMMAND_GET, cb); + PRISKV_COMMAND_GET, 0, cb); return 0; } @@ -356,7 +357,7 @@ int priskv_set_async(priskv_client *client, const char *key, priskv_sgl *sgl, ui } priskv_send_command(client, request_id, key, 0 /* alloc_length */, sgl, nsgl, timeout, - PRISKV_COMMAND_SET, cb); + PRISKV_COMMAND_SET, 0, cb); return 0; } @@ -364,7 +365,7 @@ int priskv_test_async(priskv_client *client, const char *key, uint64_t request_i priskv_generic_cb cb) { priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_TEST, cb); + PRISKV_COMMAND_TEST, 0, cb); return 0; } @@ -372,7 +373,7 @@ int priskv_delete_async(priskv_client *client, const char *key, uint64_t request priskv_generic_cb cb) { priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_DELETE, cb); + PRISKV_COMMAND_DELETE, 0, cb); return 0; } @@ -380,7 +381,7 @@ int priskv_expire_async(priskv_client *client, const char *key, uint64_t timeout uint64_t request_id, priskv_generic_cb cb) { priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, timeout, - PRISKV_COMMAND_EXPIRE, cb); + PRISKV_COMMAND_EXPIRE, 0, cb); return 0; } @@ -388,7 +389,7 @@ int priskv_keys_async(priskv_client *client, const char *regex, uint64_t request priskv_generic_cb cb) { priskv_send_command(client, request_id, regex, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_KEYS, cb); + PRISKV_COMMAND_KEYS, 0, cb); return 0; } @@ -396,7 +397,7 @@ int priskv_nrkeys_async(priskv_client *client, const char *regex, uint64_t reque priskv_generic_cb cb) { priskv_send_command(client, request_id, regex, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_NRKEYS, cb); + PRISKV_COMMAND_NRKEYS, 0, cb); return 0; } @@ -404,7 +405,7 @@ int priskv_flush_async(priskv_client *client, const char *regex, uint64_t reques priskv_generic_cb cb) { priskv_send_command(client, request_id, regex, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_FLUSH, cb); + PRISKV_COMMAND_FLUSH, 0, cb); return 0; } @@ -412,39 +413,43 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le uint64_t timeout, uint64_t request_id, priskv_generic_cb cb) { priskv_send_command(client, request_id, key, alloc_length, NULL, 0, timeout, - PRISKV_COMMAND_ALLOC, cb); + PRISKV_COMMAND_ALLOC, 0, cb); return 0; } -int priskv_seal_async(priskv_client *client, const uint64_t *token, uint64_t request_id, - priskv_generic_cb cb) +int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal, + uint64_t request_id, priskv_generic_cb cb) { + uint32_t flags = pin_on_seal ? PRISKV_REQ_FLAG_PIN_ON_SEAL : 0; priskv_send_command(client, request_id, (const char *)token, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_SEAL, cb); + PRISKV_COMMAND_SEAL, flags, cb); return 0; } int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout, - uint64_t request_id, priskv_generic_cb cb) + bool pin_on_acquire, uint64_t request_id, priskv_generic_cb cb) { + uint32_t flags = pin_on_acquire ? PRISKV_REQ_FLAG_PIN_ON_ACQUIRE : 0; priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, timeout, - PRISKV_COMMAND_ACQUIRE, cb); + PRISKV_COMMAND_ACQUIRE, flags, cb); return 0; } -int priskv_release_async(priskv_client *client, const uint64_t *token, uint64_t request_id, - priskv_generic_cb cb) +int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release, + uint64_t request_id, priskv_generic_cb cb) { + uint32_t flags = unpin_on_release ? PRISKV_REQ_FLAG_UNPIN_ON_RELEASE : 0; priskv_send_command(client, request_id, (const char *)token, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_RELEASE, cb); + PRISKV_COMMAND_RELEASE, flags, cb); return 0; } + int priskv_drop_async(priskv_client *client, const uint64_t *token, uint64_t request_id, priskv_generic_cb cb) { priskv_send_command(client, request_id, (const char *)token, 0 /* alloc_length */, NULL, 0, 0, - PRISKV_COMMAND_DROP, cb); + PRISKV_COMMAND_DROP, 0, cb); return 0; } diff --git a/client/transport/transport.h b/client/transport/transport.h index ee0f911..9d72917 100644 --- a/client/transport/transport.h +++ b/client/transport/transport.h @@ -103,6 +103,7 @@ typedef struct priskv_transport_req { uint32_t alloc_length; uint64_t timeout; priskv_req_command cmd; + uint32_t req_flags; /* Serialize request behavior flags into priskv_request::flags */ void (*cb)(struct priskv_transport_req *req); priskv_generic_cb usercb; #define PRISKV_TRANSPORT_REQ_FLAG_SEND (1 << 0) diff --git a/client/transport/ucx.c b/client/transport/ucx.c index 27b08c8..b32d6cb 100644 --- a/client/transport/ucx.c +++ b/client/transport/ucx.c @@ -1363,6 +1363,7 @@ static int priskv_ucx_send_req(void *arg) req->command = htobe16(ucx_req->cmd); req->nsgl = htobe16(ucx_req->nsgl); req->timeout = htobe64(ucx_req->timeout); + req->flags = htobe32(ucx_req->req_flags); req->key_length = htobe16(ucx_req->keylen); req->alloc_length = htobe32(ucx_req->alloc_length); diff --git a/cluster/client/client.c b/cluster/client/client.c index c6acf60..c253474 100644 --- a/cluster/client/client.c +++ b/cluster/client/client.c @@ -592,15 +592,15 @@ int priskvClusterSubmitRequest(priskvClusterRequest *req) (uint64_t)req, priskvClusterZeroCopyRequestCallback); break; case SEAL: - priskv_seal_async(req->node->client, &req->token, (uint64_t)req, + priskv_seal_async(req->node->client, &req->token, false /* pin_on_seal */, (uint64_t)req, priskvClusterRequestCallback); break; case ACQUIRE: - priskv_acquire_async(req->node->client, req->key, req->timeout, (uint64_t)req, + priskv_acquire_async(req->node->client, req->key, req->timeout, false /* pin_on_acquire */, (uint64_t)req, priskvClusterZeroCopyRequestCallback); break; case RELEASE: - priskv_release_async(req->node->client, &req->token, (uint64_t)req, + priskv_release_async(req->node->client, &req->token, false /* unpin_on_release */, (uint64_t)req, priskvClusterRequestCallback); break; case DROP: @@ -1056,19 +1056,20 @@ int priskvClusterAllocRegion(priskvClusterClient *client, const char *key, uint3 } priskvClusterStatus priskvClusterSeal(priskvClusterClient *client, const char *key, - const uint64_t *token) + const uint64_t *token, bool pin_on_seal) { priskvClusterNode *node = priskvClusterGetNode(client, key); if (!node) { return PRISKV_CLUSTER_STATUS_NO_SUCH_KEY; } - priskv_status status = priskv_seal(node->client, token); + priskv_status status = priskv_seal(node->client, token, pin_on_seal); return priskvClusterStatusFromPriskvStatus(status); } priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char *key, - uint64_t timeout, uint64_t *addr, uint32_t *valuelen) + uint64_t timeout, bool pin_on_acquire, uint64_t *addr, + uint32_t *valuelen) { priskvClusterNode *node = priskvClusterGetNode(client, key); if (!node) { @@ -1076,7 +1077,7 @@ priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char } priskv_memory_region region = {0}; - priskv_status status = priskv_acquire(node->client, key, timeout, ®ion); + priskv_status status = priskv_acquire(node->client, key, timeout, pin_on_acquire, ®ion); if (status == PRISKV_STATUS_OK) { if (addr) { *addr = region.addr; @@ -1090,24 +1091,24 @@ priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char } int priskvClusterAcquireRegion(priskvClusterClient *client, const char *key, uint64_t timeout, - priskv_memory_region *region) + bool pin_on_acquire, priskv_memory_region *region) { priskvClusterNode *node = priskvClusterGetNode(client, key); if (!node) { return PRISKV_CLUSTER_STATUS_NO_SUCH_KEY; } - return priskv_acquire(node->client, key, timeout, region); + return priskv_acquire(node->client, key, timeout, pin_on_acquire, region); } priskvClusterStatus priskvClusterRelease(priskvClusterClient *client, const char *key, - const uint64_t *token) + const uint64_t *token, bool unpin_on_release) { priskvClusterNode *node = priskvClusterGetNode(client, key); if (!node) { return PRISKV_CLUSTER_STATUS_NO_SUCH_KEY; } - priskv_status status = priskv_release(node->client, token); + priskv_status status = priskv_release(node->client, token, unpin_on_release); return priskvClusterStatusFromPriskvStatus(status); } diff --git a/cluster/client/client.h b/cluster/client/client.h index 8456307..2d0bb6d 100644 --- a/cluster/client/client.h +++ b/cluster/client/client.h @@ -208,12 +208,12 @@ priskvClusterStatus priskvClusterSet(priskvClusterClient *client, const char *ke priskvClusterStatus priskvClusterAlloc(priskvClusterClient *client, const char *key, uint64_t alloc_length, uint64_t timeout, uint64_t *addr); priskvClusterStatus priskvClusterSeal(priskvClusterClient *client, const char *key, - const uint64_t *token); + const uint64_t *token, bool pin_on_seal); priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char *key, - uint64_t timeout, uint64_t *addr_offset, - uint32_t *valuelen); + uint64_t timeout, bool pin_on_acquire, + uint64_t *addr_offset, uint32_t *valuelen); priskvClusterStatus priskvClusterRelease(priskvClusterClient *client, const char *key, - const uint64_t *token); + const uint64_t *token, bool unpin_on_release); priskvClusterStatus priskvClusterDrop(priskvClusterClient *client, const char *key, const uint64_t *token); priskvClusterStatus priskvClusterTest(priskvClusterClient *client, const char *key, uint32_t *value_len); @@ -226,4 +226,4 @@ priskvClusterStatus priskvClusterStatusFromPriskvStatus(priskv_status status); int priskvClusterAllocRegion(priskvClusterClient *client, const char *key, uint32_t alloc_length, uint64_t timeout, priskv_memory_region *region); int priskvClusterAcquireRegion(priskvClusterClient *client, const char *key, uint64_t timeout, - priskv_memory_region *region); + bool pin_on_acquire, priskv_memory_region *region); diff --git a/include/priskv-protocol-helper.h b/include/priskv-protocol-helper.h index 622781c..4c25b2c 100644 --- a/include/priskv-protocol-helper.h +++ b/include/priskv-protocol-helper.h @@ -146,6 +146,9 @@ static inline const char *priskv_resp_status_str(priskv_resp_status status) case PRISKV_RESP_STATUS_NO_SUCH_TOKEN: return "No such token"; + case PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED: + return "Unpin operation not closed"; + case PRISKV_RESP_STATUS_INVALID_SGL: return "Invalid SGL"; diff --git a/include/priskv-protocol.h b/include/priskv-protocol.h index 8c75261..f1a1927 100644 --- a/include/priskv-protocol.h +++ b/include/priskv-protocol.h @@ -82,7 +82,7 @@ typedef enum priskv_req_command { PRISKV_COMMAND_SEAL = 0x09, /* seal memory region alloced for write */ PRISKV_COMMAND_ACQUIRE = 0x0a, /* acquire memory region if support zero copy */ PRISKV_COMMAND_RELEASE = 0x0b, /* release memory region acquired for read*/ - PRISKV_COMMAND_DROP = 0x0c, /* drop memory region and remove from hash table */ + PRISKV_COMMAND_DROP = 0x0c, /* drop memory region */ PRISKV_COMMAND_MAX /* not a part of protocol, keep last */ } priskv_req_command; @@ -103,11 +103,26 @@ typedef struct priskv_request_runtime { /* * request from client, submitted by @IBV_WR_SEND */ +/* Request flags (bitmask) to control optional behaviors */ +#define PRISKV_REQ_FLAG_PIN_ON_ACQUIRE (1u << 0) +#define PRISKV_REQ_FLAG_UNPIN_ON_RELEASE (1u << 1) +#define PRISKV_REQ_FLAG_PIN_ON_SEAL (1u << 2) + +/* + * TODO(wangyi): Consider protocol extensions for PinTTL override + * - Add optional fields to priskv_request (e.g., pin_ttl_ms) to allow per-request TTL + * configuration for pin operations. + * - Versioning: guard new fields behind a capability/version negotiation to preserve + * compatibility. + * - Transport integration: wire up pin_ttl_ms to PinManager on server side when present, + * otherwise fallback to server default TTL. + */ + typedef struct priskv_request { uint64_t request_id; uint64_t timeout; /* in ms */ uint16_t command; /* priskv_req_command */ - uint8_t reserved[4]; + uint32_t flags; /* request behavior flags (big-endian on wire) */ uint16_t nsgl; /* how many SGL contains following */ uint16_t key_length; uint32_t alloc_length; @@ -130,6 +145,8 @@ typedef enum priskv_resp_status { PRISKV_RESP_STATUS_NO_SUCH_KEY, /* token not found (for SEAL/RELEASE/DROP with invalid token) */ PRISKV_RESP_STATUS_NO_SUCH_TOKEN, + /* unpin requested when pin_count == 0 on latest version */ + PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED, PRISKV_RESP_STATUS_INVALID_SGL, PRISKV_RESP_STATUS_INVALID_REGEX, PRISKV_RESP_STATUS_KEY_UPDATING, diff --git a/include/priskv-ucx.h b/include/priskv-ucx.h index 222aebb..0adf046 100644 --- a/include/priskv-ucx.h +++ b/include/priskv-ucx.h @@ -140,7 +140,7 @@ typedef struct priskv_ucx_ep { break; \ case UCS_INPROGRESS: \ break; \ - default: \ + default:; \ const char *status_str = ucs_status_string(STATUS); \ priskv_log_error(MSG ", status: %s\n", status_str); \ CLEANUP; \ diff --git a/pypriskv/priskv/priskv_client.py b/pypriskv/priskv/priskv_client.py index 154d295..58b7ead 100644 --- a/pypriskv/priskv/priskv_client.py +++ b/pypriskv/priskv/priskv_client.py @@ -75,20 +75,23 @@ def alloc(self, def seal(self, key: str, - region: client.MemoryRegion) -> int: - return client.seal(self.conn, key, region) + region: client.MemoryRegion, + pin_on_seal: bool = False) -> int: + return client.seal(self.conn, key, region, pin_on_seal) def acquire(self, key: str, timeout: int = client.PRISKV_KEY_MAX_TIMEOUT, + pin_on_acquire: bool = False, ) -> Tuple[int, client.MemoryRegion]: - status, region = client.acquire(self.conn, key, timeout) + status, region = client.acquire(self.conn, key, timeout, pin_on_acquire) return status, region def release(self, key: str, - region: client.MemoryRegion) -> int: - return client.release(self.conn, key, region) + region: client.MemoryRegion, + unpin_on_release: bool = False) -> int: + return client.release(self.conn, key, region, unpin_on_release) def drop(self, key: str, diff --git a/pypriskv/pybind.cpp b/pypriskv/pybind.cpp index aef04e4..00a1068 100644 --- a/pypriskv/pybind.cpp +++ b/pypriskv/pybind.cpp @@ -139,13 +139,13 @@ std::tuple priskv_alloc_wrapper(uintptr_t client, std::string key } std::tuple priskv_acquire_wrapper(uintptr_t client, std::string key, - uint64_t timeout) + uint64_t timeout, bool pin_on_acquire) { uint64_t addr_offset = 0; uint32_t value_length = 0; int ret = priskvClusterAcquire((priskvClusterClient *)client, key.c_str(), timeout, - &addr_offset, &value_length); + pin_on_acquire, &addr_offset, &value_length); return {ret, addr_offset, value_length}; } @@ -276,7 +276,8 @@ PYBIND11_MODULE(_priskv_client, m) .value("PRISKV_STATUS_OK", PRISKV_STATUS_OK) .value("PRISKV_STATUS_NO_SUCH_KEY", PRISKV_STATUS_NO_SUCH_KEY) .value("PRISKV_STATUS_PERMISSION_DENIED", PRISKV_STATUS_PERMISSION_DENIED) - .value("PRISKV_STATUS_NO_SUCH_TOKEN", PRISKV_STATUS_NO_SUCH_TOKEN); + .value("PRISKV_STATUS_NO_SUCH_TOKEN", PRISKV_STATUS_NO_SUCH_TOKEN) + .value("PRISKV_STATUS_UNPIN_NOT_CLOSED", PRISKV_STATUS_UNPIN_NOT_CLOSED); pybind11::class_(m, "MemoryRegion", py::module_local()) .def(pybind11::init<>()) @@ -308,26 +309,29 @@ PYBIND11_MODULE(_priskv_client, m) m.def( "seal", - [](uintptr_t client, std::string key, const priskv_memory_region ®ion) { - return (int)priskvClusterSeal((priskvClusterClient *)client, key.c_str(), ®ion.token); + [](uintptr_t client, std::string key, const priskv_memory_region ®ion, bool pin_on_seal) { + return (int)priskvClusterSeal((priskvClusterClient *)client, key.c_str(), ®ion.token, pin_on_seal); }, + py::arg("client"), py::arg("key"), py::arg("region"), py::arg("pin_on_seal") = false, "A function to seal memory region of val."); m.def( "acquire", - [](uintptr_t client, std::string key, uint64_t timeout) { + [](uintptr_t client, std::string key, uint64_t timeout, bool pin_on_acquire) { priskv_memory_region region {0}; int ret = priskvClusterAcquireRegion((priskvClusterClient *)client, key.c_str(), - timeout, ®ion); + timeout, pin_on_acquire, ®ion); return py::make_tuple(ret, region); }, + py::arg("client"), py::arg("key"), py::arg("timeout"), py::arg("pin_on_acquire") = false, "A function to acquire memory region for read."); m.def( "release", - [](uintptr_t client, std::string key, const priskv_memory_region ®ion) { - return (int)priskvClusterRelease((priskvClusterClient *)client, key.c_str(), ®ion.token); + [](uintptr_t client, std::string key, const priskv_memory_region ®ion, bool unpin_on_release) { + return (int)priskvClusterRelease((priskvClusterClient *)client, key.c_str(), ®ion.token, unpin_on_release); }, + py::arg("client"), py::arg("key"), py::arg("region"), py::arg("unpin_on_release") = false, "A function to release memory region of read."); m.def( "drop", diff --git a/pypriskv/testing.py b/pypriskv/testing.py index f58016b..27d08f7 100644 --- a/pypriskv/testing.py +++ b/pypriskv/testing.py @@ -447,6 +447,101 @@ def test_transport_drop_behavior(self): print("[INFO] [TDROP] cleanup complete and key deleted") print("[INFO] [TDROP] Test end: Transport DROP semantics") + def test_pin_on_seal_and_inheritance(self): + """Validate pin_on_seal and multi-version inheritance (feature path coverage).""" + TEST_KEY = "py_pin_seal_key" + SIZE = 256 + TIMEOUT = 3000 + + # Version 1: alloc + seal(pin) + status, region1 = self.client.alloc(TEST_KEY, SIZE, TIMEOUT) + assert status == 0 and region1.length == SIZE + status = self.client.seal(TEST_KEY, region1, pin_on_seal=True) + assert status == 0 + + # Version 2: alloc + seal(pin) + status, region2 = self.client.alloc(TEST_KEY, SIZE // 2, TIMEOUT) + assert status == 0 and region2.length == SIZE // 2 + status = self.client.seal(TEST_KEY, region2, pin_on_seal=True) + assert status == 0 + + # Two rounds of acquire/release(unpin) + status, acq1 = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=False) + assert status == 0 + status = self.client.release(TEST_KEY, acq1, unpin_on_release=True) + assert status == priskv.PRISKV_STATUS.PRISKV_STATUS_OK + + status, acq2 = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=False) + assert status == 0 + status = self.client.release(TEST_KEY, acq2, unpin_on_release=True) + assert status == priskv.PRISKV_STATUS.PRISKV_STATUS_OK + + # Cleanup + assert self.client.delete(TEST_KEY) == 0 + + def test_pin_on_acquire_and_unpin_on_release(self): + """Validate acquire(pin_on_acquire) and release(unpin_on_release).""" + TEST_KEY = "py_pin_unpin_key" + SIZE = 128 + TIMEOUT = 3000 + + status, region = self.client.alloc(TEST_KEY, SIZE, TIMEOUT) + assert status == 0 + status = self.client.seal(TEST_KEY, region) + assert status == 0 + + status, acq = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=True) + assert status == 0 + status = self.client.release(TEST_KEY, acq, unpin_on_release=True) + assert status == priskv.PRISKV_STATUS.PRISKV_STATUS_OK + + assert self.client.delete(TEST_KEY) == 0 + + def test_unpin_not_closed_on_release(self): + """Unpin without prior pin should return UNPIN_NOT_CLOSED.""" + TEST_KEY = "py_unpin_not_closed" + SIZE = 64 + TIMEOUT = 3000 + + status, region = self.client.alloc(TEST_KEY, SIZE, TIMEOUT) + assert status == 0 + status = self.client.seal(TEST_KEY, region) + assert status == 0 + + status, acq = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=False) + assert status == 0 + status = self.client.release(TEST_KEY, acq, unpin_on_release=True) + assert status == priskv.PRISKV_STATUS.PRISKV_STATUS_UNPIN_NOT_CLOSED, \ + f"expected UNPIN_NOT_CLOSED, got {status}" + + assert self.client.delete(TEST_KEY) == 0 + + def test_unpin_no_such_key(self): + """When latest version does not exist, unpin returns NO_SUCH_KEY/NO_SUCH_TOKEN.""" + TEST_KEY = "py_unpin_no_such_key" + SIZE = 96 + TIMEOUT = 3000 + + status, region = self.client.alloc(TEST_KEY, SIZE, TIMEOUT) + assert status == 0 + status = self.client.seal(TEST_KEY, region) + assert status == 0 + + status, acq = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=True) + assert status == 0 + + # Release after delete + assert self.client.delete(TEST_KEY) == 0 + status = self.client.release(TEST_KEY, acq, unpin_on_release=True) + assert status in (priskv.PRISKV_STATUS.PRISKV_STATUS_NO_SUCH_TOKEN, + priskv.PRISKV_STATUS.PRISKV_STATUS_NO_SUCH_KEY) + + # TODO(wangyi): Add PinTTL expiry tests + # - Simulate pin-on-acquire/pin-on-seal with a short TTL and verify automatic cleanup + # decrements pin_count on the latest version after TTL. + # - Cover cases where latest is deleted/expired to ensure cleanup counters record + # NO_SUCH_KEY scenarios without crashing. + def run_testing(testing): testing.set() testing.get() @@ -464,6 +559,10 @@ def run_testing(testing): testing.test_memory_operations_full_flow() testing.test_transport_permissions() testing.test_transport_drop_behavior() + testing.test_pin_on_seal_and_inheritance() + testing.test_pin_on_acquire_and_unpin_on_release() + testing.test_unpin_not_closed_on_release() + testing.test_unpin_no_such_key() testing.cleanup() diff --git a/server/info.c b/server/info.c index 7ce662a..da51356 100644 --- a/server/info.c +++ b/server/info.c @@ -80,6 +80,20 @@ void priskv_info_get_kv(void *data) info->expire_routine_times = priskv_get_expire_routine_times(kv); info->expire_kv_count = priskv_get_expire_kv_count(kv); info->expire_kv_bytes = priskv_get_expire_kv_bytes(kv); + /* pin/unpin observability */ + info->pin_ops = priskv_get_pin_ops(kv); + info->unpin_ops = priskv_get_unpin_ops(kv); + info->unpin_not_closed = priskv_get_unpin_not_closed(kv); + + /* + * TODO(wangyi): PinTTL observability + * - Once PinTTL manager is introduced, expose additional counters here, e.g.: + * - pin_ttl_active: current number of active PinOperator entries + * - pin_ttl_expired: total number of expired PinOperator entries observed + * - pin_ttl_cleanup_ops: total number of successful cleanup-unpin operations + * - pin_ttl_orphaned: number of expiries that found NO_SUCH_KEY or mismatched state + * - Also consider exporting a configurable default TTL (kv->pin_ttl_ms) for introspection. + */ } void priskv_info_get_connection(void *data) diff --git a/server/jsonobjs.c b/server/jsonobjs.c index cb77fbd..68ee808 100644 --- a/server/jsonobjs.c +++ b/server/jsonobjs.c @@ -66,6 +66,12 @@ PRISKV_DECL_OBJECT_VALUE_FIELD(priskv_kv_info, "expire_kv_count", expire_kv_coun required, forced) PRISKV_DECL_OBJECT_VALUE_FIELD(priskv_kv_info, "expire_kv_bytes", expire_kv_bytes, priskv_uint64, required, forced) +PRISKV_DECL_OBJECT_VALUE_FIELD(priskv_kv_info, "pin_ops", pin_ops, priskv_uint64, required, + forced) +PRISKV_DECL_OBJECT_VALUE_FIELD(priskv_kv_info, "unpin_ops", unpin_ops, priskv_uint64, required, + forced) +PRISKV_DECL_OBJECT_VALUE_FIELD(priskv_kv_info, "unpin_not_closed", unpin_not_closed, priskv_uint64, + required, forced) PRISKV_DECL_OBJECT_END(priskv_kv_info, priskv_kv_info) diff --git a/server/jsonobjs.h b/server/jsonobjs.h index 1165575..44c8b30 100644 --- a/server/jsonobjs.h +++ b/server/jsonobjs.h @@ -63,6 +63,16 @@ typedef struct priskv_kv_info { uint64_t expire_routine_times; uint64_t expire_kv_count; uint64_t expire_kv_bytes; + /* pin/unpin observability */ + uint64_t pin_ops; + uint64_t unpin_ops; + uint64_t unpin_not_closed; + /* TODO(wangyi): Extend with PinTTL metrics once available + * - uint64_t pin_ttl_active; + * - uint64_t pin_ttl_expired; + * - uint64_t pin_ttl_cleanup_ops; + * - uint64_t pin_ttl_orphaned; + */ } priskv_kv_info; extern priskv_object priskv_kv_info_obj; diff --git a/server/kv.c b/server/kv.c index 1d4b8da..cf82436 100644 --- a/server/kv.c +++ b/server/kv.c @@ -99,8 +99,71 @@ typedef struct priskv_kv { uint32_t expire_routine_interval; /* interval to run expire routine */ priskv_expire_routine_statics expire_routine_statics; void *mf_ctx; + struct { + uint64_t pin_ops; + uint64_t unpin_ops; + uint64_t unpin_not_closed; + } pin_stats; } priskv_kv; +/* + * TODO(wangyi): Implement PinTTL cleanup mechanism + * + * Context: + * - Pin operations (PIN_ON_ACQUIRE, PIN_ON_SEAL) increase pin_count on the latest visible + * version to protect keys from eviction. If a consumer crashes or a request fails, some + * pin operations may never be closed by UNPIN (e.g., RELEASE with UNPIN_ON_RELEASE), leaving + * keys indefinitely pinned. + * + * Goals: + * - Introduce a best-effort TTL-based cleanup for orphaned pins so that keys are eventually + * unpinned when their associated requests are gone. + * - Maintain multi-version correctness: UNPIN targets the latest version even if the pin was + * created before a SEAL publish that replaced the visible version. + * + * Proposed design: + * - PinOperator: a lightweight record created on each effective pin, containing: + * - key (bytes + length) + * - creation timestamp (monotonic clock) + * - ttl_ms (configurable per pin or global default) + * - optional origin (ACQUIRE or SEAL) and a debug request_id for observability + * - PinManager: per-bucket or global manager storing PinOperator entries in an expiry-ordered + * min-heap or timing-wheel to enable O(logN) insert and efficient batch expiry checks. + * - On pin: + * - After pin_count++ on the targeted keynode, create and register a PinOperator. + * - On unpin: + * - Remove the corresponding PinOperator (match by key); then decrement pin_count on latest + * version using priskv_key_unpin_latest semantics. Multiple pins on the same key will + * have multiple PinOperator entries. + * - On seal (version migration): + * - pin_count is already inherited to the new version; PinOperator records keep referencing + * the key (not the keynode pointer), so no migration is required. + * - Scheduling: + * - Reuse expire routine infrastructure (timerfd) to periodically check PinManager and + * perform cleanup for expired entries. Each expired PinOperator triggers + * priskv_key_unpin_latest(kv, old_keynode_of_record) on the latest version by key. + * - Consider sharding the PinManager by hash-bucket index to minimize global contention. + * - Concurrency & locking: + * - PinOperator insert/remove should use per-bucket spinlocks consistent with hash-head + * protection, avoiding deadlocks by keeping lock order (manager lock -> keynode lock). + * - Configuration: + * - Provide a global default TTL (e.g., kv->pin_ttl_ms) and allow per-request override via + * request flags or auxiliary fields in the protocol header (future extension). + * - Observability & safeguards: + * - Export counters: pin_ttl_active, pin_ttl_expired, pin_ttl_cleanup_ops, pin_ttl_orphaned. + * - Cap the maximum number of active PinOperator entries to prevent memory blow-up; when + * exceeding the cap, log warnings and fallback to immediate unpin or refuse new pins. + * - Recovery: + * - PinTTL metadata is best-effort and may be non-persistent. After restart, keys might + * remain pinned by pin_count; scheduled cleanup resumes with new PinOperator records for + * future pins. Persistent logging can be considered if stronger guarantees are needed. + * + * Next steps: + * - Add PinManager data structures and lifecycle APIs. + * - Integrate with pin/unpin paths and expire routine scheduling. + * - Extend info endpoints to expose PinTTL metrics. + */ + static void priskv_lru_access(priskv_key *keynode, bool is_in_list) { priskv_kv *kv = keynode->kv; @@ -115,13 +178,24 @@ static void priskv_lru_access(priskv_key *keynode, bool is_in_list) static priskv_key *priskv_lru_evict(priskv_kv *kv) { - priskv_key *keynode = NULL; + priskv_key *candidate = NULL, *node; pthread_spin_lock(&kv->lru_lock); - keynode = list_tail(&kv->lru_head, priskv_key, lru_entry); + /* iterate from tail backwards to find an evictable node */ + list_for_each_rev(&kv->lru_head, node, lru_entry) + { + /* Check eviction eligibility: not pinned and no extra references */ + pthread_spin_lock(&node->lock); + bool evictable = (node->pin_count == 0 && node->refcnt == 1); + pthread_spin_unlock(&node->lock); + if (evictable) { + candidate = node; + break; + } + } pthread_spin_unlock(&kv->lru_lock); - return keynode; + return candidate; } static void priskv_lru_del_key(priskv_key *keynode) @@ -215,6 +289,9 @@ void *priskv_new_kv(uint8_t *key_base, uint8_t *value_base, int shm_fd, uint64_t assert(kv->value_base == priskv_buddy_base(kv->value_buddy)); kv->mf_ctx = mf_ctx; + kv->pin_stats.pin_ops = 0; + kv->pin_stats.unpin_ops = 0; + kv->pin_stats.unpin_not_closed = 0; priskv_log_notice("KV: max_key %d, max_key_length %d, value_block_size %d, value_blocks %ld\n", max_keys, max_key_length, value_block_size, value_blocks); @@ -520,6 +597,7 @@ int priskv_set_key(void *_kv, uint8_t *key, uint16_t keylen, uint8_t **val, uint keynode->valuelen = valuelen; memcpy(keynode->key, key, keylen); keynode->refcnt = 0; + keynode->pin_count = 0; pthread_spin_init(&keynode->lock, 0); priskv_keynode_ref(keynode); @@ -619,6 +697,7 @@ int priskv_alloc_node_private(void *_kv, uint8_t *key, uint16_t keylen, uint8_t keynode->expire_time.tv_usec = -1; } keynode->refcnt = 0; + keynode->pin_count = 0; pthread_spin_init(&keynode->lock, 0); priskv_keynode_ref(keynode); /* Initial reference held by the token */ @@ -656,6 +735,14 @@ int priskv_publish_node(void *_kv, void *_keynode) old_keynode = priskv_find_key(kv, (uint8_t *)keynode->key, keynode->keylen, PRISKV_KEY_MAX_TIMEOUT, true, NULL); if (old_keynode) { + /* inherit pin_count from old version to keep lifecycle semantics */ + pthread_spin_lock(&old_keynode->lock); + uint32_t old_pins = old_keynode->pin_count; + pthread_spin_unlock(&old_keynode->lock); + + pthread_spin_lock(&keynode->lock); + keynode->pin_count = old_pins; + pthread_spin_unlock(&keynode->lock); priskv_lru_del_key(old_keynode); __priskv_del_key(kv, old_keynode); } @@ -695,6 +782,85 @@ int priskv_drop_node(void *_kv, void *_keynode) return PRISKV_RESP_STATUS_OK; } +/* Increment pin_count on the given keynode. */ +int priskv_key_pin(void *_kv, void *_keynode) +{ + priskv_kv *kv = (priskv_kv *)_kv; + priskv_key *keynode = (priskv_key *)_keynode; + if (!keynode) { + return PRISKV_RESP_STATUS_SERVER_ERROR; + } + pthread_spin_lock(&keynode->lock); + keynode->pin_count++; + pthread_spin_unlock(&keynode->lock); + if (kv) { + kv->pin_stats.pin_ops++; + } + return PRISKV_RESP_STATUS_OK; +} + +/* Decrement pin_count on the latest version of the key corresponding to keynode. */ +int priskv_key_unpin_latest(void *_kv, void *_keynode) +{ + priskv_kv *kv = (priskv_kv *)_kv; + priskv_key *node = (priskv_key *)_keynode; + if (!kv || !node) { + return PRISKV_RESP_STATUS_SERVER_ERROR; + } + + uint8_t *key = node->key; + uint16_t keylen = node->keylen; + bool expired = false; + priskv_key *latest = priskv_find_key(kv, key, keylen, PRISKV_KEY_MAX_TIMEOUT, false, &expired); + if (!latest || expired) { + /* latest not found or expired already; nothing to unpin */ + if (kv) { + kv->pin_stats.unpin_ops++; + } + return PRISKV_RESP_STATUS_NO_SUCH_KEY; + } + + pthread_spin_lock(&latest->lock); + if (latest->pin_count > 0) { + latest->pin_count--; + } else { + /* indicate unpin is not closed (mismatched) */ + pthread_spin_unlock(&latest->lock); + priskv_keynode_deref(latest); + if (kv) { + kv->pin_stats.unpin_ops++; + kv->pin_stats.unpin_not_closed++; + } + return PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED; + } + pthread_spin_unlock(&latest->lock); + + /* release the temporary ref from priskv_find_key */ + priskv_keynode_deref(latest); + if (kv) { + kv->pin_stats.unpin_ops++; + } + return PRISKV_RESP_STATUS_OK; +} + +uint64_t priskv_get_pin_ops(void *_kv) +{ + priskv_kv *kv = (priskv_kv *)_kv; + return kv ? kv->pin_stats.pin_ops : 0; +} + +uint64_t priskv_get_unpin_ops(void *_kv) +{ + priskv_kv *kv = (priskv_kv *)_kv; + return kv ? kv->pin_stats.unpin_ops : 0; +} + +uint64_t priskv_get_unpin_not_closed(void *_kv) +{ + priskv_kv *kv = (priskv_kv *)_kv; + return kv ? kv->pin_stats.unpin_not_closed : 0; +} + int priskv_value_addr_offset(void *_kv, uint8_t *val, uint64_t *addr_offset) { priskv_kv *kv = _kv; diff --git a/server/kv.h b/server/kv.h index 7af3140..b9e8ee9 100644 --- a/server/kv.h +++ b/server/kv.h @@ -161,6 +161,29 @@ int priskv_alloc_node_private(void *_kv, uint8_t *key, uint16_t keylen, uint8_t int priskv_publish_node(void *_kv, void *_keynode); int priskv_drop_node(void *_kv, void *_keynode); +/* Pin/Unpin controls for lifecycle protection */ +int priskv_key_pin(void *_kv, void *_keynode); +int priskv_key_unpin_latest(void *_kv, void *_keynode); + +/* Pin/Unpin observability */ +uint64_t priskv_get_pin_ops(void *_kv); +uint64_t priskv_get_unpin_ops(void *_kv); +uint64_t priskv_get_unpin_not_closed(void *_kv); + +/* + * TODO(wangyi): PinManager APIs & metrics + * - Define PinManager lifecycle APIs: + * - priskv_pin_manager_init/_destroy + * - priskv_pin_register(key, keylen, ttl_ms, origin, request_id) + * - priskv_pin_remove(key, keylen) + * - priskv_pin_ttl_cleanup_tick() scheduled via timerfd + * - Define metrics getters for info panel: + * - priskv_get_pin_ttl_active() + * - priskv_get_pin_ttl_expired() + * - priskv_get_pin_ttl_cleanup_ops() + * - priskv_get_pin_ttl_orphaned() + */ + #if defined(__cplusplus) } #endif diff --git a/server/memory.h b/server/memory.h index f3d0d99..6581738 100644 --- a/server/memory.h +++ b/server/memory.h @@ -46,6 +46,11 @@ typedef struct priskv_key { struct timeval expire_time; pthread_spinlock_t lock; uint32_t refcnt; + uint32_t pin_count; /* number of active pins; protect from eviction */ + /* TODO(wangyi): Guard against pin_count overflow + * - Add warnings when pin_count approaches UINT32_MAX to prevent silent wrap-around. + * - Consider switching to 64-bit counters if workload can accumulate many long-lived pins. + */ bool inprocess; bool reserved[3]; uint16_t keylen; diff --git a/server/test/test_transport.c b/server/test/test_transport.c index 951209f..3817673 100644 --- a/server/test/test_transport.c +++ b/server/test/test_transport.c @@ -35,11 +35,13 @@ /* --- Transport Layer Permission Tests (Negative Cases) --- */ static priskv_resp_status last_status; +static uint64_t last_token; static int mock_send_response(priskv_transport_conn *conn, uint64_t request_id, priskv_resp_status status, uint32_t length, uint64_t addr_offset, uint64_t token) { last_status = status; + last_token = token; return 0; } @@ -76,6 +78,18 @@ static void do_mock_req(priskv_transport_conn *conn, uint16_t cmd, void *payload } } +static void do_mock_req_with_flags(priskv_transport_conn *conn, uint16_t cmd, void *payload, + uint16_t payload_len, uint32_t flags) +{ + uint8_t req_buf[1024]; + priskv_request *req = (priskv_request *)req_buf; + memset(req, 0, sizeof(req_buf)); + req->command = htobe16(cmd); + req->flags = htobe32(flags); + memcpy(mock_request_key(req), payload, payload_len); + priskv_transport_handle_recv(conn, req, sizeof(priskv_request) + payload_len); +} + static void test_kv_transport_permissions(void *kv) { priskv_transport_driver mock_driver = { @@ -377,6 +391,206 @@ static void test_kv_transport_alloc_token_add_fail(void *kv) g_transport_driver = old_driver; } +/* --- Pin on SEAL and pin_count inheritance tests --- */ +static void test_kv_transport_pin_on_seal(void *kv) +{ + priskv_transport_driver mock_driver = { + .name = "mock", + .send_response = mock_send_response, + .request_key_off = mock_request_key_off, + .request_key = mock_request_key, + .recv_req = mock_recv_req, + }; + priskv_transport_driver *old_driver = g_transport_driver; + g_transport_driver = &mock_driver; + + priskv_transport_conn conn = {0}; + conn.kv = kv; + conn.conn_cap.max_key_length = MAX_KEY_LENGTH; + conn.conn_cap.max_sgl = 8; + pthread_spin_init(&conn.lock, PTHREAD_PROCESS_PRIVATE); + + const char *key = "pin_seal_key"; + uint16_t keylen = (uint16_t)(strlen(key) + 1); + uint8_t *val_ptr = NULL; + void *keynode_alloc = NULL; + + /* ALLOC private node and publish with pin-on-seal */ + int s = priskv_alloc_node_private(kv, (uint8_t *)key, keylen, &val_ptr, 256, + PRISKV_KEY_MAX_TIMEOUT, &keynode_alloc); + assert(s == PRISKV_RESP_STATUS_OK && keynode_alloc); + uint64_t alloc_token = priskv_transport_token_add(&conn, keynode_alloc, PRISKV_TOKEN_TYPE_ALLOC); + uint64_t be_token = htobe64(alloc_token); + last_status = -1; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_SEAL, &be_token, sizeof(uint64_t), + PRISKV_REQ_FLAG_PIN_ON_SEAL); + assert(last_status == PRISKV_RESP_STATUS_OK); + + /* verify pin_count == 1 on latest */ + uint32_t vlen = 0; + void *keynode_acq = NULL; + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &keynode_acq); + assert(keynode_acq); + priskv_key *kn = (priskv_key *)keynode_acq; + assert(kn->pin_count == 1); + priskv_get_key_end(keynode_acq); + + /* publish a new version with pin-on-seal again; pin_count should inherit and increment to 2 */ + keynode_alloc = NULL; + s = priskv_alloc_node_private(kv, (uint8_t *)key, keylen, &val_ptr, 128, + PRISKV_KEY_MAX_TIMEOUT, &keynode_alloc); + assert(s == PRISKV_RESP_STATUS_OK && keynode_alloc); + alloc_token = priskv_transport_token_add(&conn, keynode_alloc, PRISKV_TOKEN_TYPE_ALLOC); + be_token = htobe64(alloc_token); + last_status = -1; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_SEAL, &be_token, sizeof(uint64_t), + PRISKV_REQ_FLAG_PIN_ON_SEAL); + assert(last_status == PRISKV_RESP_STATUS_OK); + + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &keynode_acq); + assert(keynode_acq); + kn = (priskv_key *)keynode_acq; + assert(kn->pin_count == 2); + priskv_get_key_end(keynode_acq); + + /* cleanup */ + priskv_delete_key(kv, (uint8_t *)key, keylen); + priskv_transport_token_cleanup(&conn); + pthread_spin_destroy(&conn.lock); + g_transport_driver = old_driver; +} + +/* --- Pin on ACQUIRE + Unpin on RELEASE tests --- */ +static void test_kv_transport_pin_and_unpin(void *kv) +{ + priskv_transport_driver mock_driver = { + .name = "mock", + .send_response = mock_send_response, + .request_key_off = mock_request_key_off, + .request_key = mock_request_key, + .recv_req = mock_recv_req, + }; + priskv_transport_driver *old_driver = g_transport_driver; + g_transport_driver = &mock_driver; + + priskv_transport_conn conn = {0}; + conn.kv = kv; + conn.conn_cap.max_key_length = MAX_KEY_LENGTH; + conn.conn_cap.max_sgl = 8; + pthread_spin_init(&conn.lock, PTHREAD_PROCESS_PRIVATE); + + const char *key = "pin_unpin_key"; + uint16_t keylen = (uint16_t)(strlen(key) + 1); + uint8_t *val_ptr = NULL; + void *keynode_alloc = NULL; + + /* publish a key */ + int s = priskv_alloc_node_private(kv, (uint8_t *)key, keylen, &val_ptr, 128, + PRISKV_KEY_MAX_TIMEOUT, &keynode_alloc); + assert(s == PRISKV_RESP_STATUS_OK && keynode_alloc); + priskv_publish_node(kv, keynode_alloc); + + /* ACQUIRE with pin-on-acquire */ + last_status = -1; last_token = 0; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_ACQUIRE, (void *)key, keylen, + PRISKV_REQ_FLAG_PIN_ON_ACQUIRE); + assert(last_status == PRISKV_RESP_STATUS_OK); + + /* verify pin_count == 1 */ + uint32_t vlen = 0; + void *keynode_acq = NULL; + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &keynode_acq); + assert(keynode_acq); + priskv_key *kn = (priskv_key *)keynode_acq; + assert(kn->pin_count == 1); + priskv_get_key_end(keynode_acq); + + /* RELEASE with unpin-on-release */ + uint64_t be_token = htobe64(last_token); + last_status = -1; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_RELEASE, &be_token, sizeof(uint64_t), + PRISKV_REQ_FLAG_UNPIN_ON_RELEASE); + assert(last_status == PRISKV_RESP_STATUS_OK); + + /* verify pin_count == 0 and counters updated */ + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &keynode_acq); + assert(keynode_acq); + kn = (priskv_key *)keynode_acq; + assert(kn->pin_count == 0); + priskv_get_key_end(keynode_acq); + + /* TODO(wangyi): Add PinTTL cleanup tests in transport layer + * - Inject short TTL for pin entries (once protocol supports it) and advance timer to + * verify automatic unpin on latest version. + * - Verify counters for ttl_expired and ttl_cleanup_ops. + */ + + uint64_t pin_ops = priskv_get_pin_ops(kv); + uint64_t unpin_ops = priskv_get_unpin_ops(kv); + uint64_t unpin_not_closed = priskv_get_unpin_not_closed(kv); + assert(pin_ops >= 1); + assert(unpin_ops >= 1); + assert(unpin_not_closed == 0); + + /* cleanup */ + priskv_delete_key(kv, (uint8_t *)key, keylen); + priskv_transport_token_cleanup(&conn); + pthread_spin_destroy(&conn.lock); + g_transport_driver = old_driver; +} + +/* --- Unpin when latest version is missing (deleted/expired) tests --- */ +static void test_kv_transport_unpin_no_such_key(void *kv) +{ + priskv_transport_driver mock_driver = { + .name = "mock", + .send_response = mock_send_response, + .request_key_off = mock_request_key_off, + .request_key = mock_request_key, + .recv_req = mock_recv_req, + }; + priskv_transport_driver *old_driver = g_transport_driver; + g_transport_driver = &mock_driver; + + priskv_transport_conn conn = {0}; + conn.kv = kv; + conn.conn_cap.max_key_length = MAX_KEY_LENGTH; + conn.conn_cap.max_sgl = 8; + pthread_spin_init(&conn.lock, PTHREAD_PROCESS_PRIVATE); + + const char *key = "unpin_nosuch_key"; + uint16_t keylen = (uint16_t)(strlen(key) + 1); + uint8_t *val_ptr = NULL; + void *keynode_alloc = NULL; + + /* publish a key */ + int s = priskv_alloc_node_private(kv, (uint8_t *)key, keylen, &val_ptr, 64, + PRISKV_KEY_MAX_TIMEOUT, &keynode_alloc); + assert(s == PRISKV_RESP_STATUS_OK && keynode_alloc); + priskv_publish_node(kv, keynode_alloc); + + /* ACQUIRE with pin-on-acquire to generate a token */ + last_status = -1; last_token = 0; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_ACQUIRE, (void *)key, keylen, + PRISKV_REQ_FLAG_PIN_ON_ACQUIRE); + assert(last_status == PRISKV_RESP_STATUS_OK); + + /* delete the key before release */ + priskv_delete_key(kv, (uint8_t *)key, keylen); + + /* RELEASE with unpin-on-release should return NO_SUCH_KEY */ + uint64_t be_token = htobe64(last_token); + last_status = -1; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_RELEASE, &be_token, sizeof(uint64_t), + PRISKV_REQ_FLAG_UNPIN_ON_RELEASE); + assert(last_status == PRISKV_RESP_STATUS_NO_SUCH_KEY); + + /* cleanup */ + priskv_transport_token_cleanup(&conn); + pthread_spin_destroy(&conn.lock); + g_transport_driver = old_driver; +} + int main() { uint8_t *key_base, *value_base; @@ -395,6 +609,9 @@ int main() test_kv_transport_drop_behavior(kv); test_kv_transport_param_validation(kv); test_kv_transport_alloc_token_add_fail(kv); + test_kv_transport_pin_on_seal(kv); + test_kv_transport_pin_and_unpin(kv); + test_kv_transport_unpin_no_such_key(kv); printf("TEST TRANSPORT: All tests passed!\n"); priskv_destroy_kv(kv); diff --git a/server/transport/transport.c b/server/transport/transport.c index f691567..18eabee 100644 --- a/server/transport/transport.c +++ b/server/transport/transport.c @@ -280,6 +280,7 @@ void priskv_transport_token_cleanup(priskv_transport_conn *conn) int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *req, uint32_t len) { uint16_t command = be16toh(req->command); + uint32_t flags = be32toh(req->flags); uint16_t nsgl = be16toh(req->nsgl); uint64_t timeout = be64toh(req->timeout); uint32_t alloc_length = be32toh(req->alloc_length); @@ -588,6 +589,15 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re break; } status = priskv_publish_node(conn->kv, keynode); + /* Optional: pin-on-seal if requested */ + if (status == PRISKV_RESP_STATUS_OK && (flags & PRISKV_REQ_FLAG_PIN_ON_SEAL)) { + (void)priskv_key_pin(conn->kv, keynode); + /* TODO(wangyi): PinTTL register on SEAL + * - If protocol provides per-request TTL (e.g., pin_ttl_ms), register a + * PinOperator with PinManager for this key to ensure eventual cleanup. + * - Fallback to server default TTL when not provided. + */ + } priskv_transport_token_del(conn, token); ret = driver->send_response(conn, req->request_id, status, 0, 0, 0); } @@ -608,6 +618,14 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re 0, 0, 0); break; } + /* Optional: pin on acquire if requested */ + if (flags & PRISKV_REQ_FLAG_PIN_ON_ACQUIRE) { + (void)priskv_key_pin(conn->kv, keynode); + /* TODO(wangyi): PinTTL register on ACQUIRE + * - Register PinOperator with PinManager using request-scoped or default TTL. + * - Associate optional request_id for observability. + */ + } status = priskv_value_addr_offset(conn->kv, val, &addr_offset); ret = driver->send_response(conn, req->request_id, status, valuelen, addr_offset, token); @@ -640,9 +658,18 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re PRISKV_RESP_STATUS_PERMISSION_DENIED, 0, 0, 0); break; } + /* Optional: unpin on release if requested; unpin always targets latest version */ + priskv_resp_status resp = PRISKV_RESP_STATUS_OK; + if (flags & PRISKV_REQ_FLAG_UNPIN_ON_RELEASE) { + /* TODO(wangyi): PinTTL remove on RELEASE + * - Remove the corresponding PinOperator entry for this key. + * - Then perform unpin on latest version to maintain multi-version semantics. + */ + resp = priskv_key_unpin_latest(conn->kv, keynode); + } priskv_get_key_end(keynode); priskv_transport_token_del(conn, token); - ret = driver->send_response(conn, req->request_id, PRISKV_RESP_STATUS_OK, 0, 0, 0); + ret = driver->send_response(conn, req->request_id, resp, 0, 0, 0); } break; case PRISKV_COMMAND_DROP: