From ba6fe520af3f0744b6e6b0f03ddf9337cda5a842 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Wed, 11 Feb 2026 22:35:45 +0530 Subject: [PATCH 1/7] using celery and parallel AWS signed url --- .../api/routes/stt_evaluations/evaluation.py | 37 +++--- backend/app/crud/stt_evaluations/batch.py | 71 +++++++----- .../app/services/stt_evaluations/batch_job.py | 109 ++++++++++++++++++ 3 files changed, 172 insertions(+), 45 deletions(-) create mode 100644 backend/app/services/stt_evaluations/batch_job.py diff --git a/backend/app/api/routes/stt_evaluations/evaluation.py b/backend/app/api/routes/stt_evaluations/evaluation.py index 92d7c1a5..95464ff7 100644 --- a/backend/app/api/routes/stt_evaluations/evaluation.py +++ b/backend/app/api/routes/stt_evaluations/evaluation.py @@ -2,10 +2,12 @@ import logging +from asgi_correlation_id import correlation_id from fastapi import APIRouter, Body, Depends, HTTPException, Query from app.api.deps import AuthContextDep, SessionDep from app.api.permissions import Permission, require_permission +from app.celery.utils import start_low_priority_job from app.crud.stt_evaluations import ( create_stt_run, create_stt_results, @@ -14,7 +16,6 @@ get_stt_dataset_by_id, get_stt_run_by_id, list_stt_runs, - start_stt_evaluation_batch, update_stt_run, ) from app.models.stt_evaluation import ( @@ -98,38 +99,36 @@ def start_stt_evaluation( models=run_create.models, ) + # Offload batch submission to Celery worker + trace_id = correlation_id.get() or "N/A" try: - batch_result = start_stt_evaluation_batch( - session=_session, - run=run, - samples=samples, - org_id=auth_context.organization_.id, + celery_task_id = start_low_priority_job( + function_path="app.services.stt_evaluations.batch_job.execute_batch_submission", project_id=auth_context.project_.id, + job_id=str(run.id), + trace_id=trace_id, + organization_id=auth_context.organization_.id, + dataset_id=run_create.dataset_id, ) logger.info( - f"[start_stt_evaluation] STT evaluation batch submitted | " - f"run_id: {run.id}, batch_jobs: {list(batch_result.get('batch_jobs', {}).keys())}" + f"[start_stt_evaluation] Batch submission queued | " + f"run_id: {run.id}, celery_task_id: {celery_task_id}" ) except Exception as e: logger.error( - f"[start_stt_evaluation] Batch submission failed | " + f"[start_stt_evaluation] Failed to queue batch submission | " f"run_id: {run.id}, error: {str(e)}" ) update_stt_run( session=_session, run_id=run.id, status="failed", - error_message=str(e), + error_message=f"Failed to queue batch submission: {str(e)}", + ) + raise HTTPException( + status_code=500, + detail=f"Failed to queue batch submission: {e}", ) - raise HTTPException(status_code=500, detail=f"Batch submission failed: {e}") - - # Refresh run to get updated status - run = get_stt_run_by_id( - session=_session, - run_id=run.id, - org_id=auth_context.organization_.id, - project_id=auth_context.project_.id, - ) return APIResponse.success_response( data=STTEvaluationRunPublic( diff --git a/backend/app/crud/stt_evaluations/batch.py b/backend/app/crud/stt_evaluations/batch.py index 819485e2..479330ed 100644 --- a/backend/app/crud/stt_evaluations/batch.py +++ b/backend/app/crud/stt_evaluations/batch.py @@ -1,6 +1,7 @@ """Batch submission functions for STT evaluation processing.""" import logging +from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any from sqlmodel import Session @@ -88,39 +89,57 @@ def start_stt_evaluation_batch( ) file_map = {f.id: f for f in file_records} - # Generate signed URLs for audio files 
(shared across all models) + # Generate signed URLs for audio files concurrently (shared across all models) signed_urls: list[str] = [] sample_keys: list[str] = [] - - for sample in samples: + failed_samples: list[tuple[STTSample, str]] = [] + + def _generate_signed_url( + sample: STTSample, + ) -> tuple[STTSample, str | None, str | None]: + """Generate a signed URL for a single sample. Thread-safe.""" + file_record = file_map.get(sample.file_id) + if not file_record: + return sample, None, f"File record not found for file_id: {sample.file_id}" try: - # Get object_store_url from file record - file_record = file_map.get(sample.file_id) - if not file_record: - raise ValueError(f"File record not found for file_id: {sample.file_id}") - - signed_url = storage.get_signed_url( + url = storage.get_signed_url( file_record.object_store_url, expires_in=signed_url_expires_in ) - signed_urls.append(signed_url) - sample_keys.append(str(sample.id)) - + return sample, url, None except Exception as e: - logger.error( - f"[start_stt_evaluation_batch] Failed to generate signed URL | " - f"sample_id: {sample.id}, error: {str(e)}" - ) - pending = get_pending_results_for_run( - session=session, run_id=run.id, sample_id=sample.id - ) - for result in pending: - update_stt_result( - session=session, - result_id=result.id, - status=JobStatus.FAILED.value, - error_message=f"Failed to generate signed URL: {str(e)}", + return sample, None, str(e) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = { + executor.submit(_generate_signed_url, sample): sample for sample in samples + } + + for future in as_completed(futures): + sample, url, error = future.result() + if url: + signed_urls.append(url) + sample_keys.append(str(sample.id)) + else: + failed_samples.append((sample, error)) + logger.error( + f"[start_stt_evaluation_batch] Failed to generate signed URL | " + f"sample_id: {sample.id}, error: {error}" ) - session.commit() + + # Mark failed samples in DB + for sample, error in failed_samples: + pending = get_pending_results_for_run( + session=session, run_id=run.id, sample_id=sample.id + ) + for result in pending: + update_stt_result( + session=session, + result_id=result.id, + status=JobStatus.FAILED.value, + error_message=f"Failed to generate signed URL: {error}", + ) + if failed_samples: + session.commit() if not signed_urls: raise Exception("Failed to generate signed URLs for any audio files") diff --git a/backend/app/services/stt_evaluations/batch_job.py b/backend/app/services/stt_evaluations/batch_job.py new file mode 100644 index 00000000..3313cd2e --- /dev/null +++ b/backend/app/services/stt_evaluations/batch_job.py @@ -0,0 +1,109 @@ +"""Celery task function for STT evaluation batch submission.""" + +import logging + +from sqlmodel import Session + +from app.core.db import engine +from app.crud.stt_evaluations.batch import start_stt_evaluation_batch +from app.crud.stt_evaluations.dataset import get_samples_by_dataset_id +from app.crud.stt_evaluations.run import get_stt_run_by_id, update_stt_run + +logger = logging.getLogger(__name__) + + +def execute_batch_submission( + project_id: int, + job_id: str, + task_id: str, + task_instance, + organization_id: int, + dataset_id: int, + **kwargs, +) -> dict: + """Execute STT evaluation batch submission in a Celery worker. + + Handles signed URL generation, JSONL creation, Gemini file upload, + and batch job creation. 
+ + Args: + project_id: Project ID + job_id: Evaluation run ID (as string) + task_id: Celery task ID + task_instance: Celery task instance + organization_id: Organization ID + dataset_id: Dataset ID + + Returns: + dict: Result summary with batch job info + """ + run_id = int(job_id) + + logger.info( + f"[execute_batch_submission] Starting | " + f"run_id: {run_id}, project_id: {project_id}, " + f"celery_task_id: {task_id}" + ) + + with Session(engine) as session: + run = get_stt_run_by_id( + session=session, + run_id=run_id, + org_id=organization_id, + project_id=project_id, + ) + + if not run: + logger.error(f"[execute_batch_submission] Run not found | run_id: {run_id}") + return {"success": False, "error": "Run not found"} + + samples = get_samples_by_dataset_id( + session=session, + dataset_id=dataset_id, + org_id=organization_id, + project_id=project_id, + ) + + if not samples: + logger.error( + f"[execute_batch_submission] No samples found | " + f"run_id: {run_id}, dataset_id: {dataset_id}" + ) + update_stt_run( + session=session, + run_id=run_id, + status="failed", + error_message="No samples found for dataset", + ) + return {"success": False, "error": "No samples found"} + + try: + batch_result = start_stt_evaluation_batch( + session=session, + run=run, + samples=samples, + org_id=organization_id, + project_id=project_id, + ) + + logger.info( + f"[execute_batch_submission] Batch submitted | " + f"run_id: {run_id}, " + f"batch_jobs: {list(batch_result.get('batch_jobs', {}).keys())}" + ) + + return batch_result + + except Exception as e: + logger.error( + f"[execute_batch_submission] Batch submission failed | " + f"run_id: {run_id}, error: {str(e)}", + exc_info=True, + ) + update_stt_run( + session=session, + run_id=run_id, + status="failed", + error_message=str(e), + ) + return {"success": False, "error": str(e)} From b1ee3de435af9124e70f045fff99d8b40702f9e3 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Wed, 11 Feb 2026 22:52:06 +0530 Subject: [PATCH 2/7] cleanup stt_results --- .../api/routes/stt_evaluations/evaluation.py | 22 +---- backend/app/crud/stt_evaluations/batch.py | 34 +------ backend/app/crud/stt_evaluations/cron.py | 95 ++++++++++--------- 3 files changed, 55 insertions(+), 96 deletions(-) diff --git a/backend/app/api/routes/stt_evaluations/evaluation.py b/backend/app/api/routes/stt_evaluations/evaluation.py index 95464ff7..492e998f 100644 --- a/backend/app/api/routes/stt_evaluations/evaluation.py +++ b/backend/app/api/routes/stt_evaluations/evaluation.py @@ -10,9 +10,7 @@ from app.celery.utils import start_low_priority_job from app.crud.stt_evaluations import ( create_stt_run, - create_stt_results, get_results_by_run_id, - get_samples_by_dataset_id, get_stt_dataset_by_id, get_stt_run_by_id, list_stt_runs, @@ -81,25 +79,7 @@ def start_stt_evaluation( total_items=sample_count * len(run_create.models), ) - # Get samples for the dataset - samples = get_samples_by_dataset_id( - session=_session, - dataset_id=run_create.dataset_id, - org_id=auth_context.organization_.id, - project_id=auth_context.project_.id, - ) - - # Create result records for each sample and model - create_stt_results( - session=_session, - samples=samples, - evaluation_run_id=run.id, - org_id=auth_context.organization_.id, - project_id=auth_context.project_.id, - models=run_create.models, - ) - - # Offload batch submission to Celery worker + # Offload batch submission (signed URLs, JSONL, Gemini upload) to Celery worker trace_id = correlation_id.get() or "N/A" try: celery_task_id = 
start_low_priority_job( diff --git a/backend/app/crud/stt_evaluations/batch.py b/backend/app/crud/stt_evaluations/batch.py index 479330ed..5d197c14 100644 --- a/backend/app/crud/stt_evaluations/batch.py +++ b/backend/app/crud/stt_evaluations/batch.py @@ -13,13 +13,8 @@ ) from app.core.cloud.storage import get_cloud_storage from app.crud.file import get_files_by_ids -from app.crud.stt_evaluations.result import ( - get_pending_results_for_run, - update_stt_result, -) from app.crud.stt_evaluations.run import update_stt_run from app.models import EvaluationRun -from app.models.job import JobStatus from app.models.stt_evaluation import STTSample from app.services.stt_evaluations.gemini import GeminiClient @@ -126,20 +121,12 @@ def _generate_signed_url( f"sample_id: {sample.id}, error: {error}" ) - # Mark failed samples in DB - for sample, error in failed_samples: - pending = get_pending_results_for_run( - session=session, run_id=run.id, sample_id=sample.id - ) - for result in pending: - update_stt_result( - session=session, - result_id=result.id, - status=JobStatus.FAILED.value, - error_message=f"Failed to generate signed URL: {error}", - ) if failed_samples: - session.commit() + logger.warning( + f"[start_stt_evaluation_batch] Signed URL failures | " + f"run_id: {run.id}, failed_count: {len(failed_samples)}, " + f"succeeded_count: {len(signed_urls)}" + ) if not signed_urls: raise Exception("Failed to generate signed URLs for any audio files") @@ -196,17 +183,6 @@ def _generate_signed_url( f"[start_stt_evaluation_batch] Failed to submit batch | " f"model: {model}, error: {str(e)}" ) - pending = get_pending_results_for_run( - session=session, run_id=run.id, provider=model - ) - for result in pending: - update_stt_result( - session=session, - result_id=result.id, - status=JobStatus.FAILED.value, - error_message=f"Batch submission failed for {model}: {str(e)}", - ) - session.commit() if not batch_jobs: raise Exception("Batch submission failed for all models") diff --git a/backend/app/crud/stt_evaluations/cron.py b/backend/app/crud/stt_evaluations/cron.py index 13067f8c..c2e5a788 100644 --- a/backend/app/crud/stt_evaluations/cron.py +++ b/backend/app/crud/stt_evaluations/cron.py @@ -15,7 +15,8 @@ from sqlmodel import Session, select from app.core.batch import BatchJobState, GeminiBatchProvider, poll_batch_status -from app.crud.stt_evaluations.result import count_results_by_status, update_stt_result +from app.core.util import now +from app.crud.stt_evaluations.result import count_results_by_status from app.crud.stt_evaluations.run import update_stt_run from app.models import EvaluationRun from app.models.batch_job import BatchJob @@ -346,10 +347,9 @@ async def poll_stt_run( # All batch jobs are done - finalize the run status_counts = count_results_by_status(session=session, run_id=run.id) - pending = status_counts.get(JobStatus.PENDING.value, 0) failed_count = status_counts.get(JobStatus.FAILED.value, 0) - final_status = "completed" if pending == 0 else "processing" + final_status = "completed" error_message = None if any_failed: error_message = "; ".join(errors) @@ -382,7 +382,10 @@ async def process_completed_stt_batch( batch_job: Any, batch_provider: GeminiBatchProvider, ) -> None: - """Process completed Gemini batch - download results and update STT result records. + """Process completed Gemini batch - download results and create STT result records. + + Result records are created here on batch completion rather than upfront, + using the stt_sample_id embedded as the key in each batch request. 
Args: session: Database session @@ -395,64 +398,64 @@ async def process_completed_stt_batch( f"run_id={run.id}, batch_job_id={batch_job.id}" ) - # Get the STT provider from batch job config stt_provider = batch_job.config.get("stt_provider", "gemini-2.5-pro") - processed_count = 0 - failed_count = 0 + success_count = 0 + failure_count = 0 try: - # Download results using GeminiBatchProvider - # Keys are embedded in the JSONL response file, no separate mapping needed - results = batch_provider.download_batch_results(batch_job.provider_batch_id) + batch_responses = batch_provider.download_batch_results( + batch_job.provider_batch_id + ) logger.info( - f"[process_completed_stt_batch] Got batch results | " - f"batch_job_id={batch_job.id}, result_count={len(results)}" + f"[process_completed_stt_batch] Downloaded batch responses | " + f"batch_job_id={batch_job.id}, response_count={len(batch_responses)}" ) - # Match results to samples using key (sample_id) from batch request - for batch_result in results: - custom_id = batch_result["custom_id"] - # custom_id is the sample_id as string (set via key in batch request) + timestamp = now() + stt_result_rows: list[dict] = [] + + for response in batch_responses: + raw_sample_id = response["custom_id"] try: - sample_id = int(custom_id) + stt_sample_id = int(raw_sample_id) except (ValueError, TypeError): logger.warning( f"[process_completed_stt_batch] Invalid custom_id | " - f"batch_job_id={batch_job.id}, custom_id={custom_id}" + f"batch_job_id={batch_job.id}, custom_id={raw_sample_id}" ) - failed_count += 1 + failure_count += 1 continue - # Find result record for this sample and provider - stmt = select(STTResult).where( - STTResult.evaluation_run_id == run.id, - STTResult.stt_sample_id == sample_id, - STTResult.provider == stt_provider, - ) - result_record = session.exec(stmt).one_or_none() + row = { + "stt_sample_id": stt_sample_id, + "evaluation_run_id": run.id, + "organization_id": run.organization_id, + "project_id": run.project_id, + "provider": stt_provider, + "inserted_at": timestamp, + "updated_at": timestamp, + } + + if response.get("response"): + row["transcription"] = response["response"].get("text", "") + row["status"] = JobStatus.SUCCESS.value + success_count += 1 + else: + row["status"] = JobStatus.FAILED.value + row["error_message"] = response.get("error", "Unknown error") + failure_count += 1 - if result_record: - if batch_result.get("response"): - text = batch_result["response"].get("text", "") - update_stt_result( - session=session, - result_id=result_record.id, - transcription=text, - status=JobStatus.SUCCESS.value, - ) - processed_count += 1 - else: - update_stt_result( - session=session, - result_id=result_record.id, - status=JobStatus.FAILED.value, - error_message=batch_result.get("error", "Unknown error"), - ) - failed_count += 1 + stt_result_rows.append(row) - session.commit() + # Bulk insert in batches of 200 + insert_batch_size = 200 + for i in range(0, len(stt_result_rows), insert_batch_size): + chunk = stt_result_rows[i : i + insert_batch_size] + session.bulk_insert_mappings(STTResult, chunk) + if stt_result_rows: + session.commit() except Exception as e: logger.error( @@ -465,5 +468,5 @@ async def process_completed_stt_batch( logger.info( f"[process_completed_stt_batch] Batch results processed | " f"run_id={run.id}, provider={stt_provider}, " - f"processed={processed_count}, failed={failed_count}" + f"success={success_count}, failed={failure_count}" ) From 04859494ca8fd27fe5d1df303c4613ca92e2702d Mon Sep 17 00:00:00 2001 
From: AkhileshNegi Date: Wed, 11 Feb 2026 23:47:32 +0530 Subject: [PATCH 3/7] cleanup --- backend/app/crud/stt_evaluations/cron.py | 39 +++---------------- backend/app/services/stt_evaluations/audio.py | 6 +-- 2 files changed, 7 insertions(+), 38 deletions(-) diff --git a/backend/app/crud/stt_evaluations/cron.py b/backend/app/crud/stt_evaluations/cron.py index c2e5a788..df245934 100644 --- a/backend/app/crud/stt_evaluations/cron.py +++ b/backend/app/crud/stt_evaluations/cron.py @@ -97,38 +97,11 @@ async def poll_all_pending_stt_evaluations( org_id = project_runs[0].organization_id try: - # Initialize Gemini client for this project - try: - gemini_client = GeminiClient.from_credentials( - session=session, - org_id=org_id, - project_id=project_id, - ) - except Exception as client_err: - logger.error( - f"[poll_all_pending_stt_evaluations] Failed to get Gemini client | " - f"org_id={org_id} | project_id={project_id} | error={client_err}" - ) - # Mark all runs in this project as failed - for run in project_runs: - update_stt_run( - session=session, - run_id=run.id, - status="failed", - error_message=f"Gemini client initialization failed: {str(client_err)}", - ) - all_results.append( - { - "run_id": run.id, - "run_name": run.run_name, - "type": "stt", - "action": "failed", - "error": str(client_err), - } - ) - total_failed += 1 - continue - + gemini_client = GeminiClient.from_credentials( + session=session, + org_id=org_id, + project_id=project_id, + ) batch_provider = GeminiBatchProvider(client=gemini_client.client) # Process each run in this project @@ -194,7 +167,7 @@ async def poll_all_pending_stt_evaluations( "error": f"Project processing failed: {str(e)}", } ) - total_failed += 1 + total_failed += len(project_runs) summary = { "total": len(pending_runs), diff --git a/backend/app/services/stt_evaluations/audio.py b/backend/app/services/stt_evaluations/audio.py index e0ab135e..4639bbf3 100644 --- a/backend/app/services/stt_evaluations/audio.py +++ b/backend/app/services/stt_evaluations/audio.py @@ -87,11 +87,7 @@ def upload_audio_file( try: storage = get_cloud_storage(session=session, project_id=project_id) s3_url = str(storage.put(source=file, file_path=file_path)) - - try: - size_bytes = int(storage.get_file_size_kb(s3_url) * 1024) - except Exception: - size_bytes = file.size or 0 + size_bytes = file.size or 0 original_filename = file.filename or new_filename content_type = file.content_type or f"audio/{extension}" From b262cb69ce699cf4e9161c85e77602732ce89e54 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 12 Feb 2026 00:05:28 +0530 Subject: [PATCH 4/7] cleanup endpoints --- backend/app/api/routes/stt_evaluations/result.py | 12 ------------ backend/app/crud/stt_evaluations/dataset.py | 3 ++- backend/app/services/stt_evaluations/batch_job.py | 1 + 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/backend/app/api/routes/stt_evaluations/result.py b/backend/app/api/routes/stt_evaluations/result.py index b2bc48bb..2af689dd 100644 --- a/backend/app/api/routes/stt_evaluations/result.py +++ b/backend/app/api/routes/stt_evaluations/result.py @@ -40,18 +40,6 @@ def update_result_feedback( f"result_id: {result_id}, is_correct: {feedback.is_correct}" ) - # Verify result exists and belongs to this project - existing = get_stt_result_by_id( - session=_session, - result_id=result_id, - org_id=auth_context.organization_.id, - project_id=auth_context.project_.id, - ) - - if not existing: - raise HTTPException(status_code=404, detail="Result not found") - - # Update feedback result = 
update_human_feedback( session=_session, result_id=result_id, diff --git a/backend/app/crud/stt_evaluations/dataset.py b/backend/app/crud/stt_evaluations/dataset.py index 9f254b3a..78838c59 100644 --- a/backend/app/crud/stt_evaluations/dataset.py +++ b/backend/app/crud/stt_evaluations/dataset.py @@ -296,7 +296,8 @@ def get_samples_by_dataset_id( dataset_id: int, org_id: int, project_id: int, - limit: int = 100, + # removing limit for now since we need all samples for batch job, can add pagination later if needed + limit: int, offset: int = 0, ) -> list[STTSample]: """Get samples for a dataset. diff --git a/backend/app/services/stt_evaluations/batch_job.py b/backend/app/services/stt_evaluations/batch_job.py index 3313cd2e..69648dc2 100644 --- a/backend/app/services/stt_evaluations/batch_job.py +++ b/backend/app/services/stt_evaluations/batch_job.py @@ -62,6 +62,7 @@ def execute_batch_submission( dataset_id=dataset_id, org_id=organization_id, project_id=project_id, + limit=run.total_items, ) if not samples: From 6fc12342224d79ffd0017ebf605125a6d00b1590 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 12 Feb 2026 00:11:47 +0530 Subject: [PATCH 5/7] garbage collection and log formatting --- backend/app/crud/stt_evaluations/__init__.py | 2 - backend/app/crud/stt_evaluations/cron.py | 22 +-- backend/app/crud/stt_evaluations/dataset.py | 1 - backend/app/crud/stt_evaluations/result.py | 141 ------------------ backend/app/crud/stt_evaluations/run.py | 26 +--- .../app/services/stt_evaluations/dataset.py | 12 +- 6 files changed, 19 insertions(+), 185 deletions(-) diff --git a/backend/app/crud/stt_evaluations/__init__.py b/backend/app/crud/stt_evaluations/__init__.py index 71f0b3f3..7cc235e6 100644 --- a/backend/app/crud/stt_evaluations/__init__.py +++ b/backend/app/crud/stt_evaluations/__init__.py @@ -16,7 +16,6 @@ update_stt_run, ) from .result import ( - create_stt_results, get_stt_result_by_id, get_results_by_run_id, update_human_feedback, @@ -39,7 +38,6 @@ "list_stt_runs", "update_stt_run", # Result - "create_stt_results", "get_stt_result_by_id", "get_results_by_run_id", "update_human_feedback", diff --git a/backend/app/crud/stt_evaluations/cron.py b/backend/app/crud/stt_evaluations/cron.py index df245934..754148cd 100644 --- a/backend/app/crud/stt_evaluations/cron.py +++ b/backend/app/crud/stt_evaluations/cron.py @@ -125,7 +125,7 @@ async def poll_all_pending_stt_evaluations( except Exception as e: logger.error( f"[poll_all_pending_stt_evaluations] Failed to poll STT run | " - f"run_id={run.id} | {e}", + f"run_id: {run.id}, error: {e}", exc_info=True, ) update_stt_run( @@ -148,7 +148,7 @@ async def poll_all_pending_stt_evaluations( except Exception as e: logger.error( f"[poll_all_pending_stt_evaluations] Failed to process project | " - f"project_id={project_id} | {e}", + f"project_id: {project_id}, error: {e}", exc_info=True, ) for run in project_runs: @@ -179,8 +179,8 @@ async def poll_all_pending_stt_evaluations( logger.info( f"[poll_all_pending_stt_evaluations] Polling summary | " - f"processed={total_processed} | failed={total_failed} | " - f"still_processing={total_still_processing}" + f"processed: {total_processed}, failed: {total_failed}, " + f"still_processing: {total_still_processing}" ) return summary @@ -228,8 +228,10 @@ async def poll_stt_run( Returns: dict: Status result with run details and action taken """ - log_prefix = f"[org={org_id}][project={run.project_id}][eval={run.id}]" - logger.info(f"[poll_stt_run] {log_prefix} Polling run") + logger.info( + f"[poll_stt_run] 
Polling run | " + f"run_id: {run.id}, org_id: {org_id}, project_id: {run.project_id}" + ) previous_status = run.status @@ -237,7 +239,7 @@ async def poll_stt_run( batch_jobs = _get_batch_jobs_for_run(session=session, run=run) if not batch_jobs: - logger.warning(f"[poll_stt_run] {log_prefix} No batch jobs found") + logger.warning(f"[poll_stt_run] No batch jobs found | run_id: {run.id}") update_stt_run( session=session, run_id=run.id, @@ -284,9 +286,9 @@ async def poll_stt_run( provider_status = batch_job.provider_status logger.info( - f"[poll_stt_run] {log_prefix} Batch status | " - f"batch_job_id={batch_job.id} | provider={provider_name} | " - f"state={provider_status}" + f"[poll_stt_run] Batch status | " + f"run_id: {run.id}, batch_job_id: {batch_job.id}, " + f"provider: {provider_name}, state: {provider_status}" ) if provider_status not in TERMINAL_STATES: diff --git a/backend/app/crud/stt_evaluations/dataset.py b/backend/app/crud/stt_evaluations/dataset.py index 78838c59..dca699bb 100644 --- a/backend/app/crud/stt_evaluations/dataset.py +++ b/backend/app/crud/stt_evaluations/dataset.py @@ -16,7 +16,6 @@ STTSample, STTSampleCreate, STTDatasetPublic, - STTSamplePublic, ) logger = logging.getLogger(__name__) diff --git a/backend/app/crud/stt_evaluations/result.py b/backend/app/crud/stt_evaluations/result.py index b8045200..7094ab8f 100644 --- a/backend/app/crud/stt_evaluations/result.py +++ b/backend/app/crud/stt_evaluations/result.py @@ -1,14 +1,12 @@ """CRUD operations for STT evaluation results.""" import logging -from typing import Any from sqlmodel import Session, select, func from app.core.exception_handlers import HTTPException from app.core.util import now from app.models.file import File -from app.models.job import JobStatus from app.models.stt_evaluation import ( STTResult, STTSample, @@ -19,64 +17,6 @@ logger = logging.getLogger(__name__) -def create_stt_results( - *, - session: Session, - samples: list[STTSample], - evaluation_run_id: int, - org_id: int, - project_id: int, - models: list[str], -) -> list[STTResult]: - """Create STT result records for all samples and models. - - Creates one result per sample per model. - - Args: - session: Database session - samples: List of samples - evaluation_run_id: Run ID - org_id: Organization ID - project_id: Project ID - models: List of STT models - - Returns: - list[STTResult]: Created results - """ - logger.info( - f"[create_stt_results] Creating STT results | " - f"run_id: {evaluation_run_id}, sample_count: {len(samples)}, " - f"model_count: {len(models)}" - ) - - timestamp = now() - results = [ - STTResult( - stt_sample_id=sample.id, - evaluation_run_id=evaluation_run_id, - organization_id=org_id, - project_id=project_id, - provider=model, - status=JobStatus.PENDING.value, - inserted_at=timestamp, - updated_at=timestamp, - ) - for sample in samples - for model in models - ] - - session.add_all(results) - session.flush() - session.commit() - - logger.info( - f"[create_stt_results] STT results created | " - f"run_id: {evaluation_run_id}, result_count: {len(results)}" - ) - - return results - - def get_stt_result_by_id( *, session: Session, @@ -196,53 +136,6 @@ def get_results_by_run_id( return results, total -def update_stt_result( - *, - session: Session, - result_id: int, - transcription: str | None = None, - status: str | None = None, - score: dict[str, Any] | None = None, - error_message: str | None = None, -) -> STTResult | None: - """Update an STT result with transcription data. 
- - Args: - session: Database session - result_id: Result ID - transcription: Generated transcription - status: New status - score: Evaluation metrics (e.g., wer, cer) - error_message: Error message if failed - - Returns: - STTResult | None: Updated result - """ - statement = select(STTResult).where(STTResult.id == result_id) - result = session.exec(statement).one_or_none() - - if not result: - return None - - updates = { - "transcription": transcription, - "status": status, - "score": score, - "error_message": error_message, - } - - for field, value in updates.items(): - if value is not None: - setattr(result, field, value) - - result.updated_at = now() - - session.add(result) - session.flush() - - return result - - def update_human_feedback( *, session: Session, @@ -298,40 +191,6 @@ def update_human_feedback( return result -def get_pending_results_for_run( - *, - session: Session, - run_id: int, - provider: str | None = None, - sample_id: int | None = None, -) -> list[STTResult]: - """Get all pending results for a run. - - Args: - session: Database session - run_id: Run ID - provider: Optional filter by provider - sample_id: Optional filter by sample ID - - Returns: - list[STTResult]: Pending results - """ - where_clauses = [ - STTResult.evaluation_run_id == run_id, - STTResult.status == JobStatus.PENDING.value, - ] - - if provider is not None: - where_clauses.append(STTResult.provider == provider) - - if sample_id is not None: - where_clauses.append(STTResult.stt_sample_id == sample_id) - - statement = select(STTResult).where(*where_clauses) - - return list(session.exec(statement).all()) - - def count_results_by_status( *, session: Session, diff --git a/backend/app/crud/stt_evaluations/run.py b/backend/app/crud/stt_evaluations/run.py index 5d7732cd..b7516f9a 100644 --- a/backend/app/crud/stt_evaluations/run.py +++ b/backend/app/crud/stt_evaluations/run.py @@ -6,7 +6,7 @@ from sqlmodel import Session, select, func from app.core.util import now -from app.models import EvaluationDataset, EvaluationRun +from app.models import EvaluationRun from app.models.stt_evaluation import ( EvaluationType, STTEvaluationRunPublic, @@ -230,27 +230,3 @@ def update_stt_run( ) return run - - -def get_pending_stt_runs( - *, - session: Session, -) -> list[EvaluationRun]: - """Get all pending STT evaluation runs that are ready for polling. - - Only returns runs with status "processing" that have a batch_job_id set, - meaning the batch has been submitted and is ready to be polled. 
- - Args: - session: Database session - - Returns: - list[EvaluationRun]: Pending runs ready for polling - """ - statement = select(EvaluationRun).where( - EvaluationRun.type == EvaluationType.STT.value, - EvaluationRun.status == "processing", - EvaluationRun.batch_job_id.is_not(None), - ) - - return list(session.exec(statement).all()) diff --git a/backend/app/services/stt_evaluations/dataset.py b/backend/app/services/stt_evaluations/dataset.py index fa7836c8..0307e998 100644 --- a/backend/app/services/stt_evaluations/dataset.py +++ b/backend/app/services/stt_evaluations/dataset.py @@ -52,9 +52,9 @@ def upload_stt_dataset( Tuple of (created dataset, created samples) """ logger.info( - f"[upload_stt_dataset] Uploading STT dataset | name={name} | " - f"sample_count={len(samples)} | org_id={organization_id} | " - f"project_id={project_id}" + f"[upload_stt_dataset] Uploading STT dataset | " + f"name: {name}, sample_count: {len(samples)}, " + f"org_id: {organization_id}, project_id: {project_id}" ) # Step 1: Convert samples to CSV and upload to object store @@ -86,7 +86,7 @@ def upload_stt_dataset( logger.info( f"[upload_stt_dataset] Created dataset record | " - f"id={dataset.id} | name={name}" + f"id: {dataset.id}, name: {name}" ) # Step 4: Create sample records @@ -98,7 +98,7 @@ def upload_stt_dataset( logger.info( f"[upload_stt_dataset] Created sample records | " - f"dataset_id={dataset.id} | sample_count={len(created_samples)}" + f"dataset_id: {dataset.id}, sample_count: {len(created_samples)}" ) session.commit() @@ -147,7 +147,7 @@ def _upload_samples_to_object_store( if object_store_url: logger.info( f"[_upload_samples_to_object_store] Upload successful | " - f"url={object_store_url}" + f"url: {object_store_url}" ) else: logger.info( From 23df9eafdbed2687a43c7ecb5960f51b216213b0 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 12 Feb 2026 00:26:30 +0530 Subject: [PATCH 6/7] update testcase --- .../app/tests/api/routes/test_stt_evaluation.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/backend/app/tests/api/routes/test_stt_evaluation.py b/backend/app/tests/api/routes/test_stt_evaluation.py index 5dbcd22c..d0b07044 100644 --- a/backend/app/tests/api/routes/test_stt_evaluation.py +++ b/backend/app/tests/api/routes/test_stt_evaluation.py @@ -542,10 +542,10 @@ def test_dataset_with_samples( ) return dataset - @patch("app.api.routes.stt_evaluations.evaluation.start_stt_evaluation_batch") + @patch("app.api.routes.stt_evaluations.evaluation.start_low_priority_job") def test_start_stt_evaluation_success( self, - mock_start_batch, + mock_start_job, client: TestClient, user_api_key_header: dict[str, str], db: Session, @@ -554,12 +554,7 @@ def test_start_stt_evaluation_success( ) -> None: """Test successfully starting an STT evaluation run.""" dataset = test_dataset_with_samples - mock_start_batch.return_value = { - "success": True, - "run_id": 1, - "batch_jobs": {"gemini-2.5-pro": {"batch_job_id": 1}}, - "sample_count": 3, - } + mock_start_job.return_value = "mock-celery-task-id" response = client.post( "/api/v1/evaluations/stt/runs", @@ -582,12 +577,12 @@ def test_start_stt_evaluation_success( assert data["type"] == "stt" assert data["models"] == ["gemini-2.5-pro"] assert data["total_items"] == 3 # 3 samples × 1 model - assert data["status"] in ("pending", "processing") + assert data["status"] == "pending" assert data["organization_id"] == user_api_key.organization_id assert data["project_id"] == user_api_key.project_id assert data["error_message"] is 
None - mock_start_batch.assert_called_once() + mock_start_job.assert_called_once() def test_start_stt_evaluation_invalid_dataset( self, From a035718f7a047f0035d7d6185a0bb42655d05272 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 12 Feb 2026 22:29:19 +0530 Subject: [PATCH 7/7] cleanup code --- backend/app/crud/stt_evaluations/batch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/crud/stt_evaluations/batch.py b/backend/app/crud/stt_evaluations/batch.py index 5d197c14..2c598dd8 100644 --- a/backend/app/crud/stt_evaluations/batch.py +++ b/backend/app/crud/stt_evaluations/batch.py @@ -105,12 +105,12 @@ def _generate_signed_url( return sample, None, str(e) with ThreadPoolExecutor(max_workers=10) as executor: - futures = { + sign_url_tasks = { executor.submit(_generate_signed_url, sample): sample for sample in samples } - for future in as_completed(futures): - sample, url, error = future.result() + for completed_task in as_completed(sign_url_tasks): + sample, url, error = completed_task.result() if url: signed_urls.append(url) sample_keys.append(str(sample.id))
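
---
Illustrative sketch (not part of the patches above): a minimal, self-contained version of the ThreadPoolExecutor/as_completed pattern this series uses for parallel signed-URL generation. FakeStorage and generate_signed_urls are hypothetical stand-ins, not the project's real cloud storage client or the _generate_signed_url helper in batch.py.

# Sketch only: stand-in storage client and helper, assuming per-item failure collection
# as in start_stt_evaluation_batch (failed samples are recorded, the batch continues).
from concurrent.futures import ThreadPoolExecutor, as_completed


class FakeStorage:
    """Placeholder storage client; a real one would presign S3/GCS URLs."""

    def get_signed_url(self, object_store_url: str, expires_in: int = 3600) -> str:
        return f"https://signed.example/{object_store_url}?expires_in={expires_in}"


def generate_signed_urls(
    storage: FakeStorage, object_urls: list[str], max_workers: int = 10
) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    """Sign URLs concurrently; collect per-item failures instead of aborting the batch."""
    signed: list[tuple[str, str]] = []
    failed: list[tuple[str, str]] = []

    def _sign(url: str) -> tuple[str, str | None, str | None]:
        try:
            return url, storage.get_signed_url(url), None
        except Exception as e:
            return url, None, str(e)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each submitted future back to its input so results can be matched up.
        sign_url_tasks = {executor.submit(_sign, url): url for url in object_urls}
        for completed_task in as_completed(sign_url_tasks):
            url, signed_url, error = completed_task.result()
            if signed_url:
                signed.append((url, signed_url))
            else:
                failed.append((url, error or "unknown error"))
    return signed, failed


if __name__ == "__main__":
    ok, bad = generate_signed_urls(FakeStorage(), ["audio/a.wav", "audio/b.wav"])
    print(f"{len(ok)} signed, {len(bad)} failed")

Collecting per-item failures rather than raising on the first error mirrors the behaviour in start_stt_evaluation_batch, where failed samples are logged and the run only fails outright if no signed URLs could be generated at all.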