Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import time
from datetime import datetime, timedelta, timezone

from codesphere import CodesphereSDK
from codesphere.resources.workspace import WorkspaceCreate
Expand Down Expand Up @@ -28,6 +29,8 @@ async def main():
)
print(f"✓ Workspace created (ID: {workspace.id})")

team = await sdk.teams.get(team_id=TEAM_ID)

print("Waiting for workspace to start...")
await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
print("✓ Workspace is running\n")
Expand Down Expand Up @@ -90,6 +93,70 @@ async def main():

print(f"\n✓ Stream ended ({count} log entries)")

print("\n--- Usage History ---")

end_date = datetime.now(timezone.utc)
begin_date = end_date - timedelta(days=1)

print(
f"Fetching usage summary from {begin_date.isoformat()} to {end_date.isoformat()}..."
)
usage_summary = await team.usage.get_landscape_summary(
begin_date=begin_date,
end_date=end_date,
limit=50,
)

print(f"Total resources with usage: {usage_summary.total_items}")
print(f"Page {usage_summary.current_page} of {usage_summary.total_pages}")

if usage_summary.items:
print("\nResource Usage Summary:")
for item in usage_summary.items:
hours = item.usage_seconds / 3600
print(f" • {item.resource_name}")
print(f" - Plan: {item.plan_name}")
print(
f" - Usage: {hours:.2f} hours ({item.usage_seconds:.0f} seconds)"
)
print(f" - Replicas: {item.replicas}")
print(f" - Always On: {item.always_on}")
print()

first_resource = usage_summary.items[0]
print(f"Fetching events for '{first_resource.resource_name}'...")

events = await team.usage.get_landscape_events(
resource_id=first_resource.resource_id,
begin_date=begin_date,
end_date=end_date,
)

print(f"Total events: {events.total_items}")
for event in events.items:
print(
f" [{event.date.isoformat()}] {event.action.value.upper()} by {event.initiator_email}"
)

print("\nRefreshing usage summary...")
await usage_summary.refresh()
print(f"✓ Refreshed - still {usage_summary.total_items} items")
else:
print("No usage data found for the specified time range.")

print("\n--- Auto-Pagination Example ---")
print("Iterating through ALL usage summaries (auto-pagination):")
item_count = 0
async for item in team.usage.iter_all_landscape_summary(
begin_date=begin_date,
end_date=end_date,
page_size=25, # Fetch 25 at a time
):
item_count += 1
print(f" {item_count}. {item.resource_id}: {item.usage_seconds:.0f}s")

print(f"\n✓ Total items processed: {item_count}")


if __name__ == "__main__":
asyncio.run(main())
55 changes: 55 additions & 0 deletions examples/scripts/delete_all_workspaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import argparse
import asyncio

from codesphere import CodesphereSDK


async def delete_all_workspaces(team_id: int, dry_run: bool = False) -> None:
async with CodesphereSDK() as sdk:
print(f"Fetching workspaces for team {team_id}...")
workspaces = await sdk.workspaces.list(team_id=team_id)

if not workspaces:
print("No workspaces found in this team.")
return

print(f"Found {len(workspaces)} workspace(s):\n")
for ws in workspaces:
print(f" • {ws.name} (ID: {ws.id})")

if dry_run:
print("\n[DRY RUN] No workspaces were deleted.")
return

print("\n" + "=" * 50)
confirm = input(f"Delete all {len(workspaces)} workspaces? (yes/no): ")
if confirm.lower() != "yes":
print("Aborted.")
return

print("\nDeleting workspaces...")
for ws in workspaces:
try:
await ws.delete()
print(f" ✓ Deleted: {ws.name} (ID: {ws.id})")
except Exception as e:
print(f" ✗ Failed to delete {ws.name}: {e}")

print(f"\n✓ Done. Deleted {len(workspaces)} workspace(s).")


def main():
parser = argparse.ArgumentParser(description="Delete all workspaces in a team")
parser.add_argument("--team-id", type=int, required=True, help="Team ID")
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be deleted without actually deleting",
)
args = parser.parse_args()

asyncio.run(delete_all_workspaces(team_id=args.team_id, dry_run=args.dry_run))


if __name__ == "__main__":
main()
25 changes: 20 additions & 5 deletions src/codesphere/resources/team/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from .schemas import Team, TeamCreate, TeamBase
from .resources import TeamsResource
from .domain.resources import (
Domain,
CustomDomainConfig,
DomainVerificationStatus,
Domain,
DomainBase,
DomainRouting,
DomainVerificationStatus,
)
from .resources import TeamsResource
from .schemas import Team, TeamBase, TeamCreate
from .usage import (
LandscapeServiceEvent,
LandscapeServiceSummary,
PaginatedResponse,
ServiceAction,
TeamUsageManager,
UsageEventsResponse,
UsageSummaryResponse,
)


__all__ = [
"Team",
Expand All @@ -19,4 +27,11 @@
"DomainVerificationStatus",
"DomainBase",
"DomainRouting",
"TeamUsageManager",
"LandscapeServiceEvent",
"LandscapeServiceSummary",
"PaginatedResponse",
"ServiceAction",
"UsageEventsResponse",
"UsageSummaryResponse",
]
14 changes: 11 additions & 3 deletions src/codesphere/resources/team/schemas.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

from functools import cached_property
from pydantic import Field
from typing import Optional

from .domain.manager import TeamDomainManager
from pydantic import Field

from ...core import APIOperation, AsyncCallable, _APIOperationExecutor
from ...core.base import CamelModel
from ...core import _APIOperationExecutor, APIOperation, AsyncCallable
from .domain.manager import TeamDomainManager
from .usage.manager import TeamUsageManager


class TeamCreate(CamelModel):
Expand Down Expand Up @@ -39,3 +42,8 @@ class Team(TeamBase, _APIOperationExecutor):
def domains(self) -> TeamDomainManager:
http_client = self.validate_http_client()
return TeamDomainManager(http_client, team_id=self.id)

@cached_property
def usage(self) -> TeamUsageManager:
http_client = self.validate_http_client()
return TeamUsageManager(http_client, team_id=self.id)
Empty file.
21 changes: 21 additions & 0 deletions src/codesphere/resources/team/usage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Team usage history resources."""

from .manager import TeamUsageManager
from .schemas import (
LandscapeServiceEvent,
LandscapeServiceSummary,
PaginatedResponse,
ServiceAction,
UsageEventsResponse,
UsageSummaryResponse,
)

__all__ = [
"TeamUsageManager",
"LandscapeServiceEvent",
"LandscapeServiceSummary",
"PaginatedResponse",
"ServiceAction",
"UsageEventsResponse",
"UsageSummaryResponse",
]
168 changes: 168 additions & 0 deletions src/codesphere/resources/team/usage/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from __future__ import annotations

from datetime import datetime
from functools import partial
from typing import AsyncIterator, Union

from pydantic import Field

from ....core.handler import _APIOperationExecutor
from ....core.operations import AsyncCallable
from ....http_client import APIHttpClient
from .operations import _GET_LANDSCAPE_EVENTS_OP, _GET_LANDSCAPE_SUMMARY_OP
from .schemas import (
LandscapeServiceEvent,
LandscapeServiceSummary,
UsageEventsResponse,
UsageSummaryResponse,
)


class TeamUsageManager(_APIOperationExecutor):
"""Manager for team usage history operations.

Provides access to landscape service usage summaries and events with
support for pagination and automatic iteration through all results.

Example:
```python
summary = await team.usage.get_landscape_summary(
begin_date=datetime(2024, 1, 1),
end_date=datetime(2024, 1, 31),
limit=50
)
print(f"Total items: {summary.total_items}")
print(f"Page {summary.current_page} of {summary.total_pages}")

for item in summary.items:
print(f"{item.resource_name}: {item.usage_seconds}s")

await summary.refresh()

async for item in team.usage.iter_all_landscape_summary(
begin_date=datetime(2024, 1, 1),
end_date=datetime(2024, 1, 31)
):
print(f"{item.resource_name}: {item.usage_seconds}s")
```
"""

def __init__(self, http_client: APIHttpClient, team_id: int):
self._http_client = http_client
self.team_id = team_id

get_landscape_summary_op: AsyncCallable[UsageSummaryResponse] = Field(
default=_GET_LANDSCAPE_SUMMARY_OP, exclude=True
)

async def get_landscape_summary(
self,
begin_date: Union[datetime, str],
end_date: Union[datetime, str],
limit: int = 25,
offset: int = 0,
) -> UsageSummaryResponse:
params = {
"beginDate": begin_date.isoformat()
if isinstance(begin_date, datetime)
else begin_date,
"endDate": end_date.isoformat()
if isinstance(end_date, datetime)
else end_date,
"limit": min(max(1, limit), 100),
"offset": max(0, offset),
}
result: UsageSummaryResponse = await self.get_landscape_summary_op(
params=params
)

result._refresh_op = partial(self.get_landscape_summary_op)
result._team_id = self.team_id

return result

async def iter_all_landscape_summary(
self,
begin_date: Union[datetime, str],
end_date: Union[datetime, str],
page_size: int = 100,
) -> AsyncIterator[LandscapeServiceSummary]:
offset = 0
page_size = min(max(1, page_size), 100)

while True:
response = await self.get_landscape_summary(
begin_date=begin_date,
end_date=end_date,
limit=page_size,
offset=offset,
)

for item in response.items:
yield item

if not response.has_next_page:
break

offset += page_size

get_landscape_events_op: AsyncCallable[UsageEventsResponse] = Field(
default=_GET_LANDSCAPE_EVENTS_OP, exclude=True
)

async def get_landscape_events(
self,
resource_id: str,
begin_date: Union[datetime, str],
end_date: Union[datetime, str],
limit: int = 25,
offset: int = 0,
) -> UsageEventsResponse:
params = {
"beginDate": begin_date.isoformat()
if isinstance(begin_date, datetime)
else begin_date,
"endDate": end_date.isoformat()
if isinstance(end_date, datetime)
else end_date,
"limit": min(max(1, limit), 100),
"offset": max(0, offset),
}
result: UsageEventsResponse = await self.get_landscape_events_op(
resource_id=resource_id, params=params
)

result._refresh_op = partial(
self.get_landscape_events_op, resource_id=resource_id
)
result._team_id = self.team_id
result._resource_id = resource_id

return result

async def iter_all_landscape_events(
self,
resource_id: str,
begin_date: Union[datetime, str],
end_date: Union[datetime, str],
page_size: int = 100,
) -> AsyncIterator[LandscapeServiceEvent]:
offset = 0
page_size = min(max(1, page_size), 100)

while True:
response = await self.get_landscape_events(
resource_id=resource_id,
begin_date=begin_date,
end_date=end_date,
limit=page_size,
offset=offset,
)

for item in response.items:
yield item

if not response.has_next_page:
break

offset += page_size
Loading