📝 Walkthrough

Refactors STT evaluation to enqueue batch submission as a Celery low-priority job instead of submitting synchronously; removes several CRUD helpers tied to per-sample result creation and updates; adds a Celery-executable batch_job service; and shifts result persistence to bulk operations and concurrent signed-URL generation.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as API Route
    participant Queue as Celery Queue
    participant Worker as Batch Worker
    participant DB as Database
    participant Service as Batch Service
    Client->>API: POST start STT evaluation
    API->>DB: create STT run (pending)
    DB-->>API: run created
    API->>Queue: enqueue execute_batch_submission(run_id, dataset_id, org_id)
    Queue-->>API: returns task_id
    API-->>Client: 202 Accepted (run_id, status: pending, task_id)
    Queue->>Worker: deliver task
    Worker->>DB: fetch run by id/org/project
    Worker->>DB: fetch samples for dataset (limit = run.total_items)
    Worker->>Service: start_stt_evaluation_batch(run, samples)
    Service->>Service: generate signed URLs concurrently
    Service->>External: submit batch job to provider
    External-->>Service: batch response (keys/status)
    Service->>DB: bulk_insert STT results (on completion)
    Service->>DB: update run status (submitted/processing/failed)
```
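For concreteness, here is a minimal self-contained sketch of the enqueue-then-202 pattern the diagram describes. The app name, task wiring, broker URL, and queue label are illustrative assumptions, not the PR's actual code:

```python
# Illustrative Celery sketch (assumed names): the API enqueues the batch
# submission to a low-priority queue and returns immediately with a task_id.
from celery import Celery

celery_app = Celery("worker", broker="redis://localhost:6379/0")

@celery_app.task(name="execute_batch_submission")
def execute_batch_submission(run_id: int, dataset_id: int, organization_id: int) -> dict:
    # Worker side: fetch the run and its samples, then submit the provider batch job.
    return {"run_id": run_id, "status": "submitted"}

# API side: enqueue instead of running synchronously.
result = execute_batch_submission.apply_async(
    kwargs={"run_id": 42, "dataset_id": 7, "organization_id": 1},
    queue="low_priority",
)
print(result.id)  # the task_id returned to the client in the 202 response
```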
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (inconclusive)
Codecov Report: ❌ Patch coverage is …
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/crud/stt_evaluations/cron.py (2)
118-118: ⚠️ Potential issue | 🟡 Minor

Dead code path: the `"processed"` action is never returned by `poll_stt_run`.

`poll_stt_run` returns the action values `"completed"`, `"failed"`, or `"no_change"`, never `"processed"`. The `"processed"` check here is unreachable, which means the intent might be wrong or this is leftover from a prior version.

Suggested fix:

```diff
- if result["action"] in ("completed", "processed"):
+ if result["action"] == "completed":
```
354-359: 🛠️ Refactor suggestion | 🟠 Major

`batch_job` parameter typed as `Any`; should be `BatchJob`.

The coding guidelines require type hints on all function parameters. `batch_job` is used as a `BatchJob` throughout (accessing `.id`, `.config`, `.provider_batch_id`), so the type annotation should reflect that.

Suggested fix:

```diff
 async def process_completed_stt_batch(
     session: Session,
     run: EvaluationRun,
-    batch_job: Any,
+    batch_job: BatchJob,
     batch_provider: GeminiBatchProvider,
 ) -> None:
```

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
🤖 Fix all issues with AI agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py`:
- Around line 108-111: The HTTPException currently returns internal exception
text via detail=f"Failed to queue batch submission: {e}"; change this to a
generic message (e.g. detail="Failed to queue batch submission") so internal
error details are not leaked to clients, keep the status_code=500, and ensure
the original exception is still logged (the existing logger call earlier in the
surrounding function in evaluation.py should remain unchanged); locate the raise
HTTPException(...) in the batch submission handler in evaluation.py and remove
the interpolated exception from the detail string.
In `@backend/app/crud/stt_evaluations/cron.py`:
- Around line 394-404: In process_completed_stt_batch's loop over
batch_responses replace the direct indexing response["custom_id"] with
response.get("custom_id", None) so a missing key yields None; keep the existing
try/except around int(raw_sample_id) (which will catch the None as TypeError)
and ensure you still log the same warning with batch_job.id and the
raw_sample_id value, increment failure_count and continue on error; reference
variables/functions: batch_responses, response, raw_sample_id, stt_sample_id,
batch_job.id, failure_count, and the surrounding for loop in cron.py.
In `@backend/app/services/stt_evaluations/batch_job.py`:
- Line 40: The direct conversion run_id = int(job_id) can raise ValueError for
non-numeric job_id; wrap the conversion in a try/except ValueError around the
run_id = int(job_id) statement, validate job_id first if you prefer, and in the
except block log the error (including the offending job_id), update the run/task
status to a failed/error state and exit/return the Celery task early so the run
record is updated instead of letting the task crash unhandled.
🧹 Nitpick comments (6)
backend/app/crud/stt_evaluations/dataset.py (1)
298-299: Misleading comment: limit is still applied, only the default was removed.

The comment says "removing limit" but the query still applies `.limit(limit)` on line 324. Consider rewording to clarify that the default was removed to force callers to be explicit about how many samples they want.

Suggested wording:

```diff
-    # removing limit for now since we need all samples for batch job, can add pagination later if needed
-    limit: int,
+    limit: int,  # no default; callers must specify explicitly
```

backend/app/crud/stt_evaluations/batch.py (1)
107-107: Consider making `max_workers` configurable.

The hardcoded `max_workers=10` is reasonable for typical workloads, but for large datasets with hundreds of samples, tuning may be needed. A constant or parameter would make this easier to adjust.
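One possible shape for this, as a hedged sketch assuming the signed-URL fan-out uses a `ThreadPoolExecutor`; the constant and helper names are placeholders, not the repository's identifiers:

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder constant; could also be read from settings for per-deployment tuning.
SIGNED_URL_MAX_WORKERS = 10

def generate_signed_url(path: str) -> str:
    # Stand-in for the real provider call that signs a storage URL.
    return f"https://storage.example.com/{path}?signature=stub"

def generate_signed_urls(
    paths: list[str], max_workers: int = SIGNED_URL_MAX_WORKERS
) -> list[str]:
    # Fan out URL signing across a bounded thread pool; the result order
    # matches the order of the input paths.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(generate_signed_url, paths))

print(generate_signed_urls(["a.wav", "b.wav"]))
```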
backend/app/services/stt_evaluations/batch_job.py (1)

15-23: Add type hint for `task_instance` and return type.

`task_instance` lacks a type annotation. Per coding guidelines, all function parameters and return values should have type hints. Also, the return type `dict` could be narrowed.

Suggested fix:

```diff
+from typing import Any
+
+from celery import Task
+
 def execute_batch_submission(
     project_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Task,
     organization_id: int,
     dataset_id: int,
     **kwargs,
-) -> dict:
+) -> dict[str, Any]:
```

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
backend/app/tests/api/routes/test_stt_evaluation.py (1)
545-585: Test correctly updated for Celery offload flow.

The patch target, mock return value, and assertions align well with the route changes.

Consider using `assert_called_once_with(...)` to verify that the correct `function_path`, `job_id`, `organization_id`, and `dataset_id` are passed to `start_low_priority_job`, which would catch wiring bugs.
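As a sketch of what that stricter assertion could look like — the patch target, route path, and argument names here are assumptions based on the review's description, not the actual test code; `client`, `dataset_id`, and `org_id` would come from test fixtures:

```python
from unittest.mock import patch

# Hypothetical tightened test body; names mirror the review's description only.
with patch(
    "app.api.routes.stt_evaluations.evaluation.start_low_priority_job"
) as mock_start_job:
    mock_start_job.return_value = "task-123"
    response = client.post(f"/api/v1/stt-evaluations/{dataset_id}/start")
    assert response.status_code == 202
    mock_start_job.assert_called_once_with(
        function_path="app.services.stt_evaluations.batch_job.execute_batch_submission",
        job_id=str(response.json()["run_id"]),
        organization_id=org_id,
        dataset_id=dataset_id,
    )
```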
backend/app/crud/stt_evaluations/cron.py (2)

371-374: Log format inconsistency: mixed `=` and `:` separators.

Lines 372–373 and 443–446 use `key=value` format (e.g., `run_id={run.id}`), while other functions in this file use `key: {value}` format (e.g., line 128: `run_id: {run.id}`). Consider unifying to one style for consistent structured log parsing.

Also applies to: 443-446
427-433: Use modern SQLAlchemy 2.0 `session.execute(insert(...))` instead of legacy `bulk_insert_mappings()`.

`Session.bulk_insert_mappings()` is a legacy feature deprecated in SQLAlchemy 2.0 in favor of `session.execute(insert(Model), list_of_dicts)`, which also provides better performance through insertmanyvalues batching. Additionally, if an exception occurs after chunks are flushed but before `session.commit()`, the partially flushed data remains in the session; the exception handler currently re-raises without explicit rollback, delegating recovery to the caller.

Replace with:

```python
from sqlalchemy import insert

# Bulk insert in batches of 200
insert_batch_size = 200
for i in range(0, len(stt_result_rows), insert_batch_size):
    chunk = stt_result_rows[i : i + insert_batch_size]
    session.execute(insert(STTResult), chunk)

if stt_result_rows:
    session.commit()
```
In backend/app/api/routes/stt_evaluations/evaluation.py (around lines 108-111):

```python
raise HTTPException(
    status_code=500,
    detail=f"Failed to queue batch submission: {e}",
)
```
Avoid leaking internal exception details in the HTTP response.

`detail=f"Failed to queue batch submission: {e}"` exposes internal error information to the API client. Use a generic message instead; the detailed error is already logged on line 99.

Suggested fix:

```diff
 raise HTTPException(
     status_code=500,
-    detail=f"Failed to queue batch submission: {e}",
+    detail="Failed to start evaluation. Please try again later.",
 )
```
In backend/app/crud/stt_evaluations/cron.py (around lines 394-404):

```diff
 for response in batch_responses:
     raw_sample_id = response["custom_id"]
     try:
-        sample_id = int(custom_id)
+        stt_sample_id = int(raw_sample_id)
     except (ValueError, TypeError):
         logger.warning(
             f"[process_completed_stt_batch] Invalid custom_id | "
-            f"batch_job_id={batch_job.id}, custom_id={custom_id}"
+            f"batch_job_id={batch_job.id}, custom_id={raw_sample_id}"
         )
-        failed_count += 1
+        failure_count += 1
         continue
```
response["custom_id"] will raise KeyError if the key is missing.
If a batch response lacks "custom_id", this line throws an unhandled KeyError that propagates up and aborts processing of the entire batch. Use .get() with a fallback and handle the missing-key case alongside the existing ValueError/TypeError guard.
Suggested fix
for response in batch_responses:
- raw_sample_id = response["custom_id"]
+ raw_sample_id = response.get("custom_id")
try:
stt_sample_id = int(raw_sample_id)
except (ValueError, TypeError):
logger.warning(
f"[process_completed_stt_batch] Invalid custom_id | "
f"batch_job_id={batch_job.id}, custom_id={raw_sample_id}"
)
failure_count += 1
continueWith this change, a None value from .get() will be caught by the TypeError handler below, logging a warning and continuing gracefully.
In backend/app/services/stt_evaluations/batch_job.py (at line 40):

```python
    Returns:
        dict: Result summary with batch job info
    """
    run_id = int(job_id)
```
Unhandled ValueError if `job_id` is not a valid integer.

`int(job_id)` will raise `ValueError` for non-numeric strings. Since this runs in a Celery worker, an unhandled exception will cause the task to fail without updating the run status. Consider wrapping in a try/except or validating before conversion.

Suggested fix:

```diff
-    run_id = int(job_id)
+    try:
+        run_id = int(job_id)
+    except ValueError:
+        logger.error(f"[execute_batch_submission] Invalid job_id: {job_id}")
+        return {"success": False, "error": f"Invalid job_id: {job_id}"}
```
Summary
Target issue is #PLEASE_TYPE_ISSUE_NUMBER
Explain the motivation for making this change. What existing problem does the pull request solve?
Checklist
Before submitting a pull request, please ensure that you mark these tasks.

Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Notes
Please add here if any other information is required for the reviewer.
Summary by CodeRabbit

- Refactor
- Performance
- Reliability
- Bug Fixes