Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions client/benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@ static void zc_acquire_cb(uint64_t rid, priskv_status status, void *result)
zctx->pctx->job->mm->memcpy(zctx->value, (void *)region->addr, region->length);
zctx->token = region->token;
zctx->pctx->job->last_stage = "RELEASE";
priskv_release_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_release_cb);
priskv_release_async(zctx->pctx->client, &zctx->token, false /* unpin_on_release */, (uint64_t)zctx,
zc_release_cb);
}

static void zc_seal_cb(uint64_t rid, priskv_status status, void *result)
Expand Down Expand Up @@ -962,7 +963,8 @@ static void zc_alloc_cb(uint64_t rid, priskv_status status, void *result)
zctx->pctx->job->mm->memcpy((void *)region->addr, zctx->value, copy_len);
zctx->token = region->token;
zctx->pctx->job->last_stage = "SEAL";
priskv_seal_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_seal_cb);
priskv_seal_async(zctx->pctx->client, &zctx->token, false /* pin_on_seal */, (uint64_t)zctx,
zc_seal_cb);
}

/* ZeroCopy DROP callback is no longer used (published keys use DELETE semantics) */
Expand Down Expand Up @@ -994,8 +996,8 @@ static void priskv_drv_get(void *ctx, const char *key, void *value, uint32_t val
zctx->value = value;
zctx->value_len = value_len;
priskv_ctx->job->last_stage = "ACQUIRE";
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, (uint64_t)zctx,
zc_acquire_cb);
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false /* pin_on_acquire */,
(uint64_t)zctx, zc_acquire_cb);
return;
}
/* Remove duplicate ZeroCopy GET branch */
Expand Down
93 changes: 82 additions & 11 deletions client/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <inttypes.h>

#include "priskv.h"
#include "priskv-log.h"
Expand Down Expand Up @@ -154,6 +155,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
{
char *key, *value, *opt, *opt_val, *str_end;
uint64_t expire_time_ms = 0;
bool pin_on_seal = false;
size_t valuelen;
priskv_sgl sgl;
priskv_status status;
Expand Down Expand Up @@ -194,6 +196,9 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
if (!strcmp(opt, "EX")) {
expire_time_ms *= 1000;
}
} else if (!strcmp(opt, "PIN")) {
/* Redis-style: allow 'PIN' to indicate pin-on-seal */
pin_on_seal = true;
} else {
printf("%s\n", invalid_args_msg);
return;
Expand All @@ -217,7 +222,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
printf("ALLOC_SET status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
priskv_status_str(status), (void *)region.addr, region.length, region.token);
memcpy((void *)region.addr, value, (size_t)region.length);
status = priskv_seal(ctx->client, &region.token);
status = priskv_seal(ctx->client, &region.token, pin_on_seal);
if (status != PRISKV_STATUS_OK) {
printf("Failed to SEAL, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand Down Expand Up @@ -269,7 +274,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
if (acquire) {
priskv_memory_region region = {0};
printf("ACQUIRE key=%s\n", key);
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, &region);
/* Do not pin on acquire by default from CLI */
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, &region);
if (status != PRISKV_STATUS_OK) {
printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand All @@ -279,7 +285,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
printf("ACQUIRE GET status(%d): %s\n", status, priskv_status_str(status));
printf("ACQUIRE GET value[%u]=%s\n", region.length, (char *)ctx->buf);

status = priskv_release(ctx->client, &region.token);
/* Do not unpin on release by default from CLI */
status = priskv_release(ctx->client, &region.token, false);
if (status != PRISKV_STATUS_OK) {
printf("Failed to RELEASE, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand Down Expand Up @@ -384,6 +391,7 @@ static void seal_token_handler(client_context *ctx, char *args)
char *tokstr, *str_end;
uint64_t token = 0;
priskv_status status;
bool pin_on_seal = false;

tokstr = strtok_r(args, " ", &args);
if (!tokstr) {
Expand All @@ -400,7 +408,30 @@ static void seal_token_handler(client_context *ctx, char *args)
return;
}
}
status = priskv_seal(ctx->client, &token);
/* Parse optional flags: 'PIN [TTL <ms>]' */
while (args && strlen(args) > 0) {
char *flag = strtok_r(args, " ", &args);
if (!strcmp(flag, "PIN")) {
pin_on_seal = true;
} else if (!strcmp(flag, "TTL")) {
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
char *ttl = strtok_r(args, " ", &args);
if (!ttl || !strlen(ttl)) {
printf("%s\n", invalid_args_msg);
return;
}
} else {
printf("%s\n", invalid_args_msg);
return;
}
}

printf("SEAL token=%" PRIu64 " [PIN=%d]\n", token, pin_on_seal);
/* TODO(wangyi): Support per-command TTL for pin-on-seal (e.g., 'PIN TTL N')
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
* - Default to server-side TTL when not provided.
*/
status = priskv_seal(ctx->client, &token, pin_on_seal);
printf("SEAL status(%d): %s\n", status, priskv_status_str(status));
}

Expand All @@ -409,15 +440,38 @@ static void acquire_only_handler(client_context *ctx, char *args)
char *key;
priskv_status status;
priskv_memory_region region = {0};
bool pin_on_acquire = false;

key = strtok_r(args, " ", &args);
if (!key) {
printf("%s\n", invalid_args_msg);
return;
}

printf("ACQUIRE key=%s\n", key);
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, &region);
/* Parse optional flags: 'PIN [TTL <ms>]' */
while (args && strlen(args) > 0) {
char *flag = strtok_r(args, " ", &args);
if (!strcmp(flag, "PIN")) {
pin_on_acquire = true;
} else if (!strcmp(flag, "TTL")) {
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
char *ttl = strtok_r(args, " ", &args);
if (!ttl || !strlen(ttl)) {
printf("%s\n", invalid_args_msg);
return;
}
} else {
printf("%s\n", invalid_args_msg);
return;
}
}

/* Align output field name with CLI flag semantics */
printf("ACQUIRE key=%s [PIN=%d]\n", key, pin_on_acquire);
/* TODO(wangyi): Support per-command TTL for pin-on-acquire (e.g., 'PIN TTL N')
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
*/
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, &region);
printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
priskv_status_str(status), (void *)region.addr, region.length, region.token);
if (status == PRISKV_STATUS_OK) {
Expand All @@ -434,6 +488,7 @@ static void release_token_handler(client_context *ctx, char *args)
char *tokstr, *str_end;
uint64_t token = 0;
priskv_status status;
bool unpin_on_release = false;

tokstr = strtok_r(args, " ", &args);
if (!tokstr) {
Expand All @@ -450,7 +505,22 @@ static void release_token_handler(client_context *ctx, char *args)
return;
}
}
status = priskv_release(ctx->client, &token);
/* Parse optional flags: 'UNPIN' */
while (args && strlen(args) > 0) {
char *flag = strtok_r(args, " ", &args);
if (!strcmp(flag, "UNPIN")) {
unpin_on_release = true;
} else {
printf("%s\n", invalid_args_msg);
return;
}
}

printf("RELEASE token=%" PRIu64" [UNPIN=%d]\n", token, unpin_on_release);
/* TODO(wangyi): Diagnostics for UNPIN_NOT_CLOSED and TTL interactions
* - Consider printing hints when UNPIN_NOT_CLOSED occurs to aid debugging.
*/
status = priskv_release(ctx->client, &token, unpin_on_release);
printf("RELEASE status(%d): %s\n", status, priskv_status_str(status));
}

Expand All @@ -475,6 +545,7 @@ static void drop_token_handler(client_context *ctx, char *args)
return;
}
}
printf("DROP token=% \n" PRIu64, token);
status = priskv_drop(ctx->client, &token);
printf("DROP status(%d): %s\n", status, priskv_status_str(status));
}
Expand Down Expand Up @@ -649,14 +720,14 @@ static priskv_command commands[] = {
{"set", set_handler,
"set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"},
{"alloc_set", alloc_set_handler,
"alloc set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"},
"alloc_set KEY VALUE [ EX seconds | PX milliseconds ] [ PIN ]\tzero-copy set with optional pin on seal\n"},
{"get", get_handler, "get KEY\t\t\t\t\t\tget key:value from priskv\n"},
{"acquire_get", acquire_get_handler, "acquire get KEY\t\t\t\t\t\tget key:value from priskv\n"},
{"alloc", alloc_only_handler,
"alloc KEY BYTES [ EX seconds | PX milliseconds ]\t\tallocate region and print token\n"},
{"seal", seal_token_handler, "seal TOKEN|last\t\t\t\t\tseal previously alloc'ed token\n"},
{"acquire", acquire_only_handler, "acquire KEY\t\t\t\t\t\tacquire region and print token\n"},
{"release", release_token_handler, "release TOKEN|last\t\t\t\t\trelease previously acquired token\n"},
{"seal", seal_token_handler, "seal TOKEN|last [ PIN [ TTL milliseconds ] ]\t\tseal previously alloc'ed token\n"},
{"acquire", acquire_only_handler, "acquire KEY [ PIN [ TTL milliseconds ] ]\t\tacquire region and print token\n"},
{"release", release_token_handler, "release TOKEN|last [ UNPIN ]\t\trelease previously acquired token\n"},
{"drop", drop_token_handler, "drop TOKEN|last\t\t\t\t\tDrop unpublished ALLOC token\n"},
{"test", test_handler, "test KEY\t\t\t\t\t\ttest if the key exists in priskv\n"},
{"delete", delete_handler, "delete KEY\t\t\t\t\t\tdelete the key from priskv\n"},
Expand Down
25 changes: 17 additions & 8 deletions client/priskv.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern "C"
#endif

#include <stdint.h>
#include <stdbool.h>
#include "priskv-protocol.h" /* Include protocol request flag definitions */

typedef struct priskv_client priskv_client;
typedef struct priskv_memory priskv_memory;
Expand Down Expand Up @@ -105,6 +107,9 @@ typedef enum priskv_status {
/* no such token */
PRISKV_STATUS_NO_SUCH_TOKEN,

/* unpin requested when pin_count == 0 on latest version */
PRISKV_STATUS_UNPIN_NOT_CLOSED,

/* invalid SGL. the number of SGL within a command must not exceed @max_sgl */
PRISKV_STATUS_INVALID_SGL,

Expand Down Expand Up @@ -169,6 +174,9 @@ static inline const char *priskv_status_str(priskv_status status)
case PRISKV_STATUS_NO_SUCH_TOKEN:
return "No such token";

case PRISKV_STATUS_UNPIN_NOT_CLOSED:
return "Unpin operation not closed";

case PRISKV_STATUS_INVALID_SGL:
return "Invalid SGL";

Expand Down Expand Up @@ -262,16 +270,17 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le
uint64_t timeout, uint64_t request_id, priskv_generic_cb cb);

/* Seal memory region alloced for write (by token pointer, reuse key field) */
int priskv_seal_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
priskv_generic_cb cb);
int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal,
uint64_t request_id, priskv_generic_cb cb);

/* Acquire memory region for zero copy read */
int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout,
uint64_t request_id, priskv_generic_cb cb);
bool pin_on_acquire, uint64_t request_id, priskv_generic_cb cb);

/* Release memory region (by token pointer, reuse key field) */
int priskv_release_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
priskv_generic_cb cb);
int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release,
uint64_t request_id, priskv_generic_cb cb);


/* Drop memory region (by token pointer, reuse key field) */
int priskv_drop_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
Expand Down Expand Up @@ -325,11 +334,11 @@ uint64_t priskv_capacity(priskv_client *client);

int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, uint64_t timeout,
priskv_memory_region *region);
int priskv_seal(priskv_client *client, const uint64_t *token);
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal);

int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
priskv_memory_region *region);
int priskv_release(priskv_client *client, const uint64_t *token);
bool pin_on_acquire, priskv_memory_region *region);
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release);

int priskv_drop(priskv_client *client, const uint64_t *token);

Expand Down
15 changes: 9 additions & 6 deletions client/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,22 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length,
return req_sync.status;
}

int priskv_seal(priskv_client *client, const uint64_t *token)
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal)
{
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};
priskv_seal_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb);
priskv_seal_async(client, token, pin_on_seal, (uint64_t)&req_sync,
priskv_common_sync_cb);

priskv_sync_wait(client, &req_sync.done);
return req_sync.status;
}

int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
priskv_memory_region *region)
bool pin_on_acquire, priskv_memory_region *region)
{
priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false};
priskv_acquire_async(client, key, timeout, (uint64_t)&req_sync, priskv_zero_copy_req_sync_cb);
priskv_acquire_async(client, key, timeout, pin_on_acquire, (uint64_t)&req_sync,
priskv_zero_copy_req_sync_cb);

priskv_sync_wait(client, &req_sync.done);
if (req_sync.status == PRISKV_STATUS_OK && region) {
Expand All @@ -201,10 +203,11 @@ int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
return req_sync.status;
}

int priskv_release(priskv_client *client, const uint64_t *token)
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release)
{
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};
priskv_release_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb);
priskv_release_async(client, token, unpin_on_release, (uint64_t)&req_sync,
priskv_common_sync_cb);
priskv_sync_wait(client, &req_sync.done);

return req_sync.status;
Expand Down
1 change: 1 addition & 0 deletions client/transport/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,7 @@ static int priskv_rdma_req_send(void *arg)
req->command = htobe16(rdma_req->cmd);
req->nsgl = htobe16(rdma_req->nsgl);
req->timeout = htobe64(rdma_req->timeout);
req->flags = htobe32(rdma_req->req_flags);
req->key_length = htobe16(rdma_req->keylen);

struct timeval client_metadata_send_time;
Expand Down
Loading
Loading