From 5beb0263d9e70844978ffcf763587cd98f4ac14e Mon Sep 17 00:00:00 2001
From: Datata1 <>
Date: Tue, 10 Feb 2026 01:30:26 +0100
Subject: [PATCH 1/2] feat(pipeline-stream): add log stream

---
 examples/create_workspace_with_landscape.py   | 121 ++++--
 .../resources/workspace/__init__.py           |   5 +
 .../resources/workspace/landscape/__init__.py |  10 +
 .../resources/workspace/landscape/models.py   | 165 +++++++-
 .../workspace/landscape/operations.py         |  26 ++
 .../resources/workspace/landscape/schemas.py  |  55 ++-
 .../resources/workspace/logs/__init__.py      |   9 +
 .../resources/workspace/logs/models.py        | 361 ++++++++++++++++++
 .../resources/workspace/logs/schemas.py       |  36 ++
 src/codesphere/resources/workspace/schemas.py |  21 +
 .../workspace/landscape/test_pipeline.py      | 245 ++++++++++++
 tests/resources/workspace/logs/__init__.py    |   1 +
 tests/resources/workspace/logs/test_logs.py   | 272 +++++++++++++
 13 files changed, 1283 insertions(+), 44 deletions(-)
 create mode 100644 src/codesphere/resources/workspace/logs/__init__.py
 create mode 100644 src/codesphere/resources/workspace/logs/models.py
 create mode 100644 src/codesphere/resources/workspace/logs/schemas.py
 create mode 100644 tests/resources/workspace/landscape/test_pipeline.py
 create mode 100644 tests/resources/workspace/logs/__init__.py
 create mode 100644 tests/resources/workspace/logs/test_logs.py

diff --git a/examples/create_workspace_with_landscape.py b/examples/create_workspace_with_landscape.py
index a85cf28..fd8553c 100644
--- a/examples/create_workspace_with_landscape.py
+++ b/examples/create_workspace_with_landscape.py
@@ -1,61 +1,98 @@
+"""
+Demo: Create a workspace, deploy a landscape profile, and stream logs.
+"""
+
 import asyncio
 import time
 
 from codesphere import CodesphereSDK
 from codesphere.resources.workspace import WorkspaceCreate
-from codesphere.resources.workspace.landscape import ProfileBuilder, ProfileConfig
+from codesphere.resources.workspace.landscape import (
+    PipelineStage,
+    PipelineState,
+    ProfileBuilder,
+)
+from codesphere.resources.workspace.logs import LogStage
 
-TEAM_ID = 123  # Replace with your actual team ID
+TEAM_ID = 35698
 
 
-async def get_plan_id(sdk: CodesphereSDK, plan_name: str = "Micro") -> int:
-    plans = await sdk.metadata.list_plans()
-    plan = next((p for p in plans if p.title == plan_name and not p.deprecated), None)
-    if not plan:
-        raise ValueError(f"Plan '{plan_name}' not found")
-    return plan.id
+async def main():
+    async with CodesphereSDK() as sdk:
+        plans = await sdk.metadata.list_plans()
+        plan = next((p for p in plans if p.title == "Micro" and not p.deprecated), None)
+        if not plan:
+            raise ValueError("Micro plan not found")
 
+        workspace_name = f"pipeline-demo-{int(time.time())}"
+        print(f"Creating workspace '{workspace_name}'...")
 
-def build_web_profile(plan_id: int) -> ProfileConfig:
-    """Build a simple web service landscape profile."""
-    return (
-        ProfileBuilder()
-        .prepare()
-        .add_step("npm install", name="Install dependencies")
-        .done()
-        .add_reactive_service("web")
-        .plan(plan_id)
-        .add_step("npm start")
-        .add_port(3000, public=True)
-        .add_path("/", port=3000)
-        .replicas(1)
-        .env("NODE_ENV", "production")
-        .build()
-    )
+        workspace = await sdk.workspaces.create(
+            WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name)
+        )
+        print(f"✓ Workspace created (ID: {workspace.id})")
 
+        print("Waiting for workspace to start...")
+        await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
+        print("✓ Workspace is running\n")
 
-async def create_workspace(sdk: CodesphereSDK, plan_id: int, name: str):
-    workspace = await sdk.workspaces.create(
-        WorkspaceCreate(plan_id=plan_id, team_id=TEAM_ID, name=name)
-    )
-    await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
-    return workspace
+        profile = (
+            ProfileBuilder()
+            .prepare()
+            .add_step("echo 'Installing dependencies...' && sleep 2")
+            .add_step("echo 'Setup complete!' && sleep 1")
+            .done()
+            .add_reactive_service("web")
+            .plan(plan.id)
+            .add_step(
+                'for i in $(seq 1 50); do echo "[$i] Processing request..."; sleep 1; done'
+            )
+            .add_port(3000, public=True)
+            .add_path("/", port=3000)
+            .replicas(1)
+            .done()
+            .build()
+        )
 
+        print("Deploying landscape profile...")
+        await workspace.landscape.save_profile("production", profile)
+        await workspace.landscape.deploy(profile="production")
+        print("✓ Profile deployed\n")
 
-async def deploy_landscape(workspace, profile: dict, profile_name: str = "production"):
-    await workspace.landscape.save_profile(profile_name, profile)
-    await workspace.landscape.deploy(profile=profile_name)
-    print("Deployment started!")
+        print("--- Prepare Stage ---")
+        await workspace.landscape.start_stage(
+            PipelineStage.PREPARE, profile="production"
+        )
+        prepare_status = await workspace.landscape.wait_for_stage(
+            PipelineStage.PREPARE, timeout=60.0
+        )
+        for status in prepare_status:
+            icon = "✓" if status.state == PipelineState.SUCCESS else "✗"
+            print(f"{icon} {status.server}: {status.state.value}")
 
-async def main():
-    async with CodesphereSDK() as sdk:
-        plan_id = await get_plan_id(sdk)
-        workspace = await create_workspace(
-            sdk, plan_id, f"landscape-demo-{int(time.time())}"
-        )
-        profile = build_web_profile(plan_id)
-        await deploy_landscape(workspace, profile)
+        print("\nPrepare logs:")
+        for step in range(len(prepare_status[0].steps)):
+            logs = await workspace.logs.collect(
+                stage=LogStage.PREPARE, step=step, timeout=5.0
+            )
+            for entry in logs:
+                if entry.get_text():
+                    print(f"  {entry.get_text().strip()}")
+
+        print("\n--- Run Stage ---")
+        await workspace.landscape.start_stage(PipelineStage.RUN, profile="production")
+        print("Started run stage, waiting for logs...\n")
+
+        print("Streaming logs from 'web' service:")
+        count = 0
+        async for entry in workspace.logs.stream_server(step=0, server="web"):
+            if entry.get_text():
+                print(f"  {entry.get_text().strip()}")
+                count += 1
+
+        print(f"\n✓ Stream ended ({count} log entries)")
+        print(f"✓ Workspace {workspace.id} is still running.")
 
 
 if __name__ == "__main__":
diff --git a/src/codesphere/resources/workspace/__init__.py b/src/codesphere/resources/workspace/__init__.py
index d80e4f1..9c74ae4 100644
--- a/src/codesphere/resources/workspace/__init__.py
+++ b/src/codesphere/resources/workspace/__init__.py
@@ -1,4 +1,5 @@
 from .git import GitHead, WorkspaceGitManager
+from .logs import LogEntry, LogProblem, LogStage, WorkspaceLogManager
 from .resources import WorkspacesResource
 from .schemas import (
     CommandInput,
@@ -19,4 +20,8 @@
     "CommandOutput",
     "WorkspaceGitManager",
     "GitHead",
+    "WorkspaceLogManager",
+    "LogEntry",
+    "LogProblem",
+    "LogStage",
 ]
diff --git a/src/codesphere/resources/workspace/landscape/__init__.py b/src/codesphere/resources/workspace/landscape/__init__.py
index 67950be..4facd41 100644
--- a/src/codesphere/resources/workspace/landscape/__init__.py
+++ b/src/codesphere/resources/workspace/landscape/__init__.py
@@ -4,6 +4,10 @@
     ManagedServiceConfig,
     NetworkConfig,
     PathConfig,
+    PipelineStage,
+    PipelineState,
+    PipelineStatus,
+    PipelineStatusList,
     PortConfig,
     Profile,
     ProfileBuilder,
@@ -12,6 +16,7 @@
     ReactiveServiceConfig,
     StageConfig,
     Step,
+    StepStatus,
 )
 
 __all__ = [
@@ -28,4 +33,9 @@
     "NetworkConfig",
     "PortConfig",
     "PathConfig",
+    "PipelineStage",
+    "PipelineState",
+    "PipelineStatus",
+    "PipelineStatusList",
+    "StepStatus",
 ]
diff --git a/src/codesphere/resources/workspace/landscape/models.py b/src/codesphere/resources/workspace/landscape/models.py
index 8d6fc11..6458964 100644
--- a/src/codesphere/resources/workspace/landscape/models.py
+++ b/src/codesphere/resources/workspace/landscape/models.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import logging
 import re
 from typing import TYPE_CHECKING, Dict, List, Optional, Union
@@ -10,10 +11,20 @@
 from .operations import (
     _DEPLOY_OP,
     _DEPLOY_WITH_PROFILE_OP,
+    _GET_PIPELINE_STATUS_OP,
     _SCALE_OP,
+    _START_PIPELINE_STAGE_OP,
+    _START_PIPELINE_STAGE_WITH_PROFILE_OP,
+    _STOP_PIPELINE_STAGE_OP,
     _TEARDOWN_OP,
 )
-from .schemas import Profile, ProfileConfig
+from .schemas import (
+    PipelineStage,
+    PipelineState,
+    PipelineStatusList,
+    Profile,
+    ProfileConfig,
+)
 
 if TYPE_CHECKING:
     from ..schemas import CommandOutput
@@ -95,3 +106,155 @@ async def teardown(self) -> None:
 
     async def scale(self, services: Dict[str, int]) -> None:
         await self._execute_operation(_SCALE_OP, data=services)
+
+    # Pipeline operations
+
+    async def start_stage(
+        self,
+        stage: Union[PipelineStage, str],
+        profile: Optional[str] = None,
+    ) -> None:
+        """Start a pipeline stage.
+
+        Args:
+            stage: The pipeline stage to start ('prepare', 'test', or 'run').
+            profile: Optional profile name. If provided, starts the stage with
+                that profile. Required for first run after deploy.
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            NotFoundError: If the workspace is not found.
+        """
+        if isinstance(stage, PipelineStage):
+            stage = stage.value
+
+        if profile is not None:
+            _validate_profile_name(profile)
+            await self._execute_operation(
+                _START_PIPELINE_STAGE_WITH_PROFILE_OP, stage=stage, profile=profile
+            )
+        else:
+            await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage)
+
+    async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
+        """Stop a pipeline stage.
+
+        Args:
+            stage: The pipeline stage to stop ('prepare', 'test', or 'run').
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            NotFoundError: If the workspace is not found.
+        """
+        if isinstance(stage, PipelineStage):
+            stage = stage.value
+
+        await self._execute_operation(_STOP_PIPELINE_STAGE_OP, stage=stage)
+
+    async def get_stage_status(
+        self, stage: Union[PipelineStage, str]
+    ) -> PipelineStatusList:
+        """Get the status of a pipeline stage.
+
+        Args:
+            stage: The pipeline stage to get status for ('prepare', 'test', or 'run').
+
+        Returns:
+            List of PipelineStatus objects, one per replica/server.
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            NotFoundError: If the workspace is not found.
+        """
+        if isinstance(stage, PipelineStage):
+            stage = stage.value
+
+        return await self._execute_operation(_GET_PIPELINE_STATUS_OP, stage=stage)
+
+    async def wait_for_stage(
+        self,
+        stage: Union[PipelineStage, str],
+        *,
+        timeout: float = 300.0,
+        poll_interval: float = 5.0,
+        server: Optional[str] = None,
+    ) -> PipelineStatusList:
+        """Wait for a pipeline stage to complete (success or failure).
+
+        Args:
+            stage: The pipeline stage to wait for.
+            timeout: Maximum time to wait in seconds (default: 300).
+            poll_interval: Time between status checks in seconds (default: 5).
+            server: Optional server name to filter by. If None, waits for all
+                servers that have steps defined for this stage.
+
+        Returns:
+            Final PipelineStatusList when stage completes.
+
+        Raises:
+            TimeoutError: If the stage doesn't complete within the timeout.
+            ValidationError: If the workspace is not running.
+        """
+        if poll_interval <= 0:
+            raise ValueError("poll_interval must be greater than 0")
+
+        stage_name = stage.value if isinstance(stage, PipelineStage) else stage
+        elapsed = 0.0
+
+        while elapsed < timeout:
+            status_list = await self.get_stage_status(stage)
+
+            # Filter to relevant servers for THIS stage
+            # A server is relevant for this stage if:
+            # - It has steps defined (meaning it participates in this stage)
+            # - OR it's not in 'waiting' state (meaning it has started)
+            relevant_statuses = []
+            for s in status_list:
+                if server is not None:
+                    # Filter by specific server
+                    if s.server == server:
+                        relevant_statuses.append(s)
+                else:
+                    # Include servers that have steps for this stage
+                    # Servers with no steps and waiting state don't participate in this stage
+                    if s.steps:
+                        relevant_statuses.append(s)
+                    elif s.state != PipelineState.WAITING:
+                        # Started but no steps visible yet
+                        relevant_statuses.append(s)
+
+            # If no relevant statuses yet, keep waiting
+            if not relevant_statuses:
+                log.debug(
+                    "Pipeline stage '%s': no servers with steps yet, waiting...",
+                    stage_name,
+                )
+                await asyncio.sleep(poll_interval)
+                elapsed += poll_interval
+                continue
+
+            # Check if all relevant servers have completed
+            all_completed = all(
+                s.state
+                in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED)
+                for s in relevant_statuses
+            )
+
+            if all_completed:
+                log.debug("Pipeline stage '%s' completed.", stage_name)
+                return PipelineStatusList(root=relevant_statuses)
+
+            # Log current state
+            states = [f"{s.server}={s.state.value}" for s in relevant_statuses]
+            log.debug(
+                "Pipeline stage '%s' status: %s (elapsed: %.1fs)",
+                stage_name,
+                ", ".join(states),
+                elapsed,
+            )
+            await asyncio.sleep(poll_interval)
+            elapsed += poll_interval
+
+        raise TimeoutError(
+            f"Pipeline stage '{stage_name}' did not complete within {timeout} seconds."
+        )
diff --git a/src/codesphere/resources/workspace/landscape/operations.py b/src/codesphere/resources/workspace/landscape/operations.py
index 9197b09..be2e825 100644
--- a/src/codesphere/resources/workspace/landscape/operations.py
+++ b/src/codesphere/resources/workspace/landscape/operations.py
@@ -1,4 +1,5 @@
 from ....core.operations import APIOperation
+from .schemas import PipelineStatusList
 
 _DEPLOY_OP = APIOperation(
     method="POST",
@@ -23,3 +24,28 @@
     endpoint_template="/workspaces/{id}/landscape/scale",
     response_model=type(None),
 )
+
+# Pipeline operations
+_START_PIPELINE_STAGE_OP = APIOperation(
+    method="POST",
+    endpoint_template="/workspaces/{id}/pipeline/{stage}/start",
+    response_model=type(None),
+)
+
+_START_PIPELINE_STAGE_WITH_PROFILE_OP = APIOperation(
+    method="POST",
+    endpoint_template="/workspaces/{id}/pipeline/{stage}/start/{profile}",
+    response_model=type(None),
+)
+
+_STOP_PIPELINE_STAGE_OP = APIOperation(
+    method="POST",
+    endpoint_template="/workspaces/{id}/pipeline/{stage}/stop",
+    response_model=type(None),
+)
+
+_GET_PIPELINE_STATUS_OP = APIOperation(
+    method="GET",
+    endpoint_template="/workspaces/{id}/pipeline/{stage}",
+    response_model=PipelineStatusList,
+)
diff --git a/src/codesphere/resources/workspace/landscape/schemas.py b/src/codesphere/resources/workspace/landscape/schemas.py
index 621b719..ec70f95 100644
--- a/src/codesphere/resources/workspace/landscape/schemas.py
+++ b/src/codesphere/resources/workspace/landscape/schemas.py
@@ -1,13 +1,66 @@
 from __future__ import annotations
 
+from enum import Enum
 from typing import Any, Dict, List, Literal, Optional
 
 import yaml
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, RootModel
 
 from ....core.base import CamelModel
 
 
+class PipelineStage(str, Enum):
+    """Pipeline stage for operations."""
+
+    PREPARE = "prepare"
+    TEST = "test"
+    RUN = "run"
+
+
+class PipelineState(str, Enum):
+    """State of a pipeline stage or step."""
+
+    WAITING = "waiting"
+    RUNNING = "running"
+    SUCCESS = "success"
+    FAILURE = "failure"
+    ABORTED = "aborted"
+
+
+class StepStatus(CamelModel):
+    """Status of a single pipeline step."""
+
+    state: PipelineState
+    started_at: Optional[str] = None
+    finished_at: Optional[str] = None
+
+
+class PipelineStatus(CamelModel):
+    """Status of a pipeline stage execution."""
+
+    state: PipelineState
+    started_at: Optional[str] = None
+    finished_at: Optional[str] = None
+    steps: List[StepStatus] = Field(default_factory=list)
+    replica: str
+    server: str
+
+
+class PipelineStatusList(RootModel[List[PipelineStatus]]):
+    """List of pipeline status entries (one per replica/server)."""
+
+    root: List[PipelineStatus]
+
+    def __iter__(self):
+        return iter(self.root)
+
+    def __getitem__(self, item):
+        return self.root[item]
+
+    def __len__(self):
+        return len(self.root)
+
+
 class Profile(BaseModel):
     name: str
diff --git a/src/codesphere/resources/workspace/logs/__init__.py b/src/codesphere/resources/workspace/logs/__init__.py
new file mode 100644
index 0000000..8400cb8
--- /dev/null
+++ b/src/codesphere/resources/workspace/logs/__init__.py
@@ -0,0 +1,9 @@
+from .models import WorkspaceLogManager
+from .schemas import LogEntry, LogProblem, LogStage
+
+__all__ = [
+    "WorkspaceLogManager",
+    "LogEntry",
+    "LogProblem",
+    "LogStage",
+]
diff --git a/src/codesphere/resources/workspace/logs/models.py b/src/codesphere/resources/workspace/logs/models.py
new file mode 100644
index 0000000..5d09e71
--- /dev/null
+++ b/src/codesphere/resources/workspace/logs/models.py
@@ -0,0 +1,361 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from typing import AsyncIterator, Optional, Union
+
+import httpx
+
+from ....exceptions import APIError, ValidationError
+from ....http_client import APIHttpClient
+from .schemas import LogEntry, LogProblem, LogStage
+
+log = logging.getLogger(__name__)
+
+
+class WorkspaceLogManager:
+    """Manager for streaming workspace logs via SSE.
+
+    Provides async iterators for streaming logs from different pipeline stages
+    and Multi Server Deployment servers/replicas.
+
+    Example:
+        ```python
+        # Stream prepare stage logs (with timeout for completed stages)
+        async for entry in workspace.logs.stream(stage=LogStage.PREPARE, step=1, timeout=30.0):
+            print(entry.get_text())
+
+        # Stream run logs for a specific server (no timeout for live streams)
+        async for entry in workspace.logs.stream_server(step=1, server="web"):
+            print(entry.get_text())
+
+        # Collect all logs at once
+        logs = await workspace.logs.collect(stage=LogStage.TEST, step=1)
+        for entry in logs:
+            print(entry.get_text())
+        ```
+    """
+
+    def __init__(self, http_client: APIHttpClient, workspace_id: int):
+        self._http_client = http_client
+        self._workspace_id = workspace_id
+
+    async def stream(
+        self,
+        stage: Union[LogStage, str],
+        step: int,
+        timeout: Optional[float] = 30.0,
+    ) -> AsyncIterator[LogEntry]:
+        """Stream logs for a given stage and step.
+
+        Args:
+            stage: The pipeline stage ('prepare', 'test', or 'run').
+                For 'run' stage of Multi Server Deployments, use
+                stream_server() or stream_replica() instead.
+            step: The step number within the stage.
+            timeout: Maximum seconds to wait for the stream. Default 30s.
+                Use None for no timeout (for live/running stages).
+
+        Yields:
+            LogEntry objects as they arrive from the SSE stream.
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            APIError: For other API errors.
+            asyncio.TimeoutError: If timeout is reached.
+        """
+        if isinstance(stage, LogStage):
+            stage = stage.value
+
+        endpoint = f"/workspaces/{self._workspace_id}/logs/{stage}/{step}"
+        async for entry in self._stream_logs(endpoint, timeout=timeout):
+            yield entry
+
+    async def stream_server(
+        self,
+        step: int,
+        server: str,
+        timeout: Optional[float] = None,
+    ) -> AsyncIterator[LogEntry]:
+        """Stream run logs for a specific server in a Multi Server Deployment.
+
+        Args:
+            step: The step number.
+            server: The server name.
+            timeout: Maximum seconds to wait. Default None (no timeout).
+
+        Yields:
+            LogEntry objects as they arrive from the SSE stream.
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            APIError: For other API errors.
+        """
+        endpoint = f"/workspaces/{self._workspace_id}/logs/run/{step}/server/{server}"
+        async for entry in self._stream_logs(endpoint, timeout=timeout):
+            yield entry
+
+    async def stream_replica(
+        self,
+        step: int,
+        replica: str,
+        timeout: Optional[float] = None,
+    ) -> AsyncIterator[LogEntry]:
+        """Stream run logs for a specific replica in a Multi Server Deployment.
+
+        Args:
+            step: The step number.
+            replica: The replica identifier.
+            timeout: Maximum seconds to wait. Default None (no timeout).
+
+        Yields:
+            LogEntry objects as they arrive from the SSE stream.
+
+        Raises:
+            ValidationError: If the workspace is not running or parameters are invalid.
+            APIError: For other API errors.
+ """ + endpoint = f"/workspaces/{self._workspace_id}/logs/run/{step}/replica/{replica}" + async for entry in self._stream_logs(endpoint, timeout=timeout): + yield entry + + async def collect( + self, + stage: Union[LogStage, str], + step: int, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a stage and step into a list. + + Args: + stage: The pipeline stage ('prepare', 'test', or 'run'). + step: The step number within the stage. + max_entries: Maximum number of entries to collect (None for unlimited). + timeout: Maximum seconds to collect. Default 30s. + + Returns: + List of LogEntry objects. + """ + entries: list[LogEntry] = [] + count = 0 + try: + async for entry in self.stream(stage, step, timeout=timeout): + entries.append(entry) + count += 1 + if max_entries and count >= max_entries: + break + except asyncio.TimeoutError: + log.debug(f"Log collection timed out after {count} entries") + return entries + + async def collect_server( + self, + step: int, + server: str, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a server into a list. + + Args: + step: The step number. + server: The server name. + max_entries: Maximum number of entries to collect (None for unlimited). + timeout: Maximum seconds to collect. Default 30s. + + Returns: + List of LogEntry objects. + """ + entries: list[LogEntry] = [] + count = 0 + try: + async for entry in self.stream_server(step, server, timeout=timeout): + entries.append(entry) + count += 1 + if max_entries and count >= max_entries: + break + except asyncio.TimeoutError: + log.debug(f"Server log collection timed out after {count} entries") + return entries + + async def collect_replica( + self, + step: int, + replica: str, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a replica into a list. + + Args: + step: The step number. + replica: The replica identifier. + max_entries: Maximum number of entries to collect (None for unlimited). + timeout: Maximum seconds to collect. Default 30s. + + Returns: + List of LogEntry objects. + """ + entries: list[LogEntry] = [] + count = 0 + try: + async for entry in self.stream_replica(step, replica, timeout=timeout): + entries.append(entry) + count += 1 + if max_entries and count >= max_entries: + break + except asyncio.TimeoutError: + log.debug(f"Replica log collection timed out after {count} entries") + return entries + + async def _stream_logs( + self, endpoint: str, timeout: Optional[float] = None + ) -> AsyncIterator[LogEntry]: + """Internal method to stream SSE logs from an endpoint. + + Args: + endpoint: The API endpoint to stream from. + timeout: Maximum seconds to stream. None for no timeout. + + Yields: + LogEntry objects parsed from SSE events. 
+ """ + client = self._http_client._get_client() + headers = {"Accept": "text/event-stream"} + + log.debug(f"Opening SSE stream: GET {endpoint} (timeout={timeout})") + + async def _do_stream() -> AsyncIterator[LogEntry]: + async with client.stream( + "GET", + endpoint, + headers=headers, + timeout=httpx.Timeout(5.0, read=None), + ) as response: + if not response.is_success: + await response.aread() + self._handle_error_response(response) + + async for entry in self._parse_sse_stream(response): + yield entry + + if timeout is not None: + async with asyncio.timeout(timeout): + async for entry in _do_stream(): + yield entry + else: + async for entry in _do_stream(): + yield entry + + async def _parse_sse_stream( + self, response: httpx.Response + ) -> AsyncIterator[LogEntry]: + """Parse SSE events from the response stream. + + Args: + response: The streaming httpx Response. + + Yields: + LogEntry objects from 'data' events. + + Raises: + ValidationError: If a 'problem' event indicates validation issues. + APIError: If a 'problem' event indicates other API errors. + """ + event_type: Optional[str] = None + data_buffer: list[str] = [] + + async for line in response.aiter_lines(): + line = line.strip() + + if not line: + # Empty line = end of event + if event_type and data_buffer: + data_str = "\n".join(data_buffer) + + # Handle end/close events - stop streaming + if event_type in ("end", "close", "done", "complete"): + log.debug(f"Received {event_type} event, closing stream") + return + + # Handle problem events first (raises exception) + if event_type == "problem": + await self._process_sse_event(event_type, data_str) + elif event_type == "data": + try: + json_data = json.loads(data_str) + # API can return a single entry or an array of entries + if isinstance(json_data, list): + for item in json_data: + yield LogEntry.model_validate(item) + else: + yield LogEntry.model_validate(json_data) + except json.JSONDecodeError as e: + log.warning(f"Failed to parse log entry JSON: {e}") + except Exception as e: + log.warning(f"Failed to validate log entry: {e}") + elif event_type in ("end", "close", "done", "complete"): + # End event with no data + log.debug(f"Received {event_type} event, closing stream") + return + + event_type = None + data_buffer = [] + continue + + if line.startswith("event:"): + event_type = line[6:].strip() + elif line.startswith("data:"): + data_buffer.append(line[5:].strip()) + elif line.startswith(":"): + # SSE comment - ignore + pass + else: + # Some SSE implementations send data without "data:" prefix + if event_type: + data_buffer.append(line) + + async def _process_sse_event(self, event_type: str, data: str) -> None: + """Process an SSE event and raise exceptions for problem events. + + Args: + event_type: The event type ('data' or 'problem'). + data: The JSON data from the event. + + Raises: + ValidationError: For 400 status problems. + APIError: For other problem statuses. + """ + if event_type == "problem": + try: + problem_data = json.loads(data) + problem = LogProblem.model_validate(problem_data) + + if problem.status == 400: + raise ValidationError( + message=problem.reason, + errors=[{"detail": problem.detail}] if problem.detail else None, + ) + else: + raise APIError( + message=problem.reason, + status_code=problem.status, + response_body=problem_data, + ) + except json.JSONDecodeError: + raise APIError(message=f"Invalid problem event: {data}") + + def _handle_error_response(self, response: httpx.Response) -> None: + """Handle non-success HTTP responses. 
+ + Args: + response: The failed httpx Response. + + Raises: + Appropriate exception based on status code. + """ + from ....exceptions import raise_for_status + + raise_for_status(response) diff --git a/src/codesphere/resources/workspace/logs/schemas.py b/src/codesphere/resources/workspace/logs/schemas.py new file mode 100644 index 0000000..20c581c --- /dev/null +++ b/src/codesphere/resources/workspace/logs/schemas.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from enum import Enum +from typing import Optional + +from ....core.base import CamelModel + + +class LogStage(str, Enum): + """Pipeline stage for log retrieval.""" + + PREPARE = "prepare" + TEST = "test" + RUN = "run" + + +class LogEntry(CamelModel): + """A single log entry from the workspace logs stream.""" + + model_config = {"extra": "allow"} + + timestamp: Optional[str] = None + kind: Optional[str] = None # "I" for info, "E" for error, etc. + data: Optional[str] = None # The actual log content + + def get_text(self) -> str: + """Get the log text.""" + return self.data or "" + + +class LogProblem(CamelModel): + """Problem event from the logs SSE stream.""" + + status: int + reason: str + detail: Optional[str] = None diff --git a/src/codesphere/resources/workspace/schemas.py b/src/codesphere/resources/workspace/schemas.py index 4d67a58..ded34ce 100644 --- a/src/codesphere/resources/workspace/schemas.py +++ b/src/codesphere/resources/workspace/schemas.py @@ -11,6 +11,7 @@ from .envVars import EnvVar, WorkspaceEnvVarManager from .git import WorkspaceGitManager from .landscape import WorkspaceLandscapeManager +from .logs import WorkspaceLogManager log = logging.getLogger(__name__) @@ -144,3 +145,23 @@ def git(self) -> WorkspaceGitManager: """Manager for git operations (head, pull).""" http_client = self.validate_http_client() return WorkspaceGitManager(http_client, workspace_id=self.id) + + @cached_property + def logs(self) -> WorkspaceLogManager: + """Manager for streaming workspace logs. + + Provides methods to stream or collect logs from pipeline stages + (prepare, test, run) and Multi Server Deployment servers/replicas. 
+
+        Example:
+            ```python
+            # Stream logs as they arrive
+            async for entry in workspace.logs.stream(stage="prepare", step=1):
+                print(entry.get_text())
+
+            # Collect all logs at once
+            entries = await workspace.logs.collect(stage="test", step=1)
+            ```
+        """
+        http_client = self.validate_http_client()
+        return WorkspaceLogManager(http_client, workspace_id=self.id)
diff --git a/tests/resources/workspace/landscape/test_pipeline.py b/tests/resources/workspace/landscape/test_pipeline.py
new file mode 100644
index 0000000..e7b5239
--- /dev/null
+++ b/tests/resources/workspace/landscape/test_pipeline.py
@@ -0,0 +1,245 @@
+from __future__ import annotations
+
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from codesphere.resources.workspace.landscape import (
+    PipelineStage,
+    PipelineState,
+    PipelineStatus,
+    PipelineStatusList,
+    StepStatus,
+)
+from codesphere.resources.workspace.landscape.models import WorkspaceLandscapeManager
+
+
+class TestPipelineSchemas:
+    def test_pipeline_stage_enum_values(self):
+        assert PipelineStage.PREPARE.value == "prepare"
+        assert PipelineStage.TEST.value == "test"
+        assert PipelineStage.RUN.value == "run"
+
+    def test_pipeline_state_enum_values(self):
+        assert PipelineState.WAITING.value == "waiting"
+        assert PipelineState.RUNNING.value == "running"
+        assert PipelineState.SUCCESS.value == "success"
+        assert PipelineState.FAILURE.value == "failure"
+        assert PipelineState.ABORTED.value == "aborted"
+
+    def test_step_status_create(self):
+        status = StepStatus(state=PipelineState.RUNNING)
+        assert status.state == PipelineState.RUNNING
+        assert status.started_at is None
+        assert status.finished_at is None
+
+    def test_step_status_with_timestamps(self):
+        status = StepStatus(
+            state=PipelineState.SUCCESS,
+            started_at="2026-02-10T10:00:00Z",
+            finished_at="2026-02-10T10:05:00Z",
+        )
+        assert status.state == PipelineState.SUCCESS
+        assert status.started_at == "2026-02-10T10:00:00Z"
+        assert status.finished_at == "2026-02-10T10:05:00Z"
+
+    def test_pipeline_status_create(self):
+        status = PipelineStatus(
+            state=PipelineState.RUNNING,
+            steps=[StepStatus(state=PipelineState.SUCCESS)],
+            replica="replica-1",
+            server="web",
+        )
+        assert status.state == PipelineState.RUNNING
+        assert len(status.steps) == 1
+        assert status.replica == "replica-1"
+        assert status.server == "web"
+
+    def test_pipeline_status_list(self):
+        statuses = [
+            PipelineStatus(
+                state=PipelineState.SUCCESS,
+                steps=[],
+                replica="replica-1",
+                server="web",
+            ),
+            PipelineStatus(
+                state=PipelineState.RUNNING,
+                steps=[],
+                replica="replica-2",
+                server="web",
+            ),
+        ]
+        status_list = PipelineStatusList(root=statuses)
+
+        assert len(status_list) == 2
+        assert status_list[0].replica == "replica-1"
+        assert status_list[1].state == PipelineState.RUNNING
+
+
+class TestWorkspaceLandscapeManagerPipeline:
+    @pytest.fixture
+    def mock_http_client(self):
+        client = MagicMock()
+        client._get_client = MagicMock()
+        return client
+
+    @pytest.fixture
+    def landscape_manager(self, mock_http_client):
+        return WorkspaceLandscapeManager(mock_http_client, workspace_id=123)
+
+    @pytest.mark.asyncio
+    async def test_start_stage_with_enum(self, landscape_manager):
+        """Test start_stage with PipelineStage enum."""
+        landscape_manager._execute_operation = AsyncMock()
+
+        await landscape_manager.start_stage(PipelineStage.PREPARE)
+
+        landscape_manager._execute_operation.assert_called_once()
+        call_args = landscape_manager._execute_operation.call_args
+        assert call_args.kwargs["stage"] == "prepare"
+
+    @pytest.mark.asyncio
+    async def test_start_stage_with_string(self, landscape_manager):
+        """Test start_stage with string stage."""
+        landscape_manager._execute_operation = AsyncMock()
+
+        await landscape_manager.start_stage("run")
+
+        landscape_manager._execute_operation.assert_called_once()
+        call_args = landscape_manager._execute_operation.call_args
+        assert call_args.kwargs["stage"] == "run"
+
+    @pytest.mark.asyncio
+    async def test_start_stage_with_profile(self, landscape_manager):
+        """Test start_stage with a profile name."""
+        landscape_manager._execute_operation = AsyncMock()
+
+        await landscape_manager.start_stage(PipelineStage.RUN, profile="production")
+
+        landscape_manager._execute_operation.assert_called_once()
+        call_args = landscape_manager._execute_operation.call_args
+        assert call_args.kwargs["stage"] == "run"
+        assert call_args.kwargs["profile"] == "production"
+
+    @pytest.mark.asyncio
+    async def test_start_stage_invalid_profile_name(self, landscape_manager):
+        """Test start_stage with invalid profile name raises ValueError."""
+        with pytest.raises(ValueError, match="Invalid profile name"):
+            await landscape_manager.start_stage(
+                PipelineStage.RUN, profile="invalid profile!"
+            )
+
+    @pytest.mark.asyncio
+    async def test_stop_stage(self, landscape_manager):
+        """Test stop_stage."""
+        landscape_manager._execute_operation = AsyncMock()
+
+        await landscape_manager.stop_stage(PipelineStage.RUN)
+
+        landscape_manager._execute_operation.assert_called_once()
+        call_args = landscape_manager._execute_operation.call_args
+        assert call_args.kwargs["stage"] == "run"
+
+    @pytest.mark.asyncio
+    async def test_get_stage_status(self, landscape_manager):
+        """Test get_stage_status returns PipelineStatusList."""
+        mock_status = PipelineStatusList(
+            root=[
+                PipelineStatus(
+                    state=PipelineState.RUNNING,
+                    steps=[],
+                    replica="replica-1",
+                    server="web",
+                )
+            ]
+        )
+        landscape_manager._execute_operation = AsyncMock(return_value=mock_status)
+
+        result = await landscape_manager.get_stage_status(PipelineStage.RUN)
+
+        assert len(result) == 1
+        assert result[0].state == PipelineState.RUNNING
+
+    @pytest.mark.asyncio
+    async def test_wait_for_stage_completes_immediately(self, landscape_manager):
+        """Test wait_for_stage when stage is already complete."""
+        mock_status = PipelineStatusList(
+            root=[
+                PipelineStatus(
+                    state=PipelineState.SUCCESS,
+                    steps=[],
+                    replica="replica-1",
+                    server="web",
+                )
+            ]
+        )
+        landscape_manager._execute_operation = AsyncMock(return_value=mock_status)
+
+        result = await landscape_manager.wait_for_stage(PipelineStage.PREPARE)
+
+        assert len(result) == 1
+        assert result[0].state == PipelineState.SUCCESS
+
+    @pytest.mark.asyncio
+    async def test_wait_for_stage_polls_until_complete(self, landscape_manager):
+        """Test wait_for_stage polls until stage completes."""
+        running_status = PipelineStatusList(
+            root=[
+                PipelineStatus(
+                    state=PipelineState.RUNNING,
+                    steps=[],
+                    replica="replica-1",
+                    server="web",
+                )
+            ]
+        )
+        success_status = PipelineStatusList(
+            root=[
+                PipelineStatus(
+                    state=PipelineState.SUCCESS,
+                    steps=[],
+                    replica="replica-1",
+                    server="web",
+                )
+            ]
+        )
+
+        landscape_manager._execute_operation = AsyncMock(
+            side_effect=[running_status, running_status, success_status]
+        )
+
+        result = await landscape_manager.wait_for_stage(
+            PipelineStage.PREPARE, poll_interval=0.01
+        )
+
+        assert result[0].state == PipelineState.SUCCESS
+        assert landscape_manager._execute_operation.call_count == 3
+
+    @pytest.mark.asyncio
+    async def test_wait_for_stage_timeout(self, landscape_manager):
+        """Test wait_for_stage raises TimeoutError."""
+        running_status = PipelineStatusList(
+            root=[
+                PipelineStatus(
+                    state=PipelineState.RUNNING,
+                    steps=[],
+                    replica="replica-1",
+                    server="web",
+                )
+            ]
+        )
+        landscape_manager._execute_operation = AsyncMock(return_value=running_status)
+
+        with pytest.raises(TimeoutError, match="did not complete"):
+            await landscape_manager.wait_for_stage(
+                PipelineStage.PREPARE, timeout=0.05, poll_interval=0.01
+            )
+
+    @pytest.mark.asyncio
+    async def test_wait_for_stage_invalid_poll_interval(self, landscape_manager):
+        """Test wait_for_stage with invalid poll_interval raises ValueError."""
+        with pytest.raises(ValueError, match="poll_interval must be greater than 0"):
+            await landscape_manager.wait_for_stage(
+                PipelineStage.PREPARE, poll_interval=0
+            )
diff --git a/tests/resources/workspace/logs/__init__.py b/tests/resources/workspace/logs/__init__.py
new file mode 100644
index 0000000..5e95095
--- /dev/null
+++ b/tests/resources/workspace/logs/__init__.py
@@ -0,0 +1 @@
+# Tests for workspace logs module
diff --git a/tests/resources/workspace/logs/test_logs.py b/tests/resources/workspace/logs/test_logs.py
new file mode 100644
index 0000000..1d70588
--- /dev/null
+++ b/tests/resources/workspace/logs/test_logs.py
@@ -0,0 +1,272 @@
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from codesphere.exceptions import APIError, ValidationError
+from codesphere.resources.workspace.logs import (
+    LogEntry,
+    LogProblem,
+    LogStage,
+    WorkspaceLogManager,
+)
+
+
+class TestLogStage:
+    def test_enum_values(self):
+        assert LogStage.PREPARE.value == "prepare"
+        assert LogStage.TEST.value == "test"
+        assert LogStage.RUN.value == "run"
+
+
+class TestLogEntry:
+    def test_create_with_data_only(self):
+        entry = LogEntry(data="Test log message")
+        assert entry.data == "Test log message"
+        assert entry.timestamp is None
+        assert entry.kind is None
+
+    def test_create_with_all_fields(self):
+        entry = LogEntry(
+            data="Test message",
+            timestamp="2026-02-10T12:00:00Z",
+            kind="I",
+        )
+        assert entry.data == "Test message"
+        assert entry.timestamp == "2026-02-10T12:00:00Z"
+        assert entry.kind == "I"
+
+    def test_from_dict(self):
+        data = {"data": "Hello", "timestamp": "2026-02-10T12:00:00Z"}
+        entry = LogEntry.model_validate(data)
+        assert entry.data == "Hello"
+
+    def test_get_text(self):
+        entry = LogEntry(data="Log content")
+        assert entry.get_text() == "Log content"
+
+    def test_get_text_empty(self):
+        entry = LogEntry()
+        assert entry.get_text() == ""
+
+
+class TestLogProblem:
+    def test_create_problem(self):
+        problem = LogProblem(status=400, reason="Workspace is not running")
+        assert problem.status == 400
+        assert problem.reason == "Workspace is not running"
+        assert problem.detail is None
+
+    def test_create_problem_with_detail(self):
+        problem = LogProblem(
+            status=404,
+            reason="Not found",
+            detail="Workspace 123 does not exist",
+        )
+        assert problem.status == 404
+        assert problem.detail == "Workspace 123 does not exist"
+
+
+async def mock_stream_logs_factory(entries: list[LogEntry], capture_endpoint: list):
+    """Factory to create a mock _stream_logs that captures the endpoint."""
+
+    async def mock_stream_logs(endpoint: str):
+        capture_endpoint.append(endpoint)
+        for entry in entries:
+            yield entry
+
+    return mock_stream_logs
+
+
+class TestWorkspaceLogManager:
+    @pytest.fixture
+    def mock_http_client(self):
+        client = MagicMock()
+        client._get_client = MagicMock()
+        return client
+
+    @pytest.fixture
+    def log_manager(self, mock_http_client):
+        return WorkspaceLogManager(mock_http_client, workspace_id=123)
+
+    def test_init(self, log_manager, mock_http_client):
+        assert log_manager._http_client == mock_http_client
+        assert log_manager._workspace_id == 123
+
+    @pytest.mark.asyncio
+    async def test_stream_builds_correct_endpoint(self, log_manager):
+        """Test that stream() builds the correct endpoint URL."""
+        captured_endpoints: list[str] = []
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            captured_endpoints.append(endpoint)
+            return
+            yield  # Make it a generator
+
+        log_manager._stream_logs = mock_stream_logs
+
+        async for _ in log_manager.stream(stage=LogStage.PREPARE, step=1):
+            pass
+
+        assert captured_endpoints == ["/workspaces/123/logs/prepare/1"]
+
+    @pytest.mark.asyncio
+    async def test_stream_with_string_stage(self, log_manager):
+        """Test that stream() accepts string stage values."""
+        captured_endpoints: list[str] = []
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            captured_endpoints.append(endpoint)
+            return
+            yield
+
+        log_manager._stream_logs = mock_stream_logs
+
+        async for _ in log_manager.stream(stage="test", step=2):
+            pass
+
+        assert captured_endpoints == ["/workspaces/123/logs/test/2"]
+
+    @pytest.mark.asyncio
+    async def test_stream_server_builds_correct_endpoint(self, log_manager):
+        """Test that stream_server() builds the correct endpoint URL."""
+        captured_endpoints: list[str] = []
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            captured_endpoints.append(endpoint)
+            return
+            yield
+
+        log_manager._stream_logs = mock_stream_logs
+
+        async for _ in log_manager.stream_server(step=1, server="web"):
+            pass
+
+        assert captured_endpoints == ["/workspaces/123/logs/run/1/server/web"]
+
+    @pytest.mark.asyncio
+    async def test_stream_replica_builds_correct_endpoint(self, log_manager):
+        """Test that stream_replica() builds the correct endpoint URL."""
+        captured_endpoints: list[str] = []
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            captured_endpoints.append(endpoint)
+            return
+            yield
+
+        log_manager._stream_logs = mock_stream_logs
+
+        async for _ in log_manager.stream_replica(step=2, replica="replica-1"):
+            pass
+
+        assert captured_endpoints == ["/workspaces/123/logs/run/2/replica/replica-1"]
+
+    @pytest.mark.asyncio
+    async def test_collect_returns_list(self, log_manager):
+        """Test that collect() returns a list of log entries."""
+        entries = [
+            LogEntry(data="Log 1"),
+            LogEntry(data="Log 2"),
+            LogEntry(data="Log 3"),
+        ]
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            for entry in entries:
+                yield entry
+
+        log_manager._stream_logs = mock_stream_logs
+
+        result = await log_manager.collect(stage=LogStage.PREPARE, step=1)
+
+        assert len(result) == 3
+        assert result[0].data == "Log 1"
+        assert result[2].data == "Log 3"
+
+    @pytest.mark.asyncio
+    async def test_collect_with_max_entries(self, log_manager):
+        """Test that collect() respects max_entries limit."""
+        entries = [
+            LogEntry(data="Log 1"),
+            LogEntry(data="Log 2"),
+            LogEntry(data="Log 3"),
+        ]
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            for entry in entries:
+                yield entry
+
+        log_manager._stream_logs = mock_stream_logs
+
+        result = await log_manager.collect(
+            stage=LogStage.PREPARE, step=1, max_entries=2
+        )
+
+        assert len(result) == 2
+
+    @pytest.mark.asyncio
+    async def test_collect_server_returns_list(self, log_manager):
+        """Test that collect_server() returns a list of log entries."""
+        entries = [LogEntry(data="Server log 1")]
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            for entry in entries:
+                yield entry
+
+        log_manager._stream_logs = mock_stream_logs
+
+        result = await log_manager.collect_server(step=1, server="web")
+
+        assert len(result) == 1
+        assert result[0].data == "Server log 1"
+
+    @pytest.mark.asyncio
+    async def test_collect_replica_returns_list(self, log_manager):
+        """Test that collect_replica() returns a list of log entries."""
+        entries = [LogEntry(data="Replica log 1")]
+
+        async def mock_stream_logs(endpoint: str, timeout: float = None):
+            for entry in entries:
+                yield entry
+
+        log_manager._stream_logs = mock_stream_logs
+
+        result = await log_manager.collect_replica(step=1, replica="replica-1")
+
+        assert len(result) == 1
+        assert result[0].data == "Replica log 1"
+
+    @pytest.mark.asyncio
+    async def test_process_sse_event_raises_on_problem(self, log_manager):
+        """Test that problem events raise appropriate exceptions."""
+        problem_data = '{"status": 400, "reason": "Workspace is not running"}'
+
+        with pytest.raises(ValidationError) as exc_info:
+            await log_manager._process_sse_event("problem", problem_data)
+
+        assert "Workspace is not running" in str(exc_info.value)
+
+    @pytest.mark.asyncio
+    async def test_process_sse_event_raises_api_error_for_non_400(self, log_manager):
+        """Test that non-400 problem events raise APIError."""
+        problem_data = '{"status": 404, "reason": "Workspace not found"}'
+
+        with pytest.raises(APIError) as exc_info:
+            await log_manager._process_sse_event("problem", problem_data)
+
+        assert exc_info.value.status_code == 404
+
+    @pytest.mark.asyncio
+    async def test_process_sse_event_data_does_not_raise(self, log_manager):
+        """Test that data events do not raise exceptions."""
+        data = '{"data": "Test log"}'
+        # Should not raise
+        await log_manager._process_sse_event("data", data)
+
+    @pytest.mark.asyncio
+    async def test_process_sse_event_invalid_json_raises(self, log_manager):
+        """Test that invalid JSON in problem events raises APIError."""
+        with pytest.raises(APIError) as exc_info:
+            await log_manager._process_sse_event("problem", "invalid json")
+
+        assert "Invalid problem event" in str(exc_info.value)

From 87e53f6a474cf3b3b2258f50e954e00f995642b4 Mon Sep 17 00:00:00 2001
From: Datata1 <>
Date: Tue, 10 Feb 2026 13:56:55 +0100
Subject: [PATCH 2/2] context manager for log streams

---
 examples/create_workspace_with_landscape.py   |  24 +-
 src/codesphere/core/__init__.py               |   8 +-
 src/codesphere/core/operations.py             |  14 +-
 .../resources/workspace/__init__.py           |   3 +-
 .../resources/workspace/landscape/models.py   |  61 ---
 .../workspace/landscape/operations.py         |   1 -
 .../resources/workspace/landscape/schemas.py  |  10 -
 .../resources/workspace/logs/__init__.py      |   3 +-
 .../resources/workspace/logs/models.py        | 496 ++++++++----------
 .../resources/workspace/logs/operations.py    |  17 +
 .../resources/workspace/logs/schemas.py       |   9 +-
 tests/core/test_operations.py                 |  48 +-
 tests/resources/workspace/logs/__init__.py    |   1 -
 tests/resources/workspace/logs/test_logs.py   | 297 +++++------
 14 files changed, 427 insertions(+), 565 deletions(-)
 create mode 100644 src/codesphere/resources/workspace/logs/operations.py

diff --git a/examples/create_workspace_with_landscape.py b/examples/create_workspace_with_landscape.py
index fd8553c..ff020fb 100644
--- a/examples/create_workspace_with_landscape.py
+++ b/examples/create_workspace_with_landscape.py
@@ -1,7 +1,3 @@
-"""
-Demo: Create a workspace, deploy a landscape profile, and stream logs.
-"""
-
 import asyncio
 import time
 
@@ -14,7 +10,7 @@
 )
 from codesphere.resources.workspace.logs import LogStage
 
-TEAM_ID = 35698
+TEAM_ID = 123
 
 
 async def main():
@@ -25,8 +21,8 @@ async def main():
             raise ValueError("Micro plan not found")
 
         workspace_name = f"pipeline-demo-{int(time.time())}"
-        print(f"Creating workspace '{workspace_name}'...")
 
+        print(f"Creating workspace '{workspace_name}'...")
         workspace = await sdk.workspaces.create(
             WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name)
         )
@@ -45,7 +41,7 @@ async def main():
             .add_reactive_service("web")
             .plan(plan.id)
             .add_step(
-                'for i in $(seq 1 50); do echo "[$i] Processing request..."; sleep 1; done'
+                'for i in $(seq 1 20); do echo "[$i] Processing request..."; sleep 1; done'
             )
             .add_port(3000, public=True)
             .add_path("/", port=3000)
@@ -82,17 +78,17 @@ async def main():
 
         print("\n--- Run Stage ---")
         await workspace.landscape.start_stage(PipelineStage.RUN, profile="production")
-        print("Started run stage, waiting for logs...\n")
+        print("Started run stage\n")
 
-        print("Streaming logs from 'web' service:")
+        print("Streaming logs from 'web' service (using context manager):")
         count = 0
-        async for entry in workspace.logs.stream_server(step=0, server="web"):
-            if entry.get_text():
-                print(f"  {entry.get_text().strip()}")
-                count += 1
+        async with workspace.logs.open_server_stream(step=0, server="web") as stream:
+            async for entry in stream:
+                if entry.get_text():
+                    print(f"  {entry.get_text().strip()}")
+                    count += 1
 
         print(f"\n✓ Stream ended ({count} log entries)")
-        print(f"✓ Workspace {workspace.id} is still running.")
 
 
 if __name__ == "__main__":
diff --git a/src/codesphere/core/__init__.py b/src/codesphere/core/__init__.py
index 1df8f8f..f92ed07 100644
--- a/src/codesphere/core/__init__.py
+++ b/src/codesphere/core/__init__.py
@@ -1,11 +1,13 @@
-from .base import ResourceBase
-from .operations import APIOperation, AsyncCallable
-from .handler import _APIOperationExecutor, APIRequestHandler
+from .base import CamelModel, ResourceBase
+from .handler import APIRequestHandler, _APIOperationExecutor
+from .operations import APIOperation, AsyncCallable, StreamOperation
 
 __all__ = [
+    "CamelModel",
     "ResourceBase",
     "APIOperation",
     "_APIOperationExecutor",
     "APIRequestHandler",
     "AsyncCallable",
+    "StreamOperation",
 ]
diff --git a/src/codesphere/core/operations.py b/src/codesphere/core/operations.py
index bf8ee3f..a6192f7 100644
--- a/src/codesphere/core/operations.py
+++ b/src/codesphere/core/operations.py
@@ -1,17 +1,25 @@
-from typing import Callable, Awaitable, Generic, Optional, Type, TypeAlias, TypeVar
-
-from pydantic import BaseModel
+from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar
+
+from pydantic import BaseModel, ConfigDict
 
 _T = TypeVar("_T")
 ResponseT = TypeVar("ResponseT")
 InputT = TypeVar("InputT")
+EntryT = TypeVar("EntryT")
 
 AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]]
 
 
 class APIOperation(BaseModel, Generic[ResponseT, InputT]):
+    model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
+
     method: str
     endpoint_template: str
     response_model: Type[ResponseT]
     input_model: Optional[Type[InputT]] = None
+
+
+class StreamOperation(BaseModel, Generic[EntryT]):
+    model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
+
+    endpoint_template: str
+    entry_model: Type[EntryT]
diff --git a/src/codesphere/resources/workspace/__init__.py b/src/codesphere/resources/workspace/__init__.py
index 9c74ae4..4045de8 100644
--- a/src/codesphere/resources/workspace/__init__.py
+++ b/src/codesphere/resources/workspace/__init__.py
@@ -1,5 +1,5 @@
 from .git import GitHead, WorkspaceGitManager
-from .logs import LogEntry, LogProblem, LogStage, WorkspaceLogManager
+from .logs import LogEntry, LogProblem, LogStage, LogStream, WorkspaceLogManager
 from .resources import WorkspacesResource
 from .schemas import (
     CommandInput,
@@ -20,6 +20,7 @@
     "CommandOutput",
     "WorkspaceGitManager",
     "GitHead",
+    "LogStream",
     "WorkspaceLogManager",
     "LogEntry",
     "LogProblem",
diff --git a/src/codesphere/resources/workspace/landscape/models.py b/src/codesphere/resources/workspace/landscape/models.py
index 6458964..99e9bdc 100644
--- a/src/codesphere/resources/workspace/landscape/models.py
+++ b/src/codesphere/resources/workspace/landscape/models.py
@@ -107,24 +107,11 @@ async def teardown(self) -> None:
     async def scale(self, services: Dict[str, int]) -> None:
         await self._execute_operation(_SCALE_OP, data=services)
 
-    # Pipeline operations
-
     async def start_stage(
         self,
         stage: Union[PipelineStage, str],
         profile: Optional[str] = None,
     ) -> None:
-        """Start a pipeline stage.
-
-        Args:
-            stage: The pipeline stage to start ('prepare', 'test', or 'run').
-            profile: Optional profile name. If provided, starts the stage with
-                that profile. Required for first run after deploy.
-
-        Raises:
-            ValidationError: If the workspace is not running or parameters are invalid.
-            NotFoundError: If the workspace is not found.
-        """
         if isinstance(stage, PipelineStage):
             stage = stage.value
 
@@ -137,15 +124,6 @@ async def start_stage(
             await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage)
 
     async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
-        """Stop a pipeline stage.
-
-        Args:
-            stage: The pipeline stage to stop ('prepare', 'test', or 'run').
-
-        Raises:
-            ValidationError: If the workspace is not running or parameters are invalid.
-            NotFoundError: If the workspace is not found.
-        """
         if isinstance(stage, PipelineStage):
             stage = stage.value
 
@@ -154,18 +132,6 @@ async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
     async def get_stage_status(
         self, stage: Union[PipelineStage, str]
     ) -> PipelineStatusList:
-        """Get the status of a pipeline stage.
-
-        Args:
-            stage: The pipeline stage to get status for ('prepare', 'test', or 'run').
-
-        Returns:
-            List of PipelineStatus objects, one per replica/server.
-
-        Raises:
-            ValidationError: If the workspace is not running or parameters are invalid.
-            NotFoundError: If the workspace is not found.
-        """
         if isinstance(stage, PipelineStage):
             stage = stage.value
 
@@ -179,22 +145,6 @@ async def wait_for_stage(
         poll_interval: float = 5.0,
         server: Optional[str] = None,
     ) -> PipelineStatusList:
-        """Wait for a pipeline stage to complete (success or failure).
-
-        Args:
-            stage: The pipeline stage to wait for.
-            timeout: Maximum time to wait in seconds (default: 300).
-            poll_interval: Time between status checks in seconds (default: 5).
-            server: Optional server name to filter by. If None, waits for all
-                servers that have steps defined for this stage.
-
-        Returns:
-            Final PipelineStatusList when stage completes.
-
-        Raises:
-            TimeoutError: If the stage doesn't complete within the timeout.
-            ValidationError: If the workspace is not running.
-        """
         if poll_interval <= 0:
             raise ValueError("poll_interval must be greater than 0")
 
@@ -204,26 +154,17 @@ async def wait_for_stage(
         while elapsed < timeout:
             status_list = await self.get_stage_status(stage)
 
-            # Filter to relevant servers for THIS stage
-            # A server is relevant for this stage if:
-            # - It has steps defined (meaning it participates in this stage)
-            # - OR it's not in 'waiting' state (meaning it has started)
             relevant_statuses = []
             for s in status_list:
                 if server is not None:
-                    # Filter by specific server
                     if s.server == server:
                         relevant_statuses.append(s)
                 else:
-                    # Include servers that have steps for this stage
-                    # Servers with no steps and waiting state don't participate in this stage
                     if s.steps:
                         relevant_statuses.append(s)
                     elif s.state != PipelineState.WAITING:
-                        # Started but no steps visible yet
                         relevant_statuses.append(s)
 
-            # If no relevant statuses yet, keep waiting
             if not relevant_statuses:
                 log.debug(
                     "Pipeline stage '%s': no servers with steps yet, waiting...",
@@ -233,7 +174,6 @@ async def wait_for_stage(
                 elapsed += poll_interval
                 continue
 
-            # Check if all relevant servers have completed
             all_completed = all(
                 s.state
                 in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED)
@@ -244,7 +184,6 @@ async def wait_for_stage(
             if all_completed:
                 log.debug("Pipeline stage '%s' completed.", stage_name)
                 return PipelineStatusList(root=relevant_statuses)
 
-            # Log current state
             states = [f"{s.server}={s.state.value}" for s in relevant_statuses]
             log.debug(
                 "Pipeline stage '%s' status: %s (elapsed: %.1fs)",
diff --git a/src/codesphere/resources/workspace/landscape/operations.py b/src/codesphere/resources/workspace/landscape/operations.py
index be2e825..9d2a53a 100644
--- a/src/codesphere/resources/workspace/landscape/operations.py
+++ b/src/codesphere/resources/workspace/landscape/operations.py
@@ -25,7 +25,6 @@
     response_model=type(None),
 )
 
-# Pipeline operations
 _START_PIPELINE_STAGE_OP = APIOperation(
     method="POST",
     endpoint_template="/workspaces/{id}/pipeline/{stage}/start",
diff --git a/src/codesphere/resources/workspace/landscape/schemas.py b/src/codesphere/resources/workspace/landscape/schemas.py
index ec70f95..9baa40d 100644
--- a/src/codesphere/resources/workspace/landscape/schemas.py
+++ b/src/codesphere/resources/workspace/landscape/schemas.py
@@ -10,16 +10,12 @@
 
 
 class PipelineStage(str, Enum):
-    """Pipeline stage for operations."""
-
     PREPARE = "prepare"
     TEST = "test"
     RUN = "run"
 
 
 class PipelineState(str, Enum):
-    """State of a pipeline stage or step."""
-
     WAITING = "waiting"
     RUNNING = "running"
     SUCCESS = "success"
@@ -28,16 +24,12 @@ class PipelineState(str, Enum):
 
 
 class StepStatus(CamelModel):
-    """Status of a single pipeline step."""
-
     state: PipelineState
     started_at: Optional[str] = None
     finished_at: Optional[str] = None
 
 
 class PipelineStatus(CamelModel):
-    """Status of a pipeline stage execution."""
-
     state: PipelineState
     started_at: Optional[str] = None
     finished_at: Optional[str] = None
@@ -47,8 +39,6 @@ class PipelineStatus(CamelModel):
 
 
 class PipelineStatusList(RootModel[List[PipelineStatus]]):
-    """List of pipeline status entries (one per replica/server)."""
-
     root: List[PipelineStatus]
 
     def __iter__(self):
diff --git a/src/codesphere/resources/workspace/logs/__init__.py b/src/codesphere/resources/workspace/logs/__init__.py
index 8400cb8..27ce1d5 100644
--- a/src/codesphere/resources/workspace/logs/__init__.py
+++ b/src/codesphere/resources/workspace/logs/__init__.py
@@ -1,7 +1,8 @@
-from .models import WorkspaceLogManager
+from .models import LogStream, WorkspaceLogManager
from .schemas import LogEntry, LogProblem, LogStage __all__ = [ + "LogStream", "WorkspaceLogManager", "LogEntry", "LogProblem", diff --git a/src/codesphere/resources/workspace/logs/models.py b/src/codesphere/resources/workspace/logs/models.py index 5d09e71..8f703ac 100644 --- a/src/codesphere/resources/workspace/logs/models.py +++ b/src/codesphere/resources/workspace/logs/models.py @@ -3,98 +3,226 @@ import asyncio import json import logging -from typing import AsyncIterator, Optional, Union +from typing import AsyncIterator, Optional, Type, Union import httpx -from ....exceptions import APIError, ValidationError +from ....core.operations import StreamOperation +from ....exceptions import APIError, ValidationError, raise_for_status from ....http_client import APIHttpClient +from .operations import ( + _STREAM_REPLICA_LOGS_OP, + _STREAM_SERVER_LOGS_OP, + _STREAM_STAGE_LOGS_OP, +) from .schemas import LogEntry, LogProblem, LogStage log = logging.getLogger(__name__) -class WorkspaceLogManager: - """Manager for streaming workspace logs via SSE. +class LogStream: + """Async context manager for streaming logs via SSE.""" + + def __init__( + self, + client: httpx.AsyncClient, + endpoint: str, + entry_model: Type[LogEntry], + timeout: Optional[float] = None, + ): + self._client = client + self._endpoint = endpoint + self._entry_model = entry_model + self._timeout = timeout + self._response: Optional[httpx.Response] = None + self._stream_context = None + + async def __aenter__(self) -> LogStream: + headers = {"Accept": "text/event-stream"} + self._stream_context = self._client.stream( + "GET", + self._endpoint, + headers=headers, + timeout=httpx.Timeout(5.0, read=None), + ) + self._response = await self._stream_context.__aenter__() + + if not self._response.is_success: + await self._response.aread() + raise_for_status(self._response) + + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + if self._stream_context: + await self._stream_context.__aexit__(exc_type, exc_val, exc_tb) + + def __aiter__(self) -> AsyncIterator[LogEntry]: + return self._iterate() + + async def _iterate(self) -> AsyncIterator[LogEntry]: + if self._response is None: + raise RuntimeError("LogStream must be used as async context manager") + + if self._timeout is not None: + async with asyncio.timeout(self._timeout): + async for entry in self._parse_sse_stream(): + yield entry + else: + async for entry in self._parse_sse_stream(): + yield entry + + async def _parse_sse_stream(self) -> AsyncIterator[LogEntry]: + event_type: Optional[str] = None + data_buffer: list[str] = [] + + async for line in self._response.aiter_lines(): + line = line.strip() + + if not line: + if event_type and data_buffer: + data_str = "\n".join(data_buffer) + + if event_type in ("end", "close", "done", "complete"): + return + + if event_type == "problem": + self._handle_problem(data_str) + elif event_type == "data": + for entry in self._parse_data(data_str): + yield entry + + elif event_type in ("end", "close", "done", "complete"): + return + + event_type = None + data_buffer = [] + continue + + if line.startswith("event:"): + event_type = line[6:].strip() + elif line.startswith("data:"): + data_buffer.append(line[5:].strip()) + elif not line.startswith(":"): + if event_type: + data_buffer.append(line) - Provides async iterators for streaming logs from different pipeline stages - and Multi Server Deployment servers/replicas. 
+ def _parse_data(self, data_str: str) -> list[LogEntry]: + entries = [] + try: + json_data = json.loads(data_str) + if isinstance(json_data, list): + for item in json_data: + entries.append(self._entry_model.model_validate(item)) + else: + entries.append(self._entry_model.model_validate(json_data)) + except json.JSONDecodeError as e: + log.warning(f"Failed to parse log entry JSON: {e}") + except Exception as e: + log.warning(f"Failed to validate log entry: {e}") + return entries - Example: - ```python - # Stream prepare stage logs (with timeout for completed stages) - async for entry in workspace.logs.stream(stage=LogStage.PREPARE, step=1, timeout=30.0): - print(entry.message) + def _handle_problem(self, data_str: str) -> None: + try: + problem_data = json.loads(data_str) + problem = LogProblem.model_validate(problem_data) + + if problem.status == 400: + raise ValidationError( + message=problem.reason, + errors=[{"detail": problem.detail}] if problem.detail else None, + ) + else: + raise APIError( + message=problem.reason, + status_code=problem.status, + response_body=problem_data, + ) + except json.JSONDecodeError: + raise APIError(message=f"Invalid problem event: {data_str}") - # Stream run logs for a specific server (no timeout for live streams) - async for entry in workspace.logs.stream_server(step=1, server="web"): - print(entry.message) - # Collect all logs at once - logs = await workspace.logs.collect(stage=LogStage.TEST, step=1) - for entry in logs: - print(entry.message) - ``` - """ +class WorkspaceLogManager: + """Manager for streaming workspace logs via SSE.""" def __init__(self, http_client: APIHttpClient, workspace_id: int): self._http_client = http_client self._workspace_id = workspace_id + self.id = workspace_id - async def stream( + def _build_endpoint(self, operation: StreamOperation, **kwargs) -> str: + return operation.endpoint_template.format(id=self._workspace_id, **kwargs) + + def _open_stream( + self, + operation: StreamOperation, + timeout: Optional[float] = None, + **kwargs, + ) -> LogStream: + endpoint = self._build_endpoint(operation, **kwargs) + return LogStream( + client=self._http_client._get_client(), + endpoint=endpoint, + entry_model=operation.entry_model, + timeout=timeout, + ) + + def open_stream( self, stage: Union[LogStage, str], step: int, - timeout: Optional[float] = 30.0, - ) -> AsyncIterator[LogEntry]: - """Stream logs for a given stage and step. - - Args: - stage: The pipeline stage ('prepare', 'test', or 'run'). - For 'run' stage of Multi Server Deployments, use - stream_server() or stream_replica() instead. - step: The step number within the stage. - timeout: Maximum seconds to wait for the stream. Default 30s. - Use None for no timeout (for live/running stages). - - Yields: - LogEntry objects as they arrive from the SSE stream. - - Raises: - ValidationError: If the workspace is not running or parameters are invalid. - APIError: For other API errors. - asyncio.TimeoutError: If timeout is reached. 
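
`_handle_problem` maps `problem` events onto the SDK's exception hierarchy: a 400 payload becomes a `ValidationError`, anything else an `APIError` carrying the status code, and an unparseable payload an `APIError` of its own. A sketch that pokes the private helper directly, purely for illustration:

```python
import json

from codesphere.exceptions import APIError, ValidationError
from codesphere.resources.workspace.logs import LogEntry, LogStream

# _handle_problem never touches the HTTP client, so None suffices for this demo
stream = LogStream(client=None, endpoint="/unused", entry_model=LogEntry)

try:
    stream._handle_problem(json.dumps({"status": 400, "reason": "Workspace is not running"}))
except ValidationError as exc:
    print("validation problem:", exc)

try:
    stream._handle_problem(json.dumps({"status": 503, "reason": "Upstream unavailable"}))
except APIError as exc:
    print("api problem:", exc.status_code)
```
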
- """ + timeout: Optional[float] = None, + ) -> LogStream: + """Open a log stream as an async context manager.""" if isinstance(stage, LogStage): stage = stage.value + return self._open_stream( + _STREAM_STAGE_LOGS_OP, timeout=timeout, stage=stage, step=step + ) - endpoint = f"/workspaces/{self._workspace_id}/logs/{stage}/{step}" - async for entry in self._stream_logs(endpoint, timeout=timeout): - yield entry - - async def stream_server( + def open_server_stream( self, step: int, server: str, timeout: Optional[float] = None, - ) -> AsyncIterator[LogEntry]: - """Stream run logs for a specific server in a Multi Server Deployment. + ) -> LogStream: + """Open a server log stream as an async context manager.""" + return self._open_stream( + _STREAM_SERVER_LOGS_OP, timeout=timeout, step=step, server=server + ) - Args: - step: The step number. - server: The server name. - timeout: Maximum seconds to wait. Default None (no timeout). + def open_replica_stream( + self, + step: int, + replica: str, + timeout: Optional[float] = None, + ) -> LogStream: + """Open a replica log stream as an async context manager.""" + return self._open_stream( + _STREAM_REPLICA_LOGS_OP, timeout=timeout, step=step, replica=replica + ) - Yields: - LogEntry objects as they arrive from the SSE stream. + async def stream( + self, + stage: Union[LogStage, str], + step: int, + timeout: Optional[float] = 30.0, + ) -> AsyncIterator[LogEntry]: + """Stream logs for a given stage and step.""" + async with self.open_stream(stage, step, timeout) as stream: + async for entry in stream: + yield entry - Raises: - ValidationError: If the workspace is not running or parameters are invalid. - APIError: For other API errors. - """ - endpoint = f"/workspaces/{self._workspace_id}/logs/run/{step}/server/{server}" - async for entry in self._stream_logs(endpoint, timeout=timeout): - yield entry + async def stream_server( + self, + step: int, + server: str, + timeout: Optional[float] = None, + ) -> AsyncIterator[LogEntry]: + """Stream run logs for a specific server.""" + async with self.open_server_stream(step, server, timeout) as stream: + async for entry in stream: + yield entry async def stream_replica( self, @@ -102,23 +230,10 @@ async def stream_replica( replica: str, timeout: Optional[float] = None, ) -> AsyncIterator[LogEntry]: - """Stream run logs for a specific replica in a Multi Server Deployment. - - Args: - step: The step number. - replica: The replica identifier. - timeout: Maximum seconds to wait. Default None (no timeout). - - Yields: - LogEntry objects as they arrive from the SSE stream. - - Raises: - ValidationError: If the workspace is not running or parameters are invalid. - APIError: For other API errors. - """ - endpoint = f"/workspaces/{self._workspace_id}/logs/run/{step}/replica/{replica}" - async for entry in self._stream_logs(endpoint, timeout=timeout): - yield entry + """Stream run logs for a specific replica.""" + async with self.open_replica_stream(step, replica, timeout) as stream: + async for entry in stream: + yield entry async def collect( self, @@ -127,27 +242,16 @@ async def collect( max_entries: Optional[int] = None, timeout: Optional[float] = 30.0, ) -> list[LogEntry]: - """Collect all logs for a stage and step into a list. - - Args: - stage: The pipeline stage ('prepare', 'test', or 'run'). - step: The step number within the stage. - max_entries: Maximum number of entries to collect (None for unlimited). - timeout: Maximum seconds to collect. Default 30s. - - Returns: - List of LogEntry objects. 
- """ + """Collect all logs for a stage and step into a list.""" entries: list[LogEntry] = [] - count = 0 try: - async for entry in self.stream(stage, step, timeout=timeout): - entries.append(entry) - count += 1 - if max_entries and count >= max_entries: - break + async with self.open_stream(stage, step, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break except asyncio.TimeoutError: - log.debug(f"Log collection timed out after {count} entries") + pass return entries async def collect_server( @@ -157,27 +261,16 @@ async def collect_server( max_entries: Optional[int] = None, timeout: Optional[float] = 30.0, ) -> list[LogEntry]: - """Collect all logs for a server into a list. - - Args: - step: The step number. - server: The server name. - max_entries: Maximum number of entries to collect (None for unlimited). - timeout: Maximum seconds to collect. Default 30s. - - Returns: - List of LogEntry objects. - """ + """Collect all logs for a server into a list.""" entries: list[LogEntry] = [] - count = 0 try: - async for entry in self.stream_server(step, server, timeout=timeout): - entries.append(entry) - count += 1 - if max_entries and count >= max_entries: - break + async with self.open_server_stream(step, server, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break except asyncio.TimeoutError: - log.debug(f"Server log collection timed out after {count} entries") + pass return entries async def collect_replica( @@ -187,175 +280,14 @@ async def collect_replica( max_entries: Optional[int] = None, timeout: Optional[float] = 30.0, ) -> list[LogEntry]: - """Collect all logs for a replica into a list. - - Args: - step: The step number. - replica: The replica identifier. - max_entries: Maximum number of entries to collect (None for unlimited). - timeout: Maximum seconds to collect. Default 30s. - - Returns: - List of LogEntry objects. - """ + """Collect all logs for a replica into a list.""" entries: list[LogEntry] = [] - count = 0 try: - async for entry in self.stream_replica(step, replica, timeout=timeout): - entries.append(entry) - count += 1 - if max_entries and count >= max_entries: - break + async with self.open_replica_stream(step, replica, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break except asyncio.TimeoutError: - log.debug(f"Replica log collection timed out after {count} entries") + pass return entries - - async def _stream_logs( - self, endpoint: str, timeout: Optional[float] = None - ) -> AsyncIterator[LogEntry]: - """Internal method to stream SSE logs from an endpoint. - - Args: - endpoint: The API endpoint to stream from. - timeout: Maximum seconds to stream. None for no timeout. - - Yields: - LogEntry objects parsed from SSE events. 
- """ - client = self._http_client._get_client() - headers = {"Accept": "text/event-stream"} - - log.debug(f"Opening SSE stream: GET {endpoint} (timeout={timeout})") - - async def _do_stream() -> AsyncIterator[LogEntry]: - async with client.stream( - "GET", - endpoint, - headers=headers, - timeout=httpx.Timeout(5.0, read=None), - ) as response: - if not response.is_success: - await response.aread() - self._handle_error_response(response) - - async for entry in self._parse_sse_stream(response): - yield entry - - if timeout is not None: - async with asyncio.timeout(timeout): - async for entry in _do_stream(): - yield entry - else: - async for entry in _do_stream(): - yield entry - - async def _parse_sse_stream( - self, response: httpx.Response - ) -> AsyncIterator[LogEntry]: - """Parse SSE events from the response stream. - - Args: - response: The streaming httpx Response. - - Yields: - LogEntry objects from 'data' events. - - Raises: - ValidationError: If a 'problem' event indicates validation issues. - APIError: If a 'problem' event indicates other API errors. - """ - event_type: Optional[str] = None - data_buffer: list[str] = [] - - async for line in response.aiter_lines(): - line = line.strip() - - if not line: - # Empty line = end of event - if event_type and data_buffer: - data_str = "\n".join(data_buffer) - - # Handle end/close events - stop streaming - if event_type in ("end", "close", "done", "complete"): - log.debug(f"Received {event_type} event, closing stream") - return - - # Handle problem events first (raises exception) - if event_type == "problem": - await self._process_sse_event(event_type, data_str) - elif event_type == "data": - try: - json_data = json.loads(data_str) - # API can return a single entry or an array of entries - if isinstance(json_data, list): - for item in json_data: - yield LogEntry.model_validate(item) - else: - yield LogEntry.model_validate(json_data) - except json.JSONDecodeError as e: - log.warning(f"Failed to parse log entry JSON: {e}") - except Exception as e: - log.warning(f"Failed to validate log entry: {e}") - elif event_type in ("end", "close", "done", "complete"): - # End event with no data - log.debug(f"Received {event_type} event, closing stream") - return - - event_type = None - data_buffer = [] - continue - - if line.startswith("event:"): - event_type = line[6:].strip() - elif line.startswith("data:"): - data_buffer.append(line[5:].strip()) - elif line.startswith(":"): - # SSE comment - ignore - pass - else: - # Some SSE implementations send data without "data:" prefix - if event_type: - data_buffer.append(line) - - async def _process_sse_event(self, event_type: str, data: str) -> None: - """Process an SSE event and raise exceptions for problem events. - - Args: - event_type: The event type ('data' or 'problem'). - data: The JSON data from the event. - - Raises: - ValidationError: For 400 status problems. - APIError: For other problem statuses. - """ - if event_type == "problem": - try: - problem_data = json.loads(data) - problem = LogProblem.model_validate(problem_data) - - if problem.status == 400: - raise ValidationError( - message=problem.reason, - errors=[{"detail": problem.detail}] if problem.detail else None, - ) - else: - raise APIError( - message=problem.reason, - status_code=problem.status, - response_body=problem_data, - ) - except json.JSONDecodeError: - raise APIError(message=f"Invalid problem event: {data}") - - def _handle_error_response(self, response: httpx.Response) -> None: - """Handle non-success HTTP responses. 
- - Args: - response: The failed httpx Response. - - Raises: - Appropriate exception based on status code. - """ - from ....exceptions import raise_for_status - - raise_for_status(response) diff --git a/src/codesphere/resources/workspace/logs/operations.py b/src/codesphere/resources/workspace/logs/operations.py new file mode 100644 index 0000000..12d0e74 --- /dev/null +++ b/src/codesphere/resources/workspace/logs/operations.py @@ -0,0 +1,17 @@ +from ....core.operations import StreamOperation +from .schemas import LogEntry + +_STREAM_STAGE_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/{stage}/{step}", + entry_model=LogEntry, +) + +_STREAM_SERVER_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/run/{step}/server/{server}", + entry_model=LogEntry, +) + +_STREAM_REPLICA_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/run/{step}/replica/{replica}", + entry_model=LogEntry, +) diff --git a/src/codesphere/resources/workspace/logs/schemas.py b/src/codesphere/resources/workspace/logs/schemas.py index 20c581c..f54eb61 100644 --- a/src/codesphere/resources/workspace/logs/schemas.py +++ b/src/codesphere/resources/workspace/logs/schemas.py @@ -7,30 +7,23 @@ class LogStage(str, Enum): - """Pipeline stage for log retrieval.""" - PREPARE = "prepare" TEST = "test" RUN = "run" class LogEntry(CamelModel): - """A single log entry from the workspace logs stream.""" - model_config = {"extra": "allow"} timestamp: Optional[str] = None - kind: Optional[str] = None # "I" for info, "E" for error, etc. + kind: Optional[str] = None # "I" for info, "E" for error data: Optional[str] = None # The actual log content def get_text(self) -> str: - """Get the log text.""" return self.data or "" class LogProblem(CamelModel): - """Problem event from the logs SSE stream.""" - status: int reason: str detail: Optional[str] = None diff --git a/tests/core/test_operations.py b/tests/core/test_operations.py index 8d891f0..2d13d07 100644 --- a/tests/core/test_operations.py +++ b/tests/core/test_operations.py @@ -1,10 +1,11 @@ -import pytest from dataclasses import dataclass from typing import Optional, Type +import pytest from pydantic import BaseModel -from codesphere.core.operations import APIOperation +from codesphere.core.operations import APIOperation, StreamOperation +from codesphere.resources.workspace.logs import LogEntry class SampleInputModel(BaseModel): @@ -121,3 +122,46 @@ def test_default_input_model_is_none(self): ) assert operation.input_model is None + + def test_create_api_operation(self): + op = APIOperation( + method="GET", + endpoint_template="/test/{id}", + response_model=LogEntry, + ) + assert op.method == "GET" + assert op.endpoint_template == "/test/{id}" + assert op.response_model == LogEntry + + def test_api_operation_is_frozen(self): + op = APIOperation( + method="GET", + endpoint_template="/test", + response_model=LogEntry, + ) + try: + op.method = "POST" + assert False, "Should raise error" + except Exception: + pass + + +class TestStreamOperation: + def test_create_stream_operation(self): + op = StreamOperation( + endpoint_template="/logs/{id}", + entry_model=LogEntry, + ) + assert op.endpoint_template == "/logs/{id}" + assert op.entry_model == LogEntry + + def test_stream_operation_is_frozen(self): + op = StreamOperation( + endpoint_template="/logs/{id}", + entry_model=LogEntry, + ) + try: + op.endpoint_template = "/other" + assert False, "Should raise error" + except Exception: + pass diff --git a/tests/resources/workspace/logs/__init__.py 
b/tests/resources/workspace/logs/__init__.py index 5e95095..e69de29 100644 --- a/tests/resources/workspace/logs/__init__.py +++ b/tests/resources/workspace/logs/__init__.py @@ -1 +0,0 @@ -# Tests for workspace logs module diff --git a/tests/resources/workspace/logs/test_logs.py b/tests/resources/workspace/logs/test_logs.py index 1d70588..44ae58e 100644 --- a/tests/resources/workspace/logs/test_logs.py +++ b/tests/resources/workspace/logs/test_logs.py @@ -9,8 +9,14 @@ LogEntry, LogProblem, LogStage, + LogStream, WorkspaceLogManager, ) +from codesphere.resources.workspace.logs.operations import ( + _STREAM_REPLICA_LOGS_OP, + _STREAM_SERVER_LOGS_OP, + _STREAM_STAGE_LOGS_OP, +) class TestLogStage: @@ -68,205 +74,140 @@ def test_create_problem_with_detail(self): assert problem.detail == "Workspace 123 does not exist" -async def mock_stream_logs_factory(entries: list[LogEntry], capture_endpoint: list): - """Factory to create a mock _stream_logs that captures the endpoint.""" - - async def mock_stream_logs(endpoint: str): - capture_endpoint.append(endpoint) - for entry in entries: - yield entry - - return mock_stream_logs - - -class TestWorkspaceLogManager: - @pytest.fixture - def mock_http_client(self): - client = MagicMock() - client._get_client = MagicMock() - return client - - @pytest.fixture - def log_manager(self, mock_http_client): - return WorkspaceLogManager(mock_http_client, workspace_id=123) - - def test_init(self, log_manager, mock_http_client): - assert log_manager._http_client == mock_http_client - assert log_manager._workspace_id == 123 - - @pytest.mark.asyncio - async def test_stream_builds_correct_endpoint(self, log_manager): - """Test that stream() builds the correct endpoint URL.""" - captured_endpoints: list[str] = [] - - async def mock_stream_logs(endpoint: str, timeout: float = None): - captured_endpoints.append(endpoint) - return - yield # Make it a generator - - log_manager._stream_logs = mock_stream_logs - - async for _ in log_manager.stream(stage=LogStage.PREPARE, step=1): - pass - - assert captured_endpoints == ["/workspaces/123/logs/prepare/1"] - - @pytest.mark.asyncio - async def test_stream_with_string_stage(self, log_manager): - """Test that stream() accepts string stage values.""" - captured_endpoints: list[str] = [] - - async def mock_stream_logs(endpoint: str, timeout: float = None): - captured_endpoints.append(endpoint) - return - yield +class TestStreamOperations: + def test_stream_stage_logs_op(self): + assert ( + _STREAM_STAGE_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/{stage}/{step}" + ) + assert _STREAM_STAGE_LOGS_OP.entry_model == LogEntry - log_manager._stream_logs = mock_stream_logs + def test_stream_server_logs_op(self): + assert ( + _STREAM_SERVER_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/run/{step}/server/{server}" + ) + assert _STREAM_SERVER_LOGS_OP.entry_model == LogEntry - async for _ in log_manager.stream(stage="test", step=2): - pass + def test_stream_replica_logs_op(self): + assert ( + _STREAM_REPLICA_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/run/{step}/replica/{replica}" + ) + assert _STREAM_REPLICA_LOGS_OP.entry_model == LogEntry - assert captured_endpoints == ["/workspaces/123/logs/test/2"] - @pytest.mark.asyncio - async def test_stream_server_builds_correct_endpoint(self, log_manager): - """Test that stream_server() builds the correct endpoint URL.""" - captured_endpoints: list[str] = [] +class TestLogStream: + def test_init(self): + mock_client = MagicMock() + stream = LogStream(mock_client, 
"/test/endpoint", LogEntry, timeout=30.0) + assert stream._client == mock_client + assert stream._endpoint == "/test/endpoint" + assert stream._entry_model == LogEntry + assert stream._timeout == 30.0 - async def mock_stream_logs(endpoint: str, timeout: float = None): - captured_endpoints.append(endpoint) - return - yield + def test_init_no_timeout(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test/endpoint", LogEntry) + assert stream._timeout is None - log_manager._stream_logs = mock_stream_logs + def test_handle_problem_validation_error(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - async for _ in log_manager.stream_server(step=1, server="web"): - pass + with pytest.raises(ValidationError): + stream._handle_problem('{"status": 400, "reason": "Bad request"}') - assert captured_endpoints == ["/workspaces/123/logs/run/1/server/web"] + def test_handle_problem_api_error(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - @pytest.mark.asyncio - async def test_stream_replica_builds_correct_endpoint(self, log_manager): - """Test that stream_replica() builds the correct endpoint URL.""" - captured_endpoints: list[str] = [] + with pytest.raises(APIError) as exc_info: + stream._handle_problem('{"status": 404, "reason": "Not found"}') + assert exc_info.value.status_code == 404 - async def mock_stream_logs(endpoint: str, timeout: float = None): - captured_endpoints.append(endpoint) - return - yield + def test_handle_problem_invalid_json(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - log_manager._stream_logs = mock_stream_logs + with pytest.raises(APIError) as exc_info: + stream._handle_problem("invalid json") + assert "Invalid problem event" in str(exc_info.value) - async for _ in log_manager.stream_replica(step=2, replica="replica-1"): - pass + def test_parse_data_single_entry(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - assert captured_endpoints == ["/workspaces/123/logs/run/2/replica/replica-1"] + entries = stream._parse_data('{"data": "test log", "kind": "I"}') + assert len(entries) == 1 + assert entries[0].data == "test log" + assert entries[0].kind == "I" - @pytest.mark.asyncio - async def test_collect_returns_list(self, log_manager): - """Test that collect() returns a list of log entries.""" - entries = [ - LogEntry(data="Log 1"), - LogEntry(data="Log 2"), - LogEntry(data="Log 3"), - ] + def test_parse_data_array(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - async def mock_stream_logs(endpoint: str, timeout: float = None): - for entry in entries: - yield entry + entries = stream._parse_data('[{"data": "log1"}, {"data": "log2"}]') + assert len(entries) == 2 + assert entries[0].data == "log1" + assert entries[1].data == "log2" - log_manager._stream_logs = mock_stream_logs + def test_parse_data_invalid_json(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) - result = await log_manager.collect(stage=LogStage.PREPARE, step=1) + entries = stream._parse_data("invalid") + assert len(entries) == 0 - assert len(result) == 3 - assert result[0].data == "Log 1" - assert result[2].data == "Log 3" - @pytest.mark.asyncio - async def test_collect_with_max_entries(self, log_manager): - """Test that collect() respects max_entries limit.""" - entries = [ - LogEntry(data="Log 1"), - LogEntry(data="Log 2"), - LogEntry(data="Log 3"), - ] +class 
TestWorkspaceLogManager: + @pytest.fixture + def mock_http_client(self): + client = MagicMock() + client._get_client = MagicMock() + return client - async def mock_stream_logs(endpoint: str, timeout: float = None): - for entry in entries: - yield entry + @pytest.fixture + def log_manager(self, mock_http_client): + return WorkspaceLogManager(mock_http_client, workspace_id=123) - log_manager._stream_logs = mock_stream_logs + def test_init(self, log_manager, mock_http_client): + assert log_manager._http_client == mock_http_client + assert log_manager._workspace_id == 123 + assert log_manager.id == 123 - result = await log_manager.collect( - stage=LogStage.PREPARE, step=1, max_entries=2 + def test_build_endpoint(self, log_manager): + endpoint = log_manager._build_endpoint( + _STREAM_STAGE_LOGS_OP, stage="prepare", step=0 ) + assert endpoint == "/workspaces/123/logs/prepare/0" - assert len(result) == 2 - - @pytest.mark.asyncio - async def test_collect_server_returns_list(self, log_manager): - """Test that collect_server() returns a list of log entries.""" - entries = [LogEntry(data="Server log 1")] - - async def mock_stream_logs(endpoint: str, timeout: float = None): - for entry in entries: - yield entry - - log_manager._stream_logs = mock_stream_logs - - result = await log_manager.collect_server(step=1, server="web") - - assert len(result) == 1 - assert result[0].data == "Server log 1" - - @pytest.mark.asyncio - async def test_collect_replica_returns_list(self, log_manager): - """Test that collect_replica() returns a list of log entries.""" - entries = [LogEntry(data="Replica log 1")] - - async def mock_stream_logs(endpoint: str, timeout: float = None): - for entry in entries: - yield entry - - log_manager._stream_logs = mock_stream_logs - - result = await log_manager.collect_replica(step=1, replica="replica-1") - - assert len(result) == 1 - assert result[0].data == "Replica log 1" - - @pytest.mark.asyncio - async def test_process_sse_event_raises_on_problem(self, log_manager): - """Test that problem events raise appropriate exceptions.""" - problem_data = '{"status": 400, "reason": "Workspace is not running"}' - - with pytest.raises(ValidationError) as exc_info: - await log_manager._process_sse_event("problem", problem_data) - - assert "Workspace is not running" in str(exc_info.value) - - @pytest.mark.asyncio - async def test_process_sse_event_raises_api_error_for_non_400(self, log_manager): - """Test that non-400 problem events raise APIError.""" - problem_data = '{"status": 404, "reason": "Workspace not found"}' - - with pytest.raises(APIError) as exc_info: - await log_manager._process_sse_event("problem", problem_data) - - assert exc_info.value.status_code == 404 - - @pytest.mark.asyncio - async def test_process_sse_event_data_does_not_raise(self, log_manager): - """Test that data events do not raise exceptions.""" - data = '{"data": "Test log"}' - # Should not raise - await log_manager._process_sse_event("data", data) - - @pytest.mark.asyncio - async def test_process_sse_event_invalid_json_raises(self, log_manager): - """Test that invalid JSON in problem events raises APIError.""" - with pytest.raises(APIError) as exc_info: - await log_manager._process_sse_event("problem", "invalid json") - - assert "Invalid problem event" in str(exc_info.value) + def test_build_endpoint_server(self, log_manager): + endpoint = log_manager._build_endpoint( + _STREAM_SERVER_LOGS_OP, step=0, server="web" + ) + assert endpoint == "/workspaces/123/logs/run/0/server/web" + + def 
test_open_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_stream(stage=LogStage.PREPARE, step=0) + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/prepare/0" + assert stream._entry_model == LogEntry + + def test_open_stream_with_string_stage(self, log_manager): + stream = log_manager.open_stream(stage="test", step=1) + assert stream._endpoint == "/workspaces/123/logs/test/1" + + def test_open_stream_with_timeout(self, log_manager): + stream = log_manager.open_stream(stage="run", step=0, timeout=60.0) + assert stream._timeout == 60.0 + + def test_open_server_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_server_stream(step=0, server="web") + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/run/0/server/web" + + def test_open_replica_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_replica_stream(step=0, replica="replica-1") + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/run/0/replica/replica-1"
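
Beyond endpoint construction, the `collect` path can be unit-tested without touching httpx by stubbing `open_stream` with a fake async context manager. A hypothetical addition to this test module, reusing the `log_manager` fixture above:

```python
class FakeStream:
    """Stand-in for LogStream: supports async with and async iteration."""

    def __init__(self, entries):
        self._entries = entries

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return None

    def __aiter__(self):
        return self._iter()

    async def _iter(self):
        for entry in self._entries:
            yield entry


@pytest.mark.asyncio
async def test_collect_stops_at_max_entries(log_manager):
    entries = [LogEntry(data="log 1"), LogEntry(data="log 2")]
    # collect() invokes open_stream(stage, step, timeout) positionally.
    log_manager.open_stream = lambda stage, step, timeout: FakeStream(entries)

    result = await log_manager.collect(stage=LogStage.PREPARE, step=0, max_entries=1)

    assert len(result) == 1
    assert result[0].data == "log 1"
```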