Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions backend/app/alembic/versions/043_add_project_org_to_job_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Add project_id and organization_id to job table

Revision ID: 043
Revises: 042
Create Date: 2026-02-04 14:39:00.000000

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "043"
down_revision = "042"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Add organization_id column
op.add_column(
"job",
sa.Column(
"organization_id",
sa.Integer(),
nullable=True,
comment="Reference to the organization",
),
)

# Add project_id column
op.add_column(
"job",
sa.Column(
"project_id",
sa.Integer(),
nullable=True,
comment="Reference to the project",
),
)

# Create foreign key constraints
op.create_foreign_key(
"fk_job_organization_id",
"job",
"organization",
["organization_id"],
["id"],
ondelete="CASCADE",
)

op.create_foreign_key(
"fk_job_project_id",
"job",
"project",
["project_id"],
["id"],
ondelete="CASCADE",
)

# Create indexes
op.create_index(
"ix_job_organization_id",
"job",
["organization_id"],
unique=False,
)

op.create_index(
"ix_job_project_id",
"job",
["project_id"],
unique=False,
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Drop indexes
op.drop_index("ix_job_project_id", table_name="job")
op.drop_index("ix_job_organization_id", table_name="job")

# Drop foreign key constraints
op.drop_constraint("fk_job_project_id", "job", type_="foreignkey")
op.drop_constraint("fk_job_organization_id", "job", type_="foreignkey")

# Drop columns
op.drop_column("job", "project_id")
op.drop_column("job", "organization_id")
# ### end Alembic commands ###
34 changes: 34 additions & 0 deletions backend/app/alembic/versions/044_optimize_conversation_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add composite index for conversation query optimization

Revision ID: 044
Revises: 043
Create Date: 2026-02-04 15:24:00.000000

"""
from alembic import op


# revision identifiers, used by Alembic.
revision = "044"
down_revision = "043"
branch_labels = None
depends_on = None


def upgrade() -> None:
# Create composite index to optimize the get_conversation_by_ancestor_id query
# This query filters by: ancestor_response_id, project_id, is_deleted
# and orders by: inserted_at DESC
op.create_index(
"ix_openai_conversation_ancestor_project_active_time",
"openai_conversation",
["ancestor_response_id", "project_id", "is_deleted", "inserted_at"],
unique=False,
)


def downgrade() -> None:
op.drop_index(
"ix_openai_conversation_ancestor_project_active_time",
table_name="openai_conversation",
)
6 changes: 6 additions & 0 deletions backend/app/api/docs/documents/upload.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ Upload a document to Kaapi.
- If a target format is specified, a transformation job will also be created to transform document into target format in the background. The response will include both the uploaded document details and information about the transformation job.
- If a callback URL is provided, you will receive a notification at that URL once the document transformation job is completed.

### File Size Restrictions

- **Maximum file size**: 512MB (configurable via `MAX_DOCUMENT_UPLOAD_SIZE_MB` environment variable)
- Files exceeding the size limit will be rejected with a 413 (Payload Too Large) error
- Empty files will be rejected with a 422 (Unprocessable Entity) error

### Supported Transformations

The following (source_format → target_format) transformations are supported:
Expand Down
4 changes: 4 additions & 0 deletions backend/app/api/routes/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
build_document_schema,
build_document_schemas,
)
from app.services.documents.validators import validate_document_file
from app.utils import (
APIResponse,
get_openai_client,
Expand Down Expand Up @@ -123,6 +124,9 @@ async def upload_doc(
if callback_url:
validate_callback_url(callback_url)

# Validate file size before uploading to S3
await validate_document_file(src)

source_format, actual_transformer = pre_transform_validation(
src_filename=src.filename,
target_format=target_format,
Expand Down
3 changes: 3 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def AWS_S3_BUCKET(self) -> str:
CALLBACK_CONNECT_TIMEOUT: int = 3
CALLBACK_READ_TIMEOUT: int = 10

# Document upload size limit (in MB)
MAX_DOCUMENT_UPLOAD_SIZE_MB: int = 512

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
Expand Down
10 changes: 9 additions & 1 deletion backend/app/crud/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@ class JobCrud:
def __init__(self, session: Session):
self.session = session

def create(self, job_type: JobType, trace_id: str | None = None) -> Job:
def create(
self,
job_type: JobType,
project_id: int,
organization_id: int,
trace_id: str | None = None,
) -> Job:
new_job = Job(
job_type=job_type,
project_id=project_id,
organization_id=organization_id,
trace_id=trace_id,
)
self.session.add(new_job)
Expand Down
27 changes: 26 additions & 1 deletion backend/app/models/job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Optional
from uuid import UUID, uuid4

from sqlmodel import Field, SQLModel
from sqlmodel import Field, Relationship, SQLModel

from app.core.util import now

if TYPE_CHECKING:
from .organization import Organization
from .project import Project


class JobStatus(str, Enum):
PENDING = "PENDING"
Expand Down Expand Up @@ -58,6 +63,22 @@ class Job(SQLModel, table=True):
},
)

# Foreign keys
organization_id: int = Field(
foreign_key="organization.id",
nullable=False,
ondelete="CASCADE",
index=True,
sa_column_kwargs={"comment": "Reference to the organization"},
)
project_id: int = Field(
foreign_key="project.id",
nullable=False,
ondelete="CASCADE",
index=True,
sa_column_kwargs={"comment": "Reference to the project"},
)

# Timestamps
created_at: datetime = Field(
default_factory=now,
Expand All @@ -68,6 +89,10 @@ class Job(SQLModel, table=True):
sa_column_kwargs={"comment": "Timestamp when the job was last updated"},
)

# Relationships
organization: Optional["Organization"] = Relationship()
project: Optional["Project"] = Relationship()


class JobUpdate(SQLModel):
status: JobStatus | None = None
Expand Down
49 changes: 49 additions & 0 deletions backend/app/services/documents/validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Validation utilities for document uploads."""

import logging
from pathlib import Path

from fastapi import HTTPException, UploadFile

from app.core.config import settings
from app.utils import mask_string

logger = logging.getLogger(__name__)

# Maximum file size for document uploads (in bytes)
# Default: 512 MB, configurable via settings
MAX_DOCUMENT_SIZE = settings.MAX_DOCUMENT_UPLOAD_SIZE_MB * 1024 * 1024


async def validate_document_file(file: UploadFile) -> int:
"""
Validate document file size.

Args:
file: The uploaded file

Returns:
File size in bytes if valid
"""

# Get file size by seeking to end
file.file.seek(0, 2)
file_size = file.file.tell()
file.file.seek(0)

if file_size > MAX_DOCUMENT_SIZE:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size: {MAX_DOCUMENT_SIZE / (1024 * 1024):.0f}MB",
)

if file_size == 0:
raise HTTPException(
status_code=422,
detail="Empty file uploaded"
)

logger.info(
f"[validate_document_file] Document file validated: {mask_string(file.filename)} ({file_size} bytes)"
)
return file_size
7 changes: 6 additions & 1 deletion backend/app/services/llm/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ def start_job(
"""Create an LLM job and schedule Celery task."""
trace_id = correlation_id.get() or "N/A"
job_crud = JobCrud(session=db)
job = job_crud.create(job_type=JobType.LLM_API, trace_id=trace_id)
job = job_crud.create(
job_type=JobType.LLM_API,
project_id=project_id,
organization_id=organization_id,
trace_id=trace_id,
)

try:
task_id = start_high_priority_job(
Expand Down
7 changes: 6 additions & 1 deletion backend/app/services/response/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ def start_job(
"""Create a response job and schedule Celery task."""
trace_id = correlation_id.get() or "N/A"
job_crud = JobCrud(session=db)
job = job_crud.create(job_type=JobType.RESPONSE, trace_id=trace_id)
job = job_crud.create(
job_type=JobType.RESPONSE,
project_id=project_id,
organization_id=organization_id,
trace_id=trace_id,
)

try:
task_id = start_high_priority_job(
Expand Down
Loading