57 changes: 18 additions & 39 deletions backend/app/api/routes/stt_evaluations/evaluation.py
@@ -2,19 +2,18 @@

 import logging
 
+from asgi_correlation_id import correlation_id
 from fastapi import APIRouter, Body, Depends, HTTPException, Query
 
 from app.api.deps import AuthContextDep, SessionDep
 from app.api.permissions import Permission, require_permission
+from app.celery.utils import start_low_priority_job
 from app.crud.stt_evaluations import (
     create_stt_run,
-    create_stt_results,
     get_results_by_run_id,
-    get_samples_by_dataset_id,
     get_stt_dataset_by_id,
     get_stt_run_by_id,
     list_stt_runs,
-    start_stt_evaluation_batch,
     update_stt_run,
 )
 from app.models.stt_evaluation import (
@@ -80,56 +79,36 @@ def start_stt_evaluation(
         total_items=sample_count * len(run_create.models),
     )
 
-    # Get samples for the dataset
-    samples = get_samples_by_dataset_id(
-        session=_session,
-        dataset_id=run_create.dataset_id,
-        org_id=auth_context.organization_.id,
-        project_id=auth_context.project_.id,
-    )
-
-    # Create result records for each sample and model
-    create_stt_results(
-        session=_session,
-        samples=samples,
-        evaluation_run_id=run.id,
-        org_id=auth_context.organization_.id,
-        project_id=auth_context.project_.id,
-        models=run_create.models,
-    )
-
+    # Offload batch submission (signed URLs, JSONL, Gemini upload) to Celery worker
+    trace_id = correlation_id.get() or "N/A"
     try:
-        batch_result = start_stt_evaluation_batch(
-            session=_session,
-            run=run,
-            samples=samples,
-            org_id=auth_context.organization_.id,
-            project_id=auth_context.project_.id,
+        celery_task_id = start_low_priority_job(
+            function_path="app.services.stt_evaluations.batch_job.execute_batch_submission",
+            project_id=auth_context.project_.id,
+            job_id=str(run.id),
+            trace_id=trace_id,
+            organization_id=auth_context.organization_.id,
+            dataset_id=run_create.dataset_id,
         )
         logger.info(
-            f"[start_stt_evaluation] STT evaluation batch submitted | "
-            f"run_id: {run.id}, batch_jobs: {list(batch_result.get('batch_jobs', {}).keys())}"
+            f"[start_stt_evaluation] Batch submission queued | "
+            f"run_id: {run.id}, celery_task_id: {celery_task_id}"
         )
     except Exception as e:
         logger.error(
-            f"[start_stt_evaluation] Batch submission failed | "
+            f"[start_stt_evaluation] Failed to queue batch submission | "
             f"run_id: {run.id}, error: {str(e)}"
         )
         update_stt_run(
             session=_session,
             run_id=run.id,
             status="failed",
-            error_message=str(e),
+            error_message=f"Failed to queue batch submission: {str(e)}",
         )
-        raise HTTPException(status_code=500, detail=f"Batch submission failed: {e}")
+        raise HTTPException(
+            status_code=500,
+            detail=f"Failed to queue batch submission: {e}",
+        )
Comment on lines +108 to 111

⚠️ Potential issue | 🟡 Minor

Avoid leaking internal exception details in the HTTP response.

`detail=f"Failed to queue batch submission: {e}"` exposes internal error information to the API client. Use a generic message instead; the detailed error is already logged on line 99.

Suggested fix
         raise HTTPException(
             status_code=500,
-            detail=f"Failed to queue batch submission: {e}",
+            detail="Failed to start evaluation. Please try again later.",
         )
🤖 Prompt for AI Agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py` around lines 108-111,
the HTTPException currently returns internal exception text via detail=f"Failed
to queue batch submission: {e}"; change this to a generic message (e.g.
detail="Failed to queue batch submission") so internal error details are not
leaked to clients, keep the status_code=500, and ensure the original exception
is still logged (the existing logger call earlier in the surrounding function
should remain unchanged); locate the raise HTTPException(...) in the batch
submission handler and remove the interpolated exception from the detail
string.


     # Refresh run to get updated status
     run = get_stt_run_by_id(
         session=_session,
         run_id=run.id,
         org_id=auth_context.organization_.id,
         project_id=auth_context.project_.id,
     )
 
     return APIResponse.success_response(
         data=STTEvaluationRunPublic(
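The worker-side counterpart, `app.services.stt_evaluations.batch_job.execute_batch_submission`, is not part of this diff. A minimal sketch of what it plausibly does — re-resolving the run and samples from the IDs the route passes and calling `start_stt_evaluation_batch` with a worker-owned session — follows; the signature, id types, and the engine import location are assumptions:

```python
# Hypothetical sketch of the Celery-side entry point named in function_path.
# Everything here is inferred from the arguments the route passes; the real
# implementation may differ.
import logging
from uuid import UUID

from sqlmodel import Session

from app.core.db import engine  # assumed location of the shared engine
from app.crud.stt_evaluations import get_samples_by_dataset_id, get_stt_run_by_id
from app.crud.stt_evaluations.batch import start_stt_evaluation_batch

logger = logging.getLogger(__name__)


def execute_batch_submission(
    job_id: str,          # str(run.id) from the route
    trace_id: str,        # correlation id, propagated for log correlation
    project_id: UUID,
    organization_id: UUID,
    dataset_id: UUID,
) -> dict:
    """Run the heavy batch submission off the request path (sketch)."""
    logger.info(
        f"[execute_batch_submission] start | run_id: {job_id}, trace_id: {trace_id}"
    )
    # The route's request-scoped session cannot cross the Celery boundary,
    # so the worker opens its own.
    with Session(engine) as session:
        run = get_stt_run_by_id(
            session=session,
            run_id=UUID(job_id),
            org_id=organization_id,
            project_id=project_id,
        )
        samples = get_samples_by_dataset_id(
            session=session,
            dataset_id=dataset_id,
            org_id=organization_id,
            project_id=project_id,
        )
        # Signed URLs, JSONL build, and Gemini upload all happen here now.
        return start_stt_evaluation_batch(
            session=session,
            run=run,
            samples=samples,
            org_id=organization_id,
            project_id=project_id,
        )
```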
12 changes: 0 additions & 12 deletions backend/app/api/routes/stt_evaluations/result.py
@@ -40,18 +40,6 @@ def update_result_feedback(
f"result_id: {result_id}, is_correct: {feedback.is_correct}"
)

# Verify result exists and belongs to this project
existing = get_stt_result_by_id(
session=_session,
result_id=result_id,
org_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)

if not existing:
raise HTTPException(status_code=404, detail="Result not found")

# Update feedback
result = update_human_feedback(
session=_session,
result_id=result_id,
Expand Down
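Dropping the pre-check assumes `update_human_feedback` itself scopes the update by org/project and signals not-found, so the route can still return a 404 with a single query instead of two. A hedged sketch of that shape — model, column, and field names are assumptions, not the repository's actual code:

```python
# Sketch of a CRUD helper that makes the separate existence check redundant
# by scoping the lookup itself. Names are illustrative.
from uuid import UUID

from sqlmodel import Session, select

from app.models.stt_evaluation import STTResult  # assumed model name


def update_human_feedback(
    session: Session,
    result_id: UUID,
    org_id: UUID,
    project_id: UUID,
    is_correct: bool,
) -> "STTResult | None":
    """Update feedback on a result, scoped to the caller's org/project."""
    result = session.exec(
        select(STTResult).where(
            STTResult.id == result_id,
            STTResult.organization_id == org_id,   # assumed column names
            STTResult.project_id == project_id,
        )
    ).first()
    if result is None:
        return None  # caller translates this into a 404
    result.human_feedback_is_correct = is_correct  # assumed field name
    session.add(result)
    session.commit()
    session.refresh(result)
    return result
```

At the route, `if result is None: raise HTTPException(status_code=404, detail="Result not found")` preserves the old behavior.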
2 changes: 0 additions & 2 deletions backend/app/crud/stt_evaluations/__init__.py
@@ -16,7 +16,6 @@
     update_stt_run,
 )
 from .result import (
-    create_stt_results,
     get_stt_result_by_id,
     get_results_by_run_id,
     update_human_feedback,
@@ -39,7 +38,6 @@
"list_stt_runs",
"update_stt_run",
# Result
"create_stt_results",
"get_stt_result_by_id",
"get_results_by_run_id",
"update_human_feedback",
Expand Down
79 changes: 37 additions & 42 deletions backend/app/crud/stt_evaluations/batch.py
@@ -1,6 +1,7 @@
"""Batch submission functions for STT evaluation processing."""

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

from sqlmodel import Session
@@ -12,13 +13,8 @@
 )
 from app.core.cloud.storage import get_cloud_storage
 from app.crud.file import get_files_by_ids
-from app.crud.stt_evaluations.result import (
-    get_pending_results_for_run,
-    update_stt_result,
-)
 from app.crud.stt_evaluations.run import update_stt_run
 from app.models import EvaluationRun
-from app.models.job import JobStatus
 from app.models.stt_evaluation import STTSample
 from app.services.stt_evaluations.gemini import GeminiClient
@@ -88,39 +84,49 @@ def start_stt_evaluation_batch(
     )
     file_map = {f.id: f for f in file_records}
 
-    # Generate signed URLs for audio files (shared across all models)
+    # Generate signed URLs for audio files concurrently (shared across all models)
     signed_urls: list[str] = []
     sample_keys: list[str] = []
+    failed_samples: list[tuple[STTSample, str]] = []
 
-    for sample in samples:
-        try:
-            # Get object_store_url from file record
-            file_record = file_map.get(sample.file_id)
-            if not file_record:
-                raise ValueError(f"File record not found for file_id: {sample.file_id}")
-
-            signed_url = storage.get_signed_url(
-                file_record.object_store_url, expires_in=signed_url_expires_in
-            )
-            signed_urls.append(signed_url)
-            sample_keys.append(str(sample.id))
-        except Exception as e:
-            logger.error(
-                f"[start_stt_evaluation_batch] Failed to generate signed URL | "
-                f"sample_id: {sample.id}, error: {str(e)}"
-            )
-            pending = get_pending_results_for_run(
-                session=session, run_id=run.id, sample_id=sample.id
-            )
-            for result in pending:
-                update_stt_result(
-                    session=session,
-                    result_id=result.id,
-                    status=JobStatus.FAILED.value,
-                    error_message=f"Failed to generate signed URL: {str(e)}",
-                )
-            session.commit()
+    def _generate_signed_url(
+        sample: STTSample,
+    ) -> tuple[STTSample, str | None, str | None]:
+        """Generate a signed URL for a single sample. Thread-safe."""
+        file_record = file_map.get(sample.file_id)
+        if not file_record:
+            return sample, None, f"File record not found for file_id: {sample.file_id}"
+        try:
+            url = storage.get_signed_url(
+                file_record.object_store_url, expires_in=signed_url_expires_in
+            )
+            return sample, url, None
+        except Exception as e:
+            return sample, None, str(e)
+
+    with ThreadPoolExecutor(max_workers=10) as executor:
+        sign_url_tasks = {
+            executor.submit(_generate_signed_url, sample): sample for sample in samples
+        }
+
+        for completed_task in as_completed(sign_url_tasks):
+            sample, url, error = completed_task.result()
+            if url:
+                signed_urls.append(url)
+                sample_keys.append(str(sample.id))
+            else:
+                failed_samples.append((sample, error))
+                logger.error(
+                    f"[start_stt_evaluation_batch] Failed to generate signed URL | "
+                    f"sample_id: {sample.id}, error: {error}"
+                )
+
+    if failed_samples:
+        logger.warning(
+            f"[start_stt_evaluation_batch] Signed URL failures | "
+            f"run_id: {run.id}, failed_count: {len(failed_samples)}, "
+            f"succeeded_count: {len(signed_urls)}"
+        )
 
     if not signed_urls:
         raise Exception("Failed to generate signed URLs for any audio files")
@@ -177,17 +183,6 @@ def start_stt_evaluation_batch(
f"[start_stt_evaluation_batch] Failed to submit batch | "
f"model: {model}, error: {str(e)}"
)
pending = get_pending_results_for_run(
session=session, run_id=run.id, provider=model
)
for result in pending:
update_stt_result(
session=session,
result_id=result.id,
status=JobStatus.FAILED.value,
error_message=f"Batch submission failed for {model}: {str(e)}",
)
session.commit()

if not batch_jobs:
raise Exception("Batch submission failed for all models")