Tests for async result processing #1129
base: main
Conversation
👷 Deploy request for antenna-ssec pending review. Visit the deploys page to approve it.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough
This PR adds debugging infrastructure and comprehensive error handling tests. It introduces a VS Code debug configuration for tests, a shell script for running tests with a debugger attached, and a new test module covering error scenarios in pipeline result processing, and it refactors lock key construction into a helper function.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (inconclusive)
Pull request overview
This pull request adds comprehensive test coverage for async result processing in the jobs module, specifically focusing on error handling paths when PipelineResultsError is received. The PR also includes developer tooling improvements with a debug helper script and VS Code launch configuration for debugging tests.
Changes:
- Added comprehensive E2E tests for error handling in the `process_nats_pipeline_result` task
- Refactored lock key generation in `TaskStateManager` into a separate function for testability
- Added debugging utilities (shell script and VS Code configuration) to facilitate test debugging
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| scripts/debug_tests.sh | New helper script to launch tests with debugpy for VS Code debugging |
| ami/ml/orchestration/task_state.py | Extracted _lock_key function for reusability in tests |
| ami/jobs/test_tasks.py | New comprehensive test suite covering error handling scenarios in pipeline result processing |
| .vscode/launch.json | Added "Attach: Tests" debug configuration for test debugging |
scripts/debug_tests.sh:
```diff
@@ -0,0 +1,7 @@
+# small helper to launch the test with debugpy enabled, allowing you to attach a VS Code debugger to the test run
```
Copilot AI (Feb 10, 2026):
The script is missing a shebang line. Add #!/usr/bin/env bash or #!/bin/bash as the first line to specify the interpreter. This is a convention followed by other shell scripts in the repository (see compose/local/postgres/maintenance/_sourced/*.sh).
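A minimal sketch of how the top of the script could look once an interpreter line is added; the `set -euo pipefail` line is an extra assumption for illustration, not something this comment requires:

```bash
#!/usr/bin/env bash
# Resolve bash via PATH; the hard-coded #!/bin/bash form mentioned above works too.
set -euo pipefail  # optional strictness, assumed here purely for illustration

# ...the existing docker compose / debugpy invocation would follow unchanged...
```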
ami/jobs/test_tasks.py:
```python
# Verify the task was called with correct arguments
# .delay() calls .apply_async(args, kwargs) with positional arguments
# args[0] = positional args tuple (empty in this case)
# args[1] = keyword args dict (contains our task parameters)
call_args = mock_apply_async.call_args[0]
self.assertEqual(len(call_args), 2, "apply_async should be called with (args, kwargs)")
task_kwargs = call_args[1]  # Second positional arg is the kwargs dict
```
Copilot AI (Feb 10, 2026):
The test is incorrectly accessing the mock call arguments. When Celery's `.delay(job_id=x, ...)` is called, it internally calls `.apply_async(kwargs={'job_id': x, ...})`, not `.apply_async((args,), {kwargs})`.
The correct way to access the arguments is:
- `mock_apply_async.call_args.kwargs` to get the keyword arguments dictionary, or
- `mock_apply_async.call_args[1]` to get the kwargs dict
The current implementation assumes positional arguments are passed, which is not how `.delay()` works. This test will likely fail or access the wrong data structure.
Suggested change:
```diff
-# Verify the task was called with correct arguments
-# .delay() calls .apply_async(args, kwargs) with positional arguments
-# args[0] = positional args tuple (empty in this case)
-# args[1] = keyword args dict (contains our task parameters)
-call_args = mock_apply_async.call_args[0]
-self.assertEqual(len(call_args), 2, "apply_async should be called with (args, kwargs)")
-task_kwargs = call_args[1]  # Second positional arg is the kwargs dict
+# Verify the task was called with correct keyword arguments.
+# Celery's .delay(job_id=..., ...) forwards to .apply_async(kwargs={...}),
+# so we should inspect the kwargs passed to apply_async.
+task_kwargs = mock_apply_async.call_args.kwargs
```
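Beyond the suggestion above, a small self-contained sketch (not code from this PR) shows how a test could pull the task kwargs out of a mocked `apply_async` without committing to either calling convention; the helper name `extract_task_kwargs` and the sample values are illustrative only:

```python
from unittest import mock


def extract_task_kwargs(apply_async_mock: mock.MagicMock) -> dict:
    """Return the task kwargs regardless of how apply_async was invoked."""
    call = apply_async_mock.call_args
    if "kwargs" in call.kwargs:                       # apply_async(kwargs={...})
        return call.kwargs["kwargs"]
    if len(call.args) >= 2 and isinstance(call.args[1], dict):
        return call.args[1]                           # apply_async((), {...}) positional form
    return dict(call.kwargs)                          # apply_async(job_id=..., ...) direct kwargs


# Simulate both conventions with plain mocks:
positional_style = mock.MagicMock()
positional_style((), {"job_id": 42, "reply_subject": "test.reply.error.1"})

keyword_style = mock.MagicMock()
keyword_style(kwargs={"job_id": 42, "reply_subject": "test.reply.error.1"})

assert extract_task_kwargs(positional_style)["job_id"] == 42
assert extract_task_kwargs(keyword_style)["reply_subject"] == "test.reply.error.1"
```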
ami/jobs/test_tasks.py:
```python
from ami.jobs.tasks import process_nats_pipeline_result
from ami.main.models import Detection, Project, SourceImage, SourceImageCollection
from ami.ml.models import Pipeline
from ami.ml.orchestration.task_state import TaskStateManager, _lock_key
```
Copilot AI (Feb 10, 2026):
The `_lock_key` function has a leading underscore, which indicates it is intended as an internal/private function, yet it is being imported and used in test code. Consider either:
- removing the leading underscore and making it a public function if tests need to use it, or
- using the public `TaskStateManager` interface instead of accessing internal implementation details in tests.
Following Python conventions, functions with leading underscores should not be imported from outside their module.
Suggested change:
```diff
-from ami.ml.orchestration.task_state import TaskStateManager, _lock_key
+from ami.ml.orchestration.task_state import TaskStateManager
```
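If the first option (a public helper) were chosen, the rename might look roughly like the sketch below; this is hypothetical, and the key format shown is a guess rather than the repository's actual implementation in ami/ml/orchestration/task_state.py:

```python
def lock_key(job_id: int, stage: str) -> str:
    """Build the cache key that scopes task-state locking to one job and stage."""
    return f"task_state_lock:{job_id}:{stage}"  # illustrative key format only


# Optional: keep the old private name as an alias so existing in-module callers keep working.
_lock_key = lock_key
```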
ami/jobs/test_tasks.py:
```python
manager = TaskStateManager(job_id)
progress = manager._get_progress(set(), stage)
```
Copilot AI (Feb 10, 2026):
The test is calling the private method _get_progress directly. This is testing implementation details rather than the public interface. Consider using the public update_state method or creating a public method for getting progress if it's needed by external code. Testing private methods can make tests brittle and coupled to implementation details.
Suggested change:
```diff
-manager = TaskStateManager(job_id)
-progress = manager._get_progress(set(), stage)
+# Read progress directly from the cache using the lock key instead of
+# calling the private `_get_progress` method on TaskStateManager.
+progress = cache.get(_lock_key(job_id, stage))
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@ami/jobs/test_tasks.py`:
- Around line 396-404: The test incorrectly assumes apply_async positional args;
update the assertions to extract kwargs robustly from mock_apply_async.call_args
by handling both calling conventions: inspect call =
mock_apply_async.call_args[0] and call_kwargs = mock_apply_async.call_args[1],
then determine the task kwargs either from call_kwargs.get('kwargs') or (if the
old convention used) from the second positional element; assert on
process_nats_pipeline_result.delay parameters by checking that the resolved
task_kwargs contains job_id == self.job.pk, reply_subject ==
"test.reply.error.1", and that "error" is in task_kwargs["result_data"]
(referencing mock_apply_async, process_nats_pipeline_result.delay, call_args,
and task_kwargs to locate the code).
In `@scripts/debug_tests.sh`:
- Around line 1-7: Add a shebang to the top of the debug_tests.sh script to
define the shell interpreter and replace the unquoted "$*" usage with the safe
"$@" form in the manage.py test invocation; update the script that defines the
docker run command (the file debug_tests.sh and the command invoking python -m
debugpy ... manage.py test $*) to start with a proper shebang (e.g.,
#!/usr/bin/env bash) and pass arguments as "$@" so arguments containing spaces
are preserved.
🧹 Nitpick comments (2)
ami/jobs/test_tasks.py (2)
240-240: Unused `mock_manager_class` parameter: consider suppressing or documenting it.
The `@patch` decorator injects this argument, but the test doesn't use NATS (it tests lock contention before NATS is reached). Either add a brief comment explaining why it's unused or rename it to `_mock_manager_class` to signal intent.
Proposed fix:
```diff
 @patch("ami.jobs.tasks.TaskQueueManager")
-def test_process_nats_pipeline_result_error_concurrent_locking(self, mock_manager_class):
+def test_process_nats_pipeline_result_error_concurrent_locking(self, _mock_manager_class):
```
89-97: `_get_progress` has side effects; calling it in assertions mutates Redis state.
`_get_progress` internally calls `cache.set(...)` to update the pending images list (line 106 in task_state.py). While the empty set passed here means the list is effectively unchanged, this couples the assertion helper to internal implementation details and could cause subtle ordering issues if tests are extended later.
Consider adding a brief comment in `_assert_progress_updated` noting this side effect, so future test authors are aware.
ami/jobs/test_tasks.py:
```python
# .delay() calls .apply_async(args, kwargs) with positional arguments
# args[0] = positional args tuple (empty in this case)
# args[1] = keyword args dict (contains our task parameters)
call_args = mock_apply_async.call_args[0]
self.assertEqual(len(call_args), 2, "apply_async should be called with (args, kwargs)")
task_kwargs = call_args[1]  # Second positional arg is the kwargs dict
self.assertEqual(task_kwargs["job_id"], self.job.pk)
self.assertEqual(task_kwargs["reply_subject"], "test.reply.error.1")
self.assertIn("error", task_kwargs["result_data"])
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Check how apply_async is actually called in the production code (excluding test files)
rg -n 'apply_async' --type=py -C 3 -g '!*test*' | head -100
```
Repository: RolnickLab/antenna
Length of output: 2124

🏁 Script executed:
```bash
#!/bin/bash
# Look at the test to understand what it's testing
cat -n ami/jobs/test_tasks.py | sed -n '390,410p'
```
Repository: RolnickLab/antenna
Length of output: 1010

🏁 Script executed:
```bash
#!/bin/bash
# Look at the test method to understand what code path it exercises
cat -n ami/jobs/test_tasks.py | sed -n '360,410p'
```
Repository: RolnickLab/antenna
Length of output: 2348

🏁 Script executed:
```bash
#!/bin/bash
# Look for the test method name and mock setup
cat -n ami/jobs/test_tasks.py | sed -n '330,365p'
```
Repository: RolnickLab/antenna
Length of output: 1558

🏁 Script executed:
```bash
#!/bin/bash
# Find where process_nats_pipeline_result is called
rg -n 'process_nats_pipeline_result' --type=py -C 3 -g '!*test*'
```
Repository: RolnickLab/antenna
Length of output: 1412

🏁 Script executed:
```bash
#!/bin/bash
# Verify the exact implementation and how .delay() translates to apply_async calls
# Check if there are any other calls to this task
rg -n 'process_nats_pipeline_result' ami/jobs/views.py -A 2 -B 2
```
Repository: RolnickLab/antenna
Length of output: 663
Test makes incorrect assumption about apply_async calling convention.
The production code calls process_nats_pipeline_result.delay(job_id=..., result_data=..., reply_subject=...). In Celery, .delay() translates to apply_async(args=(), kwargs={...}) using keyword arguments. The test at lines 399–401 incorrectly assumes positional arguments, where call_args[0] would contain 2 elements. In reality, call_args[0] is an empty tuple, and the keyword arguments are in call_args[1] as {'args': (), 'kwargs': {...}}. The test will fail when it tries to verify the assertions at line 402.
Use the named attributes to handle both calling conventions robustly:
Proposed fix:
```diff
-call_args = mock_apply_async.call_args[0]
-self.assertEqual(len(call_args), 2, "apply_async should be called with (args, kwargs)")
-task_kwargs = call_args[1]  # Second positional arg is the kwargs dict
+call_kwargs = mock_apply_async.call_args.kwargs or {}
+call_positional = mock_apply_async.call_args.args or ()
+# Extract task kwargs from whichever way apply_async was called
+if "kwargs" in call_kwargs:
+    task_kwargs = call_kwargs["kwargs"]
+else:
+    task_kwargs = call_positional[1]
 self.assertEqual(task_kwargs["job_id"], self.job.pk)
 self.assertEqual(task_kwargs["reply_subject"], "test.reply.error.1")
 self.assertIn("error", task_kwargs["result_data"])
```
🤖 Prompt for AI Agents
In `@ami/jobs/test_tasks.py` around lines 396 - 404, The test incorrectly assumes
apply_async positional args; update the assertions to extract kwargs robustly
from mock_apply_async.call_args by handling both calling conventions: inspect
call = mock_apply_async.call_args[0] and call_kwargs =
mock_apply_async.call_args[1], then determine the task kwargs either from
call_kwargs.get('kwargs') or (if the old convention used) from the second
positional element; assert on process_nats_pipeline_result.delay parameters by
checking that the resolved task_kwargs contains job_id == self.job.pk,
reply_subject == "test.reply.error.1", and that "error" is in
task_kwargs["result_data"] (referencing mock_apply_async,
process_nats_pipeline_result.delay, call_args, and task_kwargs to locate the
code).
scripts/debug_tests.sh:
```bash
# small helper to launch the test with debugpy enabled, allowing you to attach a VS Code debugger to the test run
# See the "Debug Tests" launch configuration in .vscode/launch.json

# e.g. scripts/debug_tests.sh ami.jobs.test_tasks --keepdb
docker compose run --rm -p 5680:5680 django \
    python -m debugpy --listen 0.0.0.0:5680 --wait-for-client \
    manage.py test $*
```
Add a shebang and quote $@ to avoid argument splitting issues.
Missing shebang means the script's interpreter is undefined. Using $* unquoted will break if any argument contains whitespace; "$@" preserves each argument as-is.
Proposed fix:
```diff
+#!/usr/bin/env bash
 # small helper to launch the test with debugpy enabled, allowing you to attach a VS Code debugger to the test run
 # See the "Debug Tests" launch configuration in .vscode/launch.json
 # e.g. scripts/debug_tests.sh ami.jobs.test_tasks --keepdb
 docker compose run --rm -p 5680:5680 django \
     python -m debugpy --listen 0.0.0.0:5680 --wait-for-client \
-    manage.py test $*
+    manage.py test "$@"
```
+ manage.py test "$@"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # small helper to launch the test with debugpy enabled, allowing you to attach a VS Code debugger to the test run | |
| # See the "Debug Tests" launch configuration in .vscode/launch.json | |
| # e.g. scripts/debug_tests.sh ami.jobs.test_tasks --keepdb | |
| docker compose run --rm -p 5680:5680 django \ | |
| python -m debugpy --listen 0.0.0.0:5680 --wait-for-client \ | |
| manage.py test $* | |
| #!/usr/bin/env bash | |
| # small helper to launch the test with debugpy enabled, allowing you to attach a VS Code debugger to the test run | |
| # See the "Debug Tests" launch configuration in .vscode/launch.json | |
| # e.g. scripts/debug_tests.sh ami.jobs.test_tasks --keepdb | |
| docker compose run --rm -p 5680:5680 django \ | |
| python -m debugpy --listen 0.0.0.0:5680 --wait-for-client \ | |
| manage.py test "$@" |
🧰 Tools
🪛 Shellcheck (0.11.0)
[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.
(SC2148)
[warning] 7-7: Use "$@" (with quotes) to prevent whitespace problems.
(SC2048)
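To make the SC2048 point concrete, here is a small standalone script (not part of the PR) that contrasts unquoted `$*` with `"$@"` when an argument contains spaces:

```bash
#!/usr/bin/env bash
# Hypothetical demo, not repository code: shows why "$@" matters for arguments with spaces.

show_star() { printf 'got: %s\n' $*; }      # unquoted $* re-splits arguments on whitespace
show_at()   { printf 'got: %s\n' "$@"; }    # "$@" preserves each argument exactly as passed

show_star "first argument with spaces" --keepdb
# got: first
# got: argument
# got: with
# got: spaces
# got: --keepdb

show_at "first argument with spaces" --keepdb
# got: first argument with spaces
# got: --keepdb
```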
🤖 Prompt for AI Agents
In `@scripts/debug_tests.sh` around lines 1 - 7, Add a shebang to the top of the
debug_tests.sh script to define the shell interpreter and replace the unquoted
"$*" usage with the safe "$@" form in the manage.py test invocation; update the
script that defines the docker run command (the file debug_tests.sh and the
command invoking python -m debugpy ... manage.py test $*) to start with a proper
shebang (e.g., #!/usr/bin/env bash) and pass arguments as "$@" so arguments
containing spaces are preserved.
Summary
This pull request adds tests for the result processing of async tasks.
It also adds a helper script and launch target for debugging tests.
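For reference, a debugpy attach entry of the kind this PR describes might look like the sketch below; the actual contents of .vscode/launch.json are not shown on this page, so the `type`, `connect`, and `pathMappings` values here (including the `/app` remote root) are assumptions, while the "Attach: Tests" name and port 5680 come from the review and the script above:

```jsonc
// Hypothetical .vscode/launch.json entry; values other than the name and port are assumed.
{
    "name": "Attach: Tests",
    "type": "debugpy",
    "request": "attach",
    "connect": { "host": "localhost", "port": 5680 },
    "pathMappings": [
        { "localRoot": "${workspaceFolder}", "remoteRoot": "/app" }
    ]
}
```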
Checklist
Summary by CodeRabbit
Tests
Chores