Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .github/workflows/test-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ on:
required: false
type: string
default: "amd64"
branch:
required: false
type: string

jobs:
test-app:
Expand All @@ -26,14 +29,24 @@ jobs:
run: |
sudo apt install -y ./deb/acton_*.deb
acton --version
- name: "Clone app repo"
- name: "Clone app repo with default branch"
if: inputs.branch == ''
uses: actions/checkout@v4
with:
repository: ${{ inputs.repo_url }}
path: app

- name: "Clone app repo with specific branch"
if: inputs.branch != ''
uses: actions/checkout@v4
with:
repository: ${{ inputs.repo_url }}
path: app
ref: ${{ inputs.branch }}
- name: "Compile acton program"
run: |
cd app
acton build
for ((i = 0; i < 1000; i++)); do out/bin/test_ttt_callbacks; echo "\n\n\n"; done
acton test
acton test perf
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ jobs:
uses: "./.github/workflows/test-app.yml"
with:
repo_url: "orchestron-orchestrator/orchestron"
branch: "debug"

test-app-snappy:
needs: build-debs
Expand Down
9 changes: 9 additions & 0 deletions base/rts/q.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ int ENQ_ready($Actor a) {
#elif defined MPMC && MPMC == 2
int ENQ_ready($Actor a) {
int i = a->$affinity;
B_Msg m = a ? a->$waitsfor : NULL;
if (m != NULL) {
fprintf(stderr, "????????????? ENQ_ready inserts actor %ld that waits for msg %p to actor %ld!\n", a->$globkey, m, m->$to->$globkey);
}
spinlock_lock(&rqs[i].lock);
if (rqs[i].tail) {
rqs[i].tail->$next = a;
Expand Down Expand Up @@ -91,7 +95,12 @@ int ENQ_ready($Actor a) {
rqs[idx].tail = NULL;
}
rqs[idx].count--;
B_Msg m = res ? res->$waitsfor : NULL;
spinlock_unlock(&rqs[idx].lock);
if (m != NULL) {
fprintf(stderr, "????????????? DEQ_ready returns actor %ld that waits for msg %p to actor %ld!\n", res->$globkey, m, m->$to->$globkey);
}
// assert(res->$waitsfor == NULL);
return res;
}
#else
Expand Down
42 changes: 34 additions & 8 deletions base/rts/rts.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,18 @@ bool ADD_waiting($Actor a, B_Msg m) {

assert(m != NULL);

$Actor prev;
spinlock_lock(&m->$wait_lock);
if (!FROZEN(m)) {
prev = m->$waiting;
a->$next = m->$waiting;
m->$waiting = a;
did_add = true;
}
spinlock_unlock(&m->$wait_lock);
if (did_add) {
fprintf(stderr, " (wrote waiting %p to %p, was %p)\n", a, m, prev);
}
return did_add;
}

Expand Down Expand Up @@ -798,6 +803,7 @@ B_Msg $ASYNC($Actor to, $Cont cont) {
B_Msg m = B_MsgG_newXX(to, cont, baseline, &$Done$instance);
if (self) { // $ASYNC called by actor code
m->$baseline = self->B_Msg->$baseline;
fprintf(stderr, " Outgoing msg %p by actor %ld (waiting: %p)\n", m, self->$globkey, m->$waiting);
PUSH_outgoing(self, m);
} else { // $ASYNC called by the event loop
m->$baseline = current_time();
Expand Down Expand Up @@ -978,7 +984,10 @@ void FLUSH_outgoing_local($Actor self) {
if (m->$baseline == self->B_Msg->$baseline) {
$Actor to = m->$to;
if (ENQ_msg(m, to)) {
fprintf(stderr, ">>>> ASYNC msg %p from actor %ld WAKEUP target %ld (waiting: %p)\n", m, self->$globkey, to->$globkey, m->$waiting);
ENQ_ready(to);
} else {
fprintf(stderr, ">>>> ASYNC msg %p from actor %ld queued on target %ld (waiting: %p)\n", m, self->$globkey, to->$globkey, m->$waiting);
}
dest = to->$globkey;
} else {
Expand All @@ -1000,6 +1009,7 @@ void handle_timeout() {
if (m) {
rtsd_printf("## Dequeued timed msg with baseline %ld (now is %ld)", m->$baseline, now);
if (ENQ_msg(m, m->$to)) {
fprintf(stderr, " Timed WAKEUP of actor %ld\n", m->$to->$globkey);
int wtid = ENQ_ready(m->$to);
wake_wt(wtid);
}
Expand Down Expand Up @@ -1419,6 +1429,7 @@ void BOOTSTRAP(int argc, char *argv[]) {
}
#endif
if (ENQ_msg(m, root_actor)) {
fprintf(stderr, "Bootstrap WAKEUP %ld\n", root_actor->$globkey);
ENQ_ready(root_actor);
}

Expand Down Expand Up @@ -1554,7 +1565,8 @@ void wt_work_cb(uv_check_t *ev) {
if (!wctx->jump0) {
wctx->jump0 = wctx->jump_top;
}
rtsd_printf("## Running actor %ld : %s", current->$globkey, current->$class->$GCINFO);
rtsd_printf("## Running actor %ld : %s", wctx->id, current->$globkey, current->$class->$GCINFO);
fprintf(stderr, "(%d) Run %ld msg %p (%p)\n", wctx->id, current->$globkey, m, m->$waiting);
r = cont->$class->__call__(cont, val);

uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts2);
Expand Down Expand Up @@ -1586,17 +1598,24 @@ void wt_work_cb(uv_check_t *ev) {
case $RDONE: {
save_actor_state(current, m);
m->value = r.value; // m->value holds the message result,
fprintf(stderr, " (%d) (Result by actor %ld for msg %p, waiting: %p)\n", wctx->id, current->$globkey, m, m->$waiting);
$Actor b = FREEZE_waiting(m, MARK_RESULT); // so mark this and stop further m->waiting additions
while (b) {
b->B_Msg->value = r.value;
b->$waitsfor = NULL;
$Actor c = b->$next;
ENQ_ready(b);
rtsd_printf("## Waking up actor %ld : %s", b->$globkey, b->$class->$GCINFO);
b = c;
if (b) {
while (b) {
b->B_Msg->value = r.value;
b->$waitsfor = NULL;
$Actor c = b->$next;
fprintf(stderr, "==== (%d) Result by actor %ld for msg %p, WAKEUP client %ld\n", wctx->id, current->$globkey, m, b->$globkey);
ENQ_ready(b);
rtsd_printf("## Waking up actor %ld : %s", b->$globkey, b->$class->$GCINFO);
b = c;
}
} else {
fprintf(stderr, "==== (%d) Result by actor %ld for msg %p, no clients\n", wctx->id, current->$globkey, m);
}
rtsd_printf("## DONE actor %ld : %s", current->$globkey, current->$class->$GCINFO);
if (DEQ_msg(current)) {
fprintf(stderr, " (%d) More work for actor %ld, keep AWAKE\n", wctx->id, current->$globkey);
ENQ_ready(current);
}
break;
Expand Down Expand Up @@ -1635,9 +1654,11 @@ void wt_work_cb(uv_check_t *ev) {
$Actor c = b->$next;
ENQ_ready(b);
rtsd_printf("## Propagating exception to actor %ld : %s", b->$globkey, b->$class->$GCINFO);
fprintf(stderr, "==== Exception by actor %ld for msg %p, waking up client %ld\n", current->$globkey, m, b->$globkey);
b = c;
}
if (DEQ_msg(current)) {
fprintf(stderr, " More work for actor %ld after exception, keep AWAKE\n", current->$globkey);
ENQ_ready(current);
}
rtsd_printf("## Done handling failed actor %ld : %s", current->$globkey, current->$class->$GCINFO);
Expand Down Expand Up @@ -1676,14 +1697,17 @@ void wt_work_cb(uv_check_t *ev) {
assert(x != NULL);
if (ADD_waiting(current, x)) { // x->cont is a proper $Cont: x is still being processed so current was added to x->waiting
rtsd_printf("## AWAIT actor %ld : %s", current->$globkey, current->$class->$GCINFO);
fprintf(stderr, "---- (%d) AWAIT by client %ld on msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey);
current->$waitsfor = x;
} else if (EXCEPTIONAL(x)) { // x->cont == MARK_EXCEPTION: x->value holds the raised exception, current is not in x->waiting
rtsd_printf("## AWAIT/fail actor %ld : %s", current->$globkey, current->$class->$GCINFO);
fprintf(stderr, "---- (%d) Immediate exception found by client %ld in msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey);
m->$cont = &$Fail$instance;
m->value = x->value;
ENQ_ready(current);
} else { // x->cont == MARK_RESULT: x->value holds the final response, current is not in x->waiting
rtsd_printf("## AWAIT/wakeup actor %ld : %s", current->$globkey, current->$class->$GCINFO);
fprintf(stderr, "---- (%d) Immediate result found by client %ld in msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey);
m->value = x->value;
ENQ_ready(current);
}
Expand Down Expand Up @@ -2347,6 +2371,8 @@ int main(int argc, char **argv) {
appname = argv[0];
pid = getpid();

fprintf(stderr, "RTS-DEBUG\n");

#ifndef _WIN32
// Do line buffered output
setlinebuf(stdout);
Expand Down
Loading