diff --git a/examples/create_workspace_with_landscape.py b/examples/create_workspace_with_landscape.py index a85cf28..ff020fb 100644 --- a/examples/create_workspace_with_landscape.py +++ b/examples/create_workspace_with_landscape.py @@ -3,59 +3,92 @@ from codesphere import CodesphereSDK from codesphere.resources.workspace import WorkspaceCreate -from codesphere.resources.workspace.landscape import ProfileBuilder, ProfileConfig +from codesphere.resources.workspace.landscape import ( + PipelineStage, + PipelineState, + ProfileBuilder, +) +from codesphere.resources.workspace.logs import LogStage -TEAM_ID = 123 # Replace with your actual team ID +TEAM_ID = 123 -async def get_plan_id(sdk: CodesphereSDK, plan_name: str = "Micro") -> int: - plans = await sdk.metadata.list_plans() - plan = next((p for p in plans if p.title == plan_name and not p.deprecated), None) - if not plan: - raise ValueError(f"Plan '{plan_name}' not found") - return plan.id +async def main(): + async with CodesphereSDK() as sdk: + plans = await sdk.metadata.list_plans() + plan = next((p for p in plans if p.title == "Micro" and not p.deprecated), None) + if not plan: + raise ValueError("Micro plan not found") + + workspace_name = f"pipeline-demo-{int(time.time())}" + + print(f"Creating workspace '{workspace_name}'...") + workspace = await sdk.workspaces.create( + WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name) + ) + print(f"✓ Workspace created (ID: {workspace.id})") + + print("Waiting for workspace to start...") + await workspace.wait_until_running(timeout=300.0, poll_interval=5.0) + print("✓ Workspace is running\n") + profile = ( + ProfileBuilder() + .prepare() + .add_step("echo 'Installing dependencies...' && sleep 2") + .add_step("echo 'Setup complete!' && sleep 1") + .done() + .add_reactive_service("web") + .plan(plan.id) + .add_step( + 'for i in $(seq 1 20); do echo "[$i] Processing request..."; sleep 1; done' + ) + .add_port(3000, public=True) + .add_path("/", port=3000) + .replicas(1) + .done() + .build() + ) -def build_web_profile(plan_id: int) -> ProfileConfig: - """Build a simple web service landscape profile.""" - return ( - ProfileBuilder() - .prepare() - .add_step("npm install", name="Install dependencies") - .done() - .add_reactive_service("web") - .plan(plan_id) - .add_step("npm start") - .add_port(3000, public=True) - .add_path("/", port=3000) - .replicas(1) - .env("NODE_ENV", "production") - .build() - ) + print("Deploying landscape profile...") + await workspace.landscape.save_profile("production", profile) + await workspace.landscape.deploy(profile="production") + print("✓ Profile deployed\n") + print("--- Prepare Stage ---") + await workspace.landscape.start_stage( + PipelineStage.PREPARE, profile="production" + ) + prepare_status = await workspace.landscape.wait_for_stage( + PipelineStage.PREPARE, timeout=60.0 + ) -async def create_workspace(sdk: CodesphereSDK, plan_id: int, name: str): - workspace = await sdk.workspaces.create( - WorkspaceCreate(plan_id=plan_id, team_id=TEAM_ID, name=name) - ) - await workspace.wait_until_running(timeout=300.0, poll_interval=5.0) - return workspace + for status in prepare_status: + icon = "✓" if status.state == PipelineState.SUCCESS else "✗" + print(f"{icon} {status.server}: {status.state.value}") + print("\nPrepare logs:") + for step in range(len(prepare_status[0].steps)): + logs = await workspace.logs.collect( + stage=LogStage.PREPARE, step=step, timeout=5.0 + ) + for entry in logs: + if entry.get_text(): + print(f" {entry.get_text().strip()}") -async def deploy_landscape(workspace, profile: dict, profile_name: str = "production"): - await workspace.landscape.save_profile(profile_name, profile) - await workspace.landscape.deploy(profile=profile_name) - print("Deployment started!") + print("\n--- Run Stage ---") + await workspace.landscape.start_stage(PipelineStage.RUN, profile="production") + print("Started run stage\n") + print("Streaming logs from 'web' service (using context manager):") + count = 0 + async with workspace.logs.open_server_stream(step=0, server="web") as stream: + async for entry in stream: + if entry.get_text(): + print(f" {entry.get_text().strip()}") + count += 1 -async def main(): - async with CodesphereSDK() as sdk: - plan_id = await get_plan_id(sdk) - workspace = await create_workspace( - sdk, plan_id, f"landscape-demo-{int(time.time())}" - ) - profile = build_web_profile(plan_id) - await deploy_landscape(workspace, profile) + print(f"\n✓ Stream ended ({count} log entries)") if __name__ == "__main__": diff --git a/src/codesphere/core/__init__.py b/src/codesphere/core/__init__.py index 1df8f8f..f92ed07 100644 --- a/src/codesphere/core/__init__.py +++ b/src/codesphere/core/__init__.py @@ -1,11 +1,13 @@ -from .base import ResourceBase -from .operations import APIOperation, AsyncCallable -from .handler import _APIOperationExecutor, APIRequestHandler +from .base import CamelModel, ResourceBase +from .handler import APIRequestHandler, _APIOperationExecutor +from .operations import APIOperation, AsyncCallable, StreamOperation __all__ = [ + "CamelModel", "ResourceBase", "APIOperation", "_APIOperationExecutor", "APIRequestHandler", "AsyncCallable", + "StreamOperation", ] diff --git a/src/codesphere/core/operations.py b/src/codesphere/core/operations.py index bf8ee3f..a6192f7 100644 --- a/src/codesphere/core/operations.py +++ b/src/codesphere/core/operations.py @@ -1,17 +1,25 @@ -from typing import Callable, Awaitable, Generic, Optional, Type, TypeAlias, TypeVar - -from pydantic import BaseModel +from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar +from pydantic import BaseModel, ConfigDict _T = TypeVar("_T") ResponseT = TypeVar("ResponseT") InputT = TypeVar("InputT") +EntryT = TypeVar("EntryT") AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]] class APIOperation(BaseModel, Generic[ResponseT, InputT]): + model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True) + method: str endpoint_template: str response_model: Type[ResponseT] input_model: Optional[Type[InputT]] = None + + +class StreamOperation(BaseModel, Generic[EntryT]): + model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True) + endpoint_template: str + entry_model: Type[EntryT] diff --git a/src/codesphere/resources/workspace/__init__.py b/src/codesphere/resources/workspace/__init__.py index d80e4f1..4045de8 100644 --- a/src/codesphere/resources/workspace/__init__.py +++ b/src/codesphere/resources/workspace/__init__.py @@ -1,4 +1,5 @@ from .git import GitHead, WorkspaceGitManager +from .logs import LogEntry, LogProblem, LogStage, LogStream, WorkspaceLogManager from .resources import WorkspacesResource from .schemas import ( CommandInput, @@ -19,4 +20,9 @@ "CommandOutput", "WorkspaceGitManager", "GitHead", + "LogStream", + "WorkspaceLogManager", + "LogEntry", + "LogProblem", + "LogStage", ] diff --git a/src/codesphere/resources/workspace/landscape/__init__.py b/src/codesphere/resources/workspace/landscape/__init__.py index 67950be..4facd41 100644 --- a/src/codesphere/resources/workspace/landscape/__init__.py +++ b/src/codesphere/resources/workspace/landscape/__init__.py @@ -4,6 +4,10 @@ ManagedServiceConfig, NetworkConfig, PathConfig, + PipelineStage, + PipelineState, + PipelineStatus, + PipelineStatusList, PortConfig, Profile, ProfileBuilder, @@ -12,6 +16,7 @@ ReactiveServiceConfig, StageConfig, Step, + StepStatus, ) __all__ = [ @@ -28,4 +33,9 @@ "NetworkConfig", "PortConfig", "PathConfig", + "PipelineStage", + "PipelineState", + "PipelineStatus", + "PipelineStatusList", + "StepStatus", ] diff --git a/src/codesphere/resources/workspace/landscape/models.py b/src/codesphere/resources/workspace/landscape/models.py index 8d6fc11..99e9bdc 100644 --- a/src/codesphere/resources/workspace/landscape/models.py +++ b/src/codesphere/resources/workspace/landscape/models.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging import re from typing import TYPE_CHECKING, Dict, List, Optional, Union @@ -10,10 +11,20 @@ from .operations import ( _DEPLOY_OP, _DEPLOY_WITH_PROFILE_OP, + _GET_PIPELINE_STATUS_OP, _SCALE_OP, + _START_PIPELINE_STAGE_OP, + _START_PIPELINE_STAGE_WITH_PROFILE_OP, + _STOP_PIPELINE_STAGE_OP, _TEARDOWN_OP, ) -from .schemas import Profile, ProfileConfig +from .schemas import ( + PipelineStage, + PipelineState, + PipelineStatusList, + Profile, + ProfileConfig, +) if TYPE_CHECKING: from ..schemas import CommandOutput @@ -95,3 +106,94 @@ async def teardown(self) -> None: async def scale(self, services: Dict[str, int]) -> None: await self._execute_operation(_SCALE_OP, data=services) + + async def start_stage( + self, + stage: Union[PipelineStage, str], + profile: Optional[str] = None, + ) -> None: + if isinstance(stage, PipelineStage): + stage = stage.value + + if profile is not None: + _validate_profile_name(profile) + await self._execute_operation( + _START_PIPELINE_STAGE_WITH_PROFILE_OP, stage=stage, profile=profile + ) + else: + await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage) + + async def stop_stage(self, stage: Union[PipelineStage, str]) -> None: + if isinstance(stage, PipelineStage): + stage = stage.value + + await self._execute_operation(_STOP_PIPELINE_STAGE_OP, stage=stage) + + async def get_stage_status( + self, stage: Union[PipelineStage, str] + ) -> PipelineStatusList: + if isinstance(stage, PipelineStage): + stage = stage.value + + return await self._execute_operation(_GET_PIPELINE_STATUS_OP, stage=stage) + + async def wait_for_stage( + self, + stage: Union[PipelineStage, str], + *, + timeout: float = 300.0, + poll_interval: float = 5.0, + server: Optional[str] = None, + ) -> PipelineStatusList: + if poll_interval <= 0: + raise ValueError("poll_interval must be greater than 0") + + stage_name = stage.value if isinstance(stage, PipelineStage) else stage + elapsed = 0.0 + + while elapsed < timeout: + status_list = await self.get_stage_status(stage) + + relevant_statuses = [] + for s in status_list: + if server is not None: + if s.server == server: + relevant_statuses.append(s) + else: + if s.steps: + relevant_statuses.append(s) + elif s.state != PipelineState.WAITING: + relevant_statuses.append(s) + + if not relevant_statuses: + log.debug( + "Pipeline stage '%s': no servers with steps yet, waiting...", + stage_name, + ) + await asyncio.sleep(poll_interval) + elapsed += poll_interval + continue + + all_completed = all( + s.state + in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED) + for s in relevant_statuses + ) + + if all_completed: + log.debug("Pipeline stage '%s' completed.", stage_name) + return PipelineStatusList(root=relevant_statuses) + + states = [f"{s.server}={s.state.value}" for s in relevant_statuses] + log.debug( + "Pipeline stage '%s' status: %s (elapsed: %.1fs)", + stage_name, + ", ".join(states), + elapsed, + ) + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + raise TimeoutError( + f"Pipeline stage '{stage_name}' did not complete within {timeout} seconds." + ) diff --git a/src/codesphere/resources/workspace/landscape/operations.py b/src/codesphere/resources/workspace/landscape/operations.py index 9197b09..9d2a53a 100644 --- a/src/codesphere/resources/workspace/landscape/operations.py +++ b/src/codesphere/resources/workspace/landscape/operations.py @@ -1,4 +1,5 @@ from ....core.operations import APIOperation +from .schemas import PipelineStatusList _DEPLOY_OP = APIOperation( method="POST", @@ -23,3 +24,27 @@ endpoint_template="/workspaces/{id}/landscape/scale", response_model=type(None), ) + +_START_PIPELINE_STAGE_OP = APIOperation( + method="POST", + endpoint_template="/workspaces/{id}/pipeline/{stage}/start", + response_model=type(None), +) + +_START_PIPELINE_STAGE_WITH_PROFILE_OP = APIOperation( + method="POST", + endpoint_template="/workspaces/{id}/pipeline/{stage}/start/{profile}", + response_model=type(None), +) + +_STOP_PIPELINE_STAGE_OP = APIOperation( + method="POST", + endpoint_template="/workspaces/{id}/pipeline/{stage}/stop", + response_model=type(None), +) + +_GET_PIPELINE_STATUS_OP = APIOperation( + method="GET", + endpoint_template="/workspaces/{id}/pipeline/{stage}", + response_model=PipelineStatusList, +) diff --git a/src/codesphere/resources/workspace/landscape/schemas.py b/src/codesphere/resources/workspace/landscape/schemas.py index 621b719..9baa40d 100644 --- a/src/codesphere/resources/workspace/landscape/schemas.py +++ b/src/codesphere/resources/workspace/landscape/schemas.py @@ -1,13 +1,56 @@ from __future__ import annotations +from enum import Enum from typing import Any, Dict, List, Literal, Optional import yaml -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, RootModel from ....core.base import CamelModel +class PipelineStage(str, Enum): + PREPARE = "prepare" + TEST = "test" + RUN = "run" + + +class PipelineState(str, Enum): + WAITING = "waiting" + RUNNING = "running" + SUCCESS = "success" + FAILURE = "failure" + ABORTED = "aborted" + + +class StepStatus(CamelModel): + state: PipelineState + started_at: Optional[str] = None + finished_at: Optional[str] = None + + +class PipelineStatus(CamelModel): + state: PipelineState + started_at: Optional[str] = None + finished_at: Optional[str] = None + steps: List[StepStatus] = Field(default_factory=list) + replica: str + server: str + + +class PipelineStatusList(RootModel[List[PipelineStatus]]): + root: List[PipelineStatus] + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, item): + return self.root[item] + + def __len__(self): + return len(self.root) + + class Profile(BaseModel): name: str diff --git a/src/codesphere/resources/workspace/logs/__init__.py b/src/codesphere/resources/workspace/logs/__init__.py new file mode 100644 index 0000000..27ce1d5 --- /dev/null +++ b/src/codesphere/resources/workspace/logs/__init__.py @@ -0,0 +1,10 @@ +from .models import LogStream, WorkspaceLogManager +from .schemas import LogEntry, LogProblem, LogStage + +__all__ = [ + "LogStream", + "WorkspaceLogManager", + "LogEntry", + "LogProblem", + "LogStage", +] diff --git a/src/codesphere/resources/workspace/logs/models.py b/src/codesphere/resources/workspace/logs/models.py new file mode 100644 index 0000000..8f703ac --- /dev/null +++ b/src/codesphere/resources/workspace/logs/models.py @@ -0,0 +1,293 @@ +from __future__ import annotations + +import asyncio +import json +import logging +from typing import AsyncIterator, Optional, Type, Union + +import httpx + +from ....core.operations import StreamOperation +from ....exceptions import APIError, ValidationError, raise_for_status +from ....http_client import APIHttpClient +from .operations import ( + _STREAM_REPLICA_LOGS_OP, + _STREAM_SERVER_LOGS_OP, + _STREAM_STAGE_LOGS_OP, +) +from .schemas import LogEntry, LogProblem, LogStage + +log = logging.getLogger(__name__) + + +class LogStream: + """Async context manager for streaming logs via SSE.""" + + def __init__( + self, + client: httpx.AsyncClient, + endpoint: str, + entry_model: Type[LogEntry], + timeout: Optional[float] = None, + ): + self._client = client + self._endpoint = endpoint + self._entry_model = entry_model + self._timeout = timeout + self._response: Optional[httpx.Response] = None + self._stream_context = None + + async def __aenter__(self) -> LogStream: + headers = {"Accept": "text/event-stream"} + self._stream_context = self._client.stream( + "GET", + self._endpoint, + headers=headers, + timeout=httpx.Timeout(5.0, read=None), + ) + self._response = await self._stream_context.__aenter__() + + if not self._response.is_success: + await self._response.aread() + raise_for_status(self._response) + + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + if self._stream_context: + await self._stream_context.__aexit__(exc_type, exc_val, exc_tb) + + def __aiter__(self) -> AsyncIterator[LogEntry]: + return self._iterate() + + async def _iterate(self) -> AsyncIterator[LogEntry]: + if self._response is None: + raise RuntimeError("LogStream must be used as async context manager") + + if self._timeout is not None: + async with asyncio.timeout(self._timeout): + async for entry in self._parse_sse_stream(): + yield entry + else: + async for entry in self._parse_sse_stream(): + yield entry + + async def _parse_sse_stream(self) -> AsyncIterator[LogEntry]: + event_type: Optional[str] = None + data_buffer: list[str] = [] + + async for line in self._response.aiter_lines(): + line = line.strip() + + if not line: + if event_type and data_buffer: + data_str = "\n".join(data_buffer) + + if event_type in ("end", "close", "done", "complete"): + return + + if event_type == "problem": + self._handle_problem(data_str) + elif event_type == "data": + for entry in self._parse_data(data_str): + yield entry + + elif event_type in ("end", "close", "done", "complete"): + return + + event_type = None + data_buffer = [] + continue + + if line.startswith("event:"): + event_type = line[6:].strip() + elif line.startswith("data:"): + data_buffer.append(line[5:].strip()) + elif not line.startswith(":"): + if event_type: + data_buffer.append(line) + + def _parse_data(self, data_str: str) -> list[LogEntry]: + entries = [] + try: + json_data = json.loads(data_str) + if isinstance(json_data, list): + for item in json_data: + entries.append(self._entry_model.model_validate(item)) + else: + entries.append(self._entry_model.model_validate(json_data)) + except json.JSONDecodeError as e: + log.warning(f"Failed to parse log entry JSON: {e}") + except Exception as e: + log.warning(f"Failed to validate log entry: {e}") + return entries + + def _handle_problem(self, data_str: str) -> None: + try: + problem_data = json.loads(data_str) + problem = LogProblem.model_validate(problem_data) + + if problem.status == 400: + raise ValidationError( + message=problem.reason, + errors=[{"detail": problem.detail}] if problem.detail else None, + ) + else: + raise APIError( + message=problem.reason, + status_code=problem.status, + response_body=problem_data, + ) + except json.JSONDecodeError: + raise APIError(message=f"Invalid problem event: {data_str}") + + +class WorkspaceLogManager: + """Manager for streaming workspace logs via SSE.""" + + def __init__(self, http_client: APIHttpClient, workspace_id: int): + self._http_client = http_client + self._workspace_id = workspace_id + self.id = workspace_id + + def _build_endpoint(self, operation: StreamOperation, **kwargs) -> str: + return operation.endpoint_template.format(id=self._workspace_id, **kwargs) + + def _open_stream( + self, + operation: StreamOperation, + timeout: Optional[float] = None, + **kwargs, + ) -> LogStream: + endpoint = self._build_endpoint(operation, **kwargs) + return LogStream( + client=self._http_client._get_client(), + endpoint=endpoint, + entry_model=operation.entry_model, + timeout=timeout, + ) + + def open_stream( + self, + stage: Union[LogStage, str], + step: int, + timeout: Optional[float] = None, + ) -> LogStream: + """Open a log stream as an async context manager.""" + if isinstance(stage, LogStage): + stage = stage.value + return self._open_stream( + _STREAM_STAGE_LOGS_OP, timeout=timeout, stage=stage, step=step + ) + + def open_server_stream( + self, + step: int, + server: str, + timeout: Optional[float] = None, + ) -> LogStream: + """Open a server log stream as an async context manager.""" + return self._open_stream( + _STREAM_SERVER_LOGS_OP, timeout=timeout, step=step, server=server + ) + + def open_replica_stream( + self, + step: int, + replica: str, + timeout: Optional[float] = None, + ) -> LogStream: + """Open a replica log stream as an async context manager.""" + return self._open_stream( + _STREAM_REPLICA_LOGS_OP, timeout=timeout, step=step, replica=replica + ) + + async def stream( + self, + stage: Union[LogStage, str], + step: int, + timeout: Optional[float] = 30.0, + ) -> AsyncIterator[LogEntry]: + """Stream logs for a given stage and step.""" + async with self.open_stream(stage, step, timeout) as stream: + async for entry in stream: + yield entry + + async def stream_server( + self, + step: int, + server: str, + timeout: Optional[float] = None, + ) -> AsyncIterator[LogEntry]: + """Stream run logs for a specific server.""" + async with self.open_server_stream(step, server, timeout) as stream: + async for entry in stream: + yield entry + + async def stream_replica( + self, + step: int, + replica: str, + timeout: Optional[float] = None, + ) -> AsyncIterator[LogEntry]: + """Stream run logs for a specific replica.""" + async with self.open_replica_stream(step, replica, timeout) as stream: + async for entry in stream: + yield entry + + async def collect( + self, + stage: Union[LogStage, str], + step: int, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a stage and step into a list.""" + entries: list[LogEntry] = [] + try: + async with self.open_stream(stage, step, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break + except asyncio.TimeoutError: + pass + return entries + + async def collect_server( + self, + step: int, + server: str, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a server into a list.""" + entries: list[LogEntry] = [] + try: + async with self.open_server_stream(step, server, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break + except asyncio.TimeoutError: + pass + return entries + + async def collect_replica( + self, + step: int, + replica: str, + max_entries: Optional[int] = None, + timeout: Optional[float] = 30.0, + ) -> list[LogEntry]: + """Collect all logs for a replica into a list.""" + entries: list[LogEntry] = [] + try: + async with self.open_replica_stream(step, replica, timeout) as stream: + async for entry in stream: + entries.append(entry) + if max_entries and len(entries) >= max_entries: + break + except asyncio.TimeoutError: + pass + return entries diff --git a/src/codesphere/resources/workspace/logs/operations.py b/src/codesphere/resources/workspace/logs/operations.py new file mode 100644 index 0000000..12d0e74 --- /dev/null +++ b/src/codesphere/resources/workspace/logs/operations.py @@ -0,0 +1,17 @@ +from ....core.operations import StreamOperation +from .schemas import LogEntry + +_STREAM_STAGE_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/{stage}/{step}", + entry_model=LogEntry, +) + +_STREAM_SERVER_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/run/{step}/server/{server}", + entry_model=LogEntry, +) + +_STREAM_REPLICA_LOGS_OP = StreamOperation( + endpoint_template="/workspaces/{id}/logs/run/{step}/replica/{replica}", + entry_model=LogEntry, +) diff --git a/src/codesphere/resources/workspace/logs/schemas.py b/src/codesphere/resources/workspace/logs/schemas.py new file mode 100644 index 0000000..f54eb61 --- /dev/null +++ b/src/codesphere/resources/workspace/logs/schemas.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from enum import Enum +from typing import Optional + +from ....core.base import CamelModel + + +class LogStage(str, Enum): + PREPARE = "prepare" + TEST = "test" + RUN = "run" + + +class LogEntry(CamelModel): + model_config = {"extra": "allow"} + + timestamp: Optional[str] = None + kind: Optional[str] = None # "I" for info, "E" for error + data: Optional[str] = None # The actual log content + + def get_text(self) -> str: + return self.data or "" + + +class LogProblem(CamelModel): + status: int + reason: str + detail: Optional[str] = None diff --git a/src/codesphere/resources/workspace/schemas.py b/src/codesphere/resources/workspace/schemas.py index 4d67a58..ded34ce 100644 --- a/src/codesphere/resources/workspace/schemas.py +++ b/src/codesphere/resources/workspace/schemas.py @@ -11,6 +11,7 @@ from .envVars import EnvVar, WorkspaceEnvVarManager from .git import WorkspaceGitManager from .landscape import WorkspaceLandscapeManager +from .logs import WorkspaceLogManager log = logging.getLogger(__name__) @@ -144,3 +145,23 @@ def git(self) -> WorkspaceGitManager: """Manager for git operations (head, pull).""" http_client = self.validate_http_client() return WorkspaceGitManager(http_client, workspace_id=self.id) + + @cached_property + def logs(self) -> WorkspaceLogManager: + """Manager for streaming workspace logs. + + Provides methods to stream or collect logs from pipeline stages + (prepare, test, run) and Multi Server Deployment servers/replicas. + + Example: + ```python + # Stream logs as they arrive + async for entry in workspace.logs.stream(stage="prepare", step=1): + print(entry.message) + + # Collect all logs at once + entries = await workspace.logs.collect(stage="test", step=1) + ``` + """ + http_client = self.validate_http_client() + return WorkspaceLogManager(http_client, workspace_id=self.id) diff --git a/tests/core/test_operations.py b/tests/core/test_operations.py index 8d891f0..2d13d07 100644 --- a/tests/core/test_operations.py +++ b/tests/core/test_operations.py @@ -1,10 +1,11 @@ -import pytest from dataclasses import dataclass from typing import Optional, Type +import pytest from pydantic import BaseModel -from codesphere.core.operations import APIOperation +from codesphere.core.operations import APIOperation, StreamOperation +from codesphere.resources.workspace.logs import LogEntry class SampleInputModel(BaseModel): @@ -121,3 +122,46 @@ def test_default_input_model_is_none(self): ) assert operation.input_model is None + + def test_create_api_operation(self): + op = APIOperation( + method="GET", + endpoint_template="/test/{id}", + response_model=LogEntry, + ) + assert op.method == "GET" + assert op.endpoint_template == "/test/{id}" + assert op.response_model == LogEntry + + def test_api_operation_is_frozen(self): + op = APIOperation( + method="GET", + endpoint_template="/test", + response_model=LogEntry, + ) + try: + op.method = "POST" + assert False, "Should raise error" + except Exception: + pass + + +class TestStreamOperation: + def test_create_stream_operation(self): + op = StreamOperation( + endpoint_template="/logs/{id}", + entry_model=LogEntry, + ) + assert op.endpoint_template == "/logs/{id}" + assert op.entry_model == LogEntry + + def test_stream_operation_is_frozen(self): + op = StreamOperation( + endpoint_template="/logs/{id}", + entry_model=LogEntry, + ) + try: + op.endpoint_template = "/other" + assert False, "Should raise error" + except Exception: + pass diff --git a/tests/resources/workspace/landscape/test_pipeline.py b/tests/resources/workspace/landscape/test_pipeline.py new file mode 100644 index 0000000..e7b5239 --- /dev/null +++ b/tests/resources/workspace/landscape/test_pipeline.py @@ -0,0 +1,245 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from codesphere.resources.workspace.landscape import ( + PipelineStage, + PipelineState, + PipelineStatus, + PipelineStatusList, + StepStatus, +) +from codesphere.resources.workspace.landscape.models import WorkspaceLandscapeManager + + +class TestPipelineSchemas: + def test_pipeline_stage_enum_values(self): + assert PipelineStage.PREPARE.value == "prepare" + assert PipelineStage.TEST.value == "test" + assert PipelineStage.RUN.value == "run" + + def test_pipeline_state_enum_values(self): + assert PipelineState.WAITING.value == "waiting" + assert PipelineState.RUNNING.value == "running" + assert PipelineState.SUCCESS.value == "success" + assert PipelineState.FAILURE.value == "failure" + assert PipelineState.ABORTED.value == "aborted" + + def test_step_status_create(self): + status = StepStatus(state=PipelineState.RUNNING) + assert status.state == PipelineState.RUNNING + assert status.started_at is None + assert status.finished_at is None + + def test_step_status_with_timestamps(self): + status = StepStatus( + state=PipelineState.SUCCESS, + started_at="2026-02-10T10:00:00Z", + finished_at="2026-02-10T10:05:00Z", + ) + assert status.state == PipelineState.SUCCESS + assert status.started_at == "2026-02-10T10:00:00Z" + assert status.finished_at == "2026-02-10T10:05:00Z" + + def test_pipeline_status_create(self): + status = PipelineStatus( + state=PipelineState.RUNNING, + steps=[StepStatus(state=PipelineState.SUCCESS)], + replica="replica-1", + server="web", + ) + assert status.state == PipelineState.RUNNING + assert len(status.steps) == 1 + assert status.replica == "replica-1" + assert status.server == "web" + + def test_pipeline_status_list(self): + statuses = [ + PipelineStatus( + state=PipelineState.SUCCESS, + steps=[], + replica="replica-1", + server="web", + ), + PipelineStatus( + state=PipelineState.RUNNING, + steps=[], + replica="replica-2", + server="web", + ), + ] + status_list = PipelineStatusList(root=statuses) + + assert len(status_list) == 2 + assert status_list[0].replica == "replica-1" + assert status_list[1].state == PipelineState.RUNNING + + +class TestWorkspaceLandscapeManagerPipeline: + @pytest.fixture + def mock_http_client(self): + client = MagicMock() + client._get_client = MagicMock() + return client + + @pytest.fixture + def landscape_manager(self, mock_http_client): + return WorkspaceLandscapeManager(mock_http_client, workspace_id=123) + + @pytest.mark.asyncio + async def test_start_stage_with_enum(self, landscape_manager): + """Test start_stage with PipelineStage enum.""" + landscape_manager._execute_operation = AsyncMock() + + await landscape_manager.start_stage(PipelineStage.PREPARE) + + landscape_manager._execute_operation.assert_called_once() + call_args = landscape_manager._execute_operation.call_args + assert call_args.kwargs["stage"] == "prepare" + + @pytest.mark.asyncio + async def test_start_stage_with_string(self, landscape_manager): + """Test start_stage with string stage.""" + landscape_manager._execute_operation = AsyncMock() + + await landscape_manager.start_stage("run") + + landscape_manager._execute_operation.assert_called_once() + call_args = landscape_manager._execute_operation.call_args + assert call_args.kwargs["stage"] == "run" + + @pytest.mark.asyncio + async def test_start_stage_with_profile(self, landscape_manager): + """Test start_stage with a profile name.""" + landscape_manager._execute_operation = AsyncMock() + + await landscape_manager.start_stage(PipelineStage.RUN, profile="production") + + landscape_manager._execute_operation.assert_called_once() + call_args = landscape_manager._execute_operation.call_args + assert call_args.kwargs["stage"] == "run" + assert call_args.kwargs["profile"] == "production" + + @pytest.mark.asyncio + async def test_start_stage_invalid_profile_name(self, landscape_manager): + """Test start_stage with invalid profile name raises ValueError.""" + with pytest.raises(ValueError, match="Invalid profile name"): + await landscape_manager.start_stage( + PipelineStage.RUN, profile="invalid profile!" + ) + + @pytest.mark.asyncio + async def test_stop_stage(self, landscape_manager): + """Test stop_stage.""" + landscape_manager._execute_operation = AsyncMock() + + await landscape_manager.stop_stage(PipelineStage.RUN) + + landscape_manager._execute_operation.assert_called_once() + call_args = landscape_manager._execute_operation.call_args + assert call_args.kwargs["stage"] == "run" + + @pytest.mark.asyncio + async def test_get_stage_status(self, landscape_manager): + """Test get_stage_status returns PipelineStatusList.""" + mock_status = PipelineStatusList( + root=[ + PipelineStatus( + state=PipelineState.RUNNING, + steps=[], + replica="replica-1", + server="web", + ) + ] + ) + landscape_manager._execute_operation = AsyncMock(return_value=mock_status) + + result = await landscape_manager.get_stage_status(PipelineStage.RUN) + + assert len(result) == 1 + assert result[0].state == PipelineState.RUNNING + + @pytest.mark.asyncio + async def test_wait_for_stage_completes_immediately(self, landscape_manager): + """Test wait_for_stage when stage is already complete.""" + mock_status = PipelineStatusList( + root=[ + PipelineStatus( + state=PipelineState.SUCCESS, + steps=[], + replica="replica-1", + server="web", + ) + ] + ) + landscape_manager._execute_operation = AsyncMock(return_value=mock_status) + + result = await landscape_manager.wait_for_stage(PipelineStage.PREPARE) + + assert len(result) == 1 + assert result[0].state == PipelineState.SUCCESS + + @pytest.mark.asyncio + async def test_wait_for_stage_polls_until_complete(self, landscape_manager): + """Test wait_for_stage polls until stage completes.""" + running_status = PipelineStatusList( + root=[ + PipelineStatus( + state=PipelineState.RUNNING, + steps=[], + replica="replica-1", + server="web", + ) + ] + ) + success_status = PipelineStatusList( + root=[ + PipelineStatus( + state=PipelineState.SUCCESS, + steps=[], + replica="replica-1", + server="web", + ) + ] + ) + + landscape_manager._execute_operation = AsyncMock( + side_effect=[running_status, running_status, success_status] + ) + + result = await landscape_manager.wait_for_stage( + PipelineStage.PREPARE, poll_interval=0.01 + ) + + assert result[0].state == PipelineState.SUCCESS + assert landscape_manager._execute_operation.call_count == 3 + + @pytest.mark.asyncio + async def test_wait_for_stage_timeout(self, landscape_manager): + """Test wait_for_stage raises TimeoutError.""" + running_status = PipelineStatusList( + root=[ + PipelineStatus( + state=PipelineState.RUNNING, + steps=[], + replica="replica-1", + server="web", + ) + ] + ) + landscape_manager._execute_operation = AsyncMock(return_value=running_status) + + with pytest.raises(TimeoutError, match="did not complete"): + await landscape_manager.wait_for_stage( + PipelineStage.PREPARE, timeout=0.05, poll_interval=0.01 + ) + + @pytest.mark.asyncio + async def test_wait_for_stage_invalid_poll_interval(self, landscape_manager): + """Test wait_for_stage with invalid poll_interval raises ValueError.""" + with pytest.raises(ValueError, match="poll_interval must be greater than 0"): + await landscape_manager.wait_for_stage( + PipelineStage.PREPARE, poll_interval=0 + ) diff --git a/tests/resources/workspace/logs/__init__.py b/tests/resources/workspace/logs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/workspace/logs/test_logs.py b/tests/resources/workspace/logs/test_logs.py new file mode 100644 index 0000000..44ae58e --- /dev/null +++ b/tests/resources/workspace/logs/test_logs.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from codesphere.exceptions import APIError, ValidationError +from codesphere.resources.workspace.logs import ( + LogEntry, + LogProblem, + LogStage, + LogStream, + WorkspaceLogManager, +) +from codesphere.resources.workspace.logs.operations import ( + _STREAM_REPLICA_LOGS_OP, + _STREAM_SERVER_LOGS_OP, + _STREAM_STAGE_LOGS_OP, +) + + +class TestLogStage: + def test_enum_values(self): + assert LogStage.PREPARE.value == "prepare" + assert LogStage.TEST.value == "test" + assert LogStage.RUN.value == "run" + + +class TestLogEntry: + def test_create_with_data_only(self): + entry = LogEntry(data="Test log message") + assert entry.data == "Test log message" + assert entry.timestamp is None + assert entry.kind is None + + def test_create_with_all_fields(self): + entry = LogEntry( + data="Test message", + timestamp="2026-02-10T12:00:00Z", + kind="I", + ) + assert entry.data == "Test message" + assert entry.timestamp == "2026-02-10T12:00:00Z" + assert entry.kind == "I" + + def test_from_dict(self): + data = {"data": "Hello", "timestamp": "2026-02-10T12:00:00Z"} + entry = LogEntry.model_validate(data) + assert entry.data == "Hello" + + def test_get_text(self): + entry = LogEntry(data="Log content") + assert entry.get_text() == "Log content" + + def test_get_text_empty(self): + entry = LogEntry() + assert entry.get_text() == "" + + +class TestLogProblem: + def test_create_problem(self): + problem = LogProblem(status=400, reason="Workspace is not running") + assert problem.status == 400 + assert problem.reason == "Workspace is not running" + assert problem.detail is None + + def test_create_problem_with_detail(self): + problem = LogProblem( + status=404, + reason="Not found", + detail="Workspace 123 does not exist", + ) + assert problem.status == 404 + assert problem.detail == "Workspace 123 does not exist" + + +class TestStreamOperations: + def test_stream_stage_logs_op(self): + assert ( + _STREAM_STAGE_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/{stage}/{step}" + ) + assert _STREAM_STAGE_LOGS_OP.entry_model == LogEntry + + def test_stream_server_logs_op(self): + assert ( + _STREAM_SERVER_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/run/{step}/server/{server}" + ) + assert _STREAM_SERVER_LOGS_OP.entry_model == LogEntry + + def test_stream_replica_logs_op(self): + assert ( + _STREAM_REPLICA_LOGS_OP.endpoint_template + == "/workspaces/{id}/logs/run/{step}/replica/{replica}" + ) + assert _STREAM_REPLICA_LOGS_OP.entry_model == LogEntry + + +class TestLogStream: + def test_init(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test/endpoint", LogEntry, timeout=30.0) + assert stream._client == mock_client + assert stream._endpoint == "/test/endpoint" + assert stream._entry_model == LogEntry + assert stream._timeout == 30.0 + + def test_init_no_timeout(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test/endpoint", LogEntry) + assert stream._timeout is None + + def test_handle_problem_validation_error(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + with pytest.raises(ValidationError): + stream._handle_problem('{"status": 400, "reason": "Bad request"}') + + def test_handle_problem_api_error(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + with pytest.raises(APIError) as exc_info: + stream._handle_problem('{"status": 404, "reason": "Not found"}') + assert exc_info.value.status_code == 404 + + def test_handle_problem_invalid_json(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + with pytest.raises(APIError) as exc_info: + stream._handle_problem("invalid json") + assert "Invalid problem event" in str(exc_info.value) + + def test_parse_data_single_entry(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + entries = stream._parse_data('{"data": "test log", "kind": "I"}') + assert len(entries) == 1 + assert entries[0].data == "test log" + assert entries[0].kind == "I" + + def test_parse_data_array(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + entries = stream._parse_data('[{"data": "log1"}, {"data": "log2"}]') + assert len(entries) == 2 + assert entries[0].data == "log1" + assert entries[1].data == "log2" + + def test_parse_data_invalid_json(self): + mock_client = MagicMock() + stream = LogStream(mock_client, "/test", LogEntry) + + entries = stream._parse_data("invalid") + assert len(entries) == 0 + + +class TestWorkspaceLogManager: + @pytest.fixture + def mock_http_client(self): + client = MagicMock() + client._get_client = MagicMock() + return client + + @pytest.fixture + def log_manager(self, mock_http_client): + return WorkspaceLogManager(mock_http_client, workspace_id=123) + + def test_init(self, log_manager, mock_http_client): + assert log_manager._http_client == mock_http_client + assert log_manager._workspace_id == 123 + assert log_manager.id == 123 + + def test_build_endpoint(self, log_manager): + endpoint = log_manager._build_endpoint( + _STREAM_STAGE_LOGS_OP, stage="prepare", step=0 + ) + assert endpoint == "/workspaces/123/logs/prepare/0" + + def test_build_endpoint_server(self, log_manager): + endpoint = log_manager._build_endpoint( + _STREAM_SERVER_LOGS_OP, step=0, server="web" + ) + assert endpoint == "/workspaces/123/logs/run/0/server/web" + + def test_open_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_stream(stage=LogStage.PREPARE, step=0) + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/prepare/0" + assert stream._entry_model == LogEntry + + def test_open_stream_with_string_stage(self, log_manager): + stream = log_manager.open_stream(stage="test", step=1) + assert stream._endpoint == "/workspaces/123/logs/test/1" + + def test_open_stream_with_timeout(self, log_manager): + stream = log_manager.open_stream(stage="run", step=0, timeout=60.0) + assert stream._timeout == 60.0 + + def test_open_server_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_server_stream(step=0, server="web") + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/run/0/server/web" + + def test_open_replica_stream_returns_log_stream(self, log_manager): + stream = log_manager.open_replica_stream(step=0, replica="replica-1") + assert isinstance(stream, LogStream) + assert stream._endpoint == "/workspaces/123/logs/run/0/replica/replica-1"