diff --git a/.github/workflows/test-app.yml b/.github/workflows/test-app.yml index 78afcec50..5a3ba9226 100644 --- a/.github/workflows/test-app.yml +++ b/.github/workflows/test-app.yml @@ -9,6 +9,9 @@ on: required: false type: string default: "amd64" + branch: + required: false + type: string jobs: test-app: @@ -26,14 +29,24 @@ jobs: run: | sudo apt install -y ./deb/acton_*.deb acton --version - - name: "Clone app repo" + - name: "Clone app repo with default branch" + if: inputs.branch == '' + uses: actions/checkout@v4 + with: + repository: ${{ inputs.repo_url }} + path: app + + - name: "Clone app repo with specific branch" + if: inputs.branch != '' uses: actions/checkout@v4 with: repository: ${{ inputs.repo_url }} path: app + ref: ${{ inputs.branch }} - name: "Compile acton program" run: | cd app acton build + for ((i = 0; i < 1000; i++)); do out/bin/test_ttt_callbacks; echo "\n\n\n"; done acton test acton test perf diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3596c033b..193b2eae9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -528,6 +528,7 @@ jobs: uses: "./.github/workflows/test-app.yml" with: repo_url: "orchestron-orchestrator/orchestron" + branch: "debug" test-app-snappy: needs: build-debs diff --git a/base/rts/q.c b/base/rts/q.c index 8b2b4fcd8..6c3de4c8b 100644 --- a/base/rts/q.c +++ b/base/rts/q.c @@ -17,6 +17,10 @@ int ENQ_ready($Actor a) { #elif defined MPMC && MPMC == 2 int ENQ_ready($Actor a) { int i = a->$affinity; + B_Msg m = a ? a->$waitsfor : NULL; + if (m != NULL) { + fprintf(stderr, "????????????? ENQ_ready inserts actor %ld that waits for msg %p to actor %ld!\n", a->$globkey, m, m->$to->$globkey); + } spinlock_lock(&rqs[i].lock); if (rqs[i].tail) { rqs[i].tail->$next = a; @@ -91,7 +95,12 @@ int ENQ_ready($Actor a) { rqs[idx].tail = NULL; } rqs[idx].count--; + B_Msg m = res ? res->$waitsfor : NULL; spinlock_unlock(&rqs[idx].lock); + if (m != NULL) { + fprintf(stderr, "????????????? DEQ_ready returns actor %ld that waits for msg %p to actor %ld!\n", res->$globkey, m, m->$to->$globkey); + } +// assert(res->$waitsfor == NULL); return res; } #else diff --git a/base/rts/rts.c b/base/rts/rts.c index e24286f38..f70676305 100644 --- a/base/rts/rts.c +++ b/base/rts/rts.c @@ -571,13 +571,18 @@ bool ADD_waiting($Actor a, B_Msg m) { assert(m != NULL); + $Actor prev; spinlock_lock(&m->$wait_lock); if (!FROZEN(m)) { + prev = m->$waiting; a->$next = m->$waiting; m->$waiting = a; did_add = true; } spinlock_unlock(&m->$wait_lock); + if (did_add) { + fprintf(stderr, " (wrote waiting %p to %p, was %p)\n", a, m, prev); + } return did_add; } @@ -798,6 +803,7 @@ B_Msg $ASYNC($Actor to, $Cont cont) { B_Msg m = B_MsgG_newXX(to, cont, baseline, &$Done$instance); if (self) { // $ASYNC called by actor code m->$baseline = self->B_Msg->$baseline; + fprintf(stderr, " Outgoing msg %p by actor %ld (waiting: %p)\n", m, self->$globkey, m->$waiting); PUSH_outgoing(self, m); } else { // $ASYNC called by the event loop m->$baseline = current_time(); @@ -978,7 +984,10 @@ void FLUSH_outgoing_local($Actor self) { if (m->$baseline == self->B_Msg->$baseline) { $Actor to = m->$to; if (ENQ_msg(m, to)) { + fprintf(stderr, ">>>> ASYNC msg %p from actor %ld WAKEUP target %ld (waiting: %p)\n", m, self->$globkey, to->$globkey, m->$waiting); ENQ_ready(to); + } else { + fprintf(stderr, ">>>> ASYNC msg %p from actor %ld queued on target %ld (waiting: %p)\n", m, self->$globkey, to->$globkey, m->$waiting); } dest = to->$globkey; } else { @@ -1000,6 +1009,7 @@ void handle_timeout() { if (m) { rtsd_printf("## Dequeued timed msg with baseline %ld (now is %ld)", m->$baseline, now); if (ENQ_msg(m, m->$to)) { + fprintf(stderr, " Timed WAKEUP of actor %ld\n", m->$to->$globkey); int wtid = ENQ_ready(m->$to); wake_wt(wtid); } @@ -1419,6 +1429,7 @@ void BOOTSTRAP(int argc, char *argv[]) { } #endif if (ENQ_msg(m, root_actor)) { + fprintf(stderr, "Bootstrap WAKEUP %ld\n", root_actor->$globkey); ENQ_ready(root_actor); } @@ -1554,7 +1565,8 @@ void wt_work_cb(uv_check_t *ev) { if (!wctx->jump0) { wctx->jump0 = wctx->jump_top; } - rtsd_printf("## Running actor %ld : %s", current->$globkey, current->$class->$GCINFO); + rtsd_printf("## Running actor %ld : %s", wctx->id, current->$globkey, current->$class->$GCINFO); + fprintf(stderr, "(%d) Run %ld msg %p (%p)\n", wctx->id, current->$globkey, m, m->$waiting); r = cont->$class->__call__(cont, val); uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts2); @@ -1586,17 +1598,24 @@ void wt_work_cb(uv_check_t *ev) { case $RDONE: { save_actor_state(current, m); m->value = r.value; // m->value holds the message result, + fprintf(stderr, " (%d) (Result by actor %ld for msg %p, waiting: %p)\n", wctx->id, current->$globkey, m, m->$waiting); $Actor b = FREEZE_waiting(m, MARK_RESULT); // so mark this and stop further m->waiting additions - while (b) { - b->B_Msg->value = r.value; - b->$waitsfor = NULL; - $Actor c = b->$next; - ENQ_ready(b); - rtsd_printf("## Waking up actor %ld : %s", b->$globkey, b->$class->$GCINFO); - b = c; + if (b) { + while (b) { + b->B_Msg->value = r.value; + b->$waitsfor = NULL; + $Actor c = b->$next; + fprintf(stderr, "==== (%d) Result by actor %ld for msg %p, WAKEUP client %ld\n", wctx->id, current->$globkey, m, b->$globkey); + ENQ_ready(b); + rtsd_printf("## Waking up actor %ld : %s", b->$globkey, b->$class->$GCINFO); + b = c; + } + } else { + fprintf(stderr, "==== (%d) Result by actor %ld for msg %p, no clients\n", wctx->id, current->$globkey, m); } rtsd_printf("## DONE actor %ld : %s", current->$globkey, current->$class->$GCINFO); if (DEQ_msg(current)) { + fprintf(stderr, " (%d) More work for actor %ld, keep AWAKE\n", wctx->id, current->$globkey); ENQ_ready(current); } break; @@ -1635,9 +1654,11 @@ void wt_work_cb(uv_check_t *ev) { $Actor c = b->$next; ENQ_ready(b); rtsd_printf("## Propagating exception to actor %ld : %s", b->$globkey, b->$class->$GCINFO); + fprintf(stderr, "==== Exception by actor %ld for msg %p, waking up client %ld\n", current->$globkey, m, b->$globkey); b = c; } if (DEQ_msg(current)) { + fprintf(stderr, " More work for actor %ld after exception, keep AWAKE\n", current->$globkey); ENQ_ready(current); } rtsd_printf("## Done handling failed actor %ld : %s", current->$globkey, current->$class->$GCINFO); @@ -1676,14 +1697,17 @@ void wt_work_cb(uv_check_t *ev) { assert(x != NULL); if (ADD_waiting(current, x)) { // x->cont is a proper $Cont: x is still being processed so current was added to x->waiting rtsd_printf("## AWAIT actor %ld : %s", current->$globkey, current->$class->$GCINFO); + fprintf(stderr, "---- (%d) AWAIT by client %ld on msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey); current->$waitsfor = x; } else if (EXCEPTIONAL(x)) { // x->cont == MARK_EXCEPTION: x->value holds the raised exception, current is not in x->waiting rtsd_printf("## AWAIT/fail actor %ld : %s", current->$globkey, current->$class->$GCINFO); + fprintf(stderr, "---- (%d) Immediate exception found by client %ld in msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey); m->$cont = &$Fail$instance; m->value = x->value; ENQ_ready(current); } else { // x->cont == MARK_RESULT: x->value holds the final response, current is not in x->waiting rtsd_printf("## AWAIT/wakeup actor %ld : %s", current->$globkey, current->$class->$GCINFO); + fprintf(stderr, "---- (%d) Immediate result found by client %ld in msg %p to actor %ld\n", wctx->id, current->$globkey, x, x->$to->$globkey); m->value = x->value; ENQ_ready(current); } @@ -2347,6 +2371,8 @@ int main(int argc, char **argv) { appname = argv[0]; pid = getpid(); + fprintf(stderr, "RTS-DEBUG\n"); + #ifndef _WIN32 // Do line buffered output setlinebuf(stdout);