Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
ADCPToolNotFoundError,
ADCPWebhookError,
ADCPWebhookSignatureError,
RegistryError,
)
from adcp.registry import RegistryClient

# Test helpers
from adcp.testing import (
Expand Down Expand Up @@ -203,7 +205,15 @@
ValidateContentDeliveryErrorResponse,
ValidateContentDeliverySuccessResponse,
)
from adcp.types.core import AgentConfig, Protocol, TaskResult, TaskStatus, WebhookMetadata
from adcp.types.core import (
AgentConfig,
Protocol,
ResolvedBrand,
ResolvedProperty,
TaskResult,
TaskStatus,
WebhookMetadata,
)
from adcp.utils import (
get_asset_count,
get_format_assets,
Expand Down Expand Up @@ -259,9 +269,12 @@ def get_adcp_version() -> str:
# Client classes
"ADCPClient",
"ADCPMultiAgentClient",
"RegistryClient",
# Core types
"AgentConfig",
"Protocol",
"ResolvedBrand",
"ResolvedProperty",
"TaskResult",
"TaskStatus",
"WebhookMetadata",
Expand Down Expand Up @@ -376,6 +389,7 @@ def get_adcp_version() -> str:
"AdagentsValidationError",
"AdagentsNotFoundError",
"AdagentsTimeoutError",
"RegistryError",
# Validation utilities
"ValidationError",
"validate_adagents",
Expand Down
10 changes: 10 additions & 0 deletions src/adcp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ def __init__(
super().__init__(message, agent_id, None, suggestion)


class RegistryError(ADCPError):
"""Error from AdCP registry API operations (brand/property lookups)."""

def __init__(self, message: str, status_code: int | None = None):
"""Initialize registry error."""
self.status_code = status_code
suggestion = "Check that the registry API is accessible and the domain is valid."
super().__init__(message, suggestion=suggestion)


class AdagentsValidationError(ADCPError):
"""Base error for adagents.json validation issues."""

Expand Down
293 changes: 293 additions & 0 deletions src/adcp/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
from __future__ import annotations

"""Client for the AdCP registry API (brand and property lookups)."""

import asyncio
from typing import Any

import httpx
from pydantic import ValidationError

from adcp.exceptions import RegistryError
from adcp.types.core import ResolvedBrand, ResolvedProperty

DEFAULT_REGISTRY_URL = "https://agenticadvertising.org"
MAX_BULK_DOMAINS = 100


class RegistryClient:
"""Client for the AdCP registry API.

Provides brand and property lookups against the central AdCP registry.

Args:
base_url: Registry API base URL.
timeout: Request timeout in seconds.
client: Optional httpx.AsyncClient for connection pooling.
If provided, caller is responsible for client lifecycle.
user_agent: User-Agent header for requests.
"""

def __init__(
self,
base_url: str = DEFAULT_REGISTRY_URL,
timeout: float = 10.0,
client: httpx.AsyncClient | None = None,
user_agent: str = "adcp-client-python",
):
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._external_client = client
self._owned_client: httpx.AsyncClient | None = None
self._user_agent = user_agent

async def _get_client(self) -> httpx.AsyncClient:
"""Get or create httpx client."""
if self._external_client is not None:
return self._external_client
if self._owned_client is None:
self._owned_client = httpx.AsyncClient(
limits=httpx.Limits(
max_keepalive_connections=10,
max_connections=20,
),
)
return self._owned_client

async def close(self) -> None:
"""Close owned HTTP client. No-op if using external client."""
if self._owned_client is not None:
await self._owned_client.aclose()
self._owned_client = None

async def __aenter__(self) -> RegistryClient:
return self

async def __aexit__(self, *args: Any) -> None:
await self.close()

async def lookup_brand(self, domain: str) -> ResolvedBrand | None:
"""Resolve a single domain to its canonical brand identity.

Args:
domain: Domain to resolve (e.g., "nike.com").

Returns:
ResolvedBrand if found, None if the domain is not in the registry.

Raises:
RegistryError: On HTTP or parsing errors.
"""
client = await self._get_client()
try:
response = await client.get(
f"{self._base_url}/api/brands/resolve",
params={"domain": domain},
headers={"User-Agent": self._user_agent},
timeout=self._timeout,
)
if response.status_code == 404:
return None
if response.status_code != 200:
raise RegistryError(
f"Brand lookup failed: HTTP {response.status_code}",
status_code=response.status_code,
)
data = response.json()
if data is None:
return None
return ResolvedBrand.model_validate(data)
except RegistryError:
raise
except httpx.TimeoutException as e:
raise RegistryError(f"Brand lookup timed out after {self._timeout}s") from e
except httpx.HTTPError as e:
raise RegistryError(f"Brand lookup failed: {e}") from e
except (ValidationError, ValueError) as e:
raise RegistryError(f"Brand lookup failed: invalid response: {e}") from e

async def lookup_brands(
self, domains: list[str]
) -> dict[str, ResolvedBrand | None]:
"""Bulk resolve domains to brand identities.

Automatically chunks requests exceeding 100 domains.

Args:
domains: List of domains to resolve.

Returns:
Dict mapping each domain to its ResolvedBrand, or None if not found.

Raises:
RegistryError: On HTTP or parsing errors.
"""
if not domains:
return {}

chunks = [
domains[i : i + MAX_BULK_DOMAINS]
for i in range(0, len(domains), MAX_BULK_DOMAINS)
]

chunk_results = await asyncio.gather(
*[self._lookup_brands_chunk(chunk) for chunk in chunks],
return_exceptions=True,
)

merged: dict[str, ResolvedBrand | None] = {}
for result in chunk_results:
if isinstance(result, BaseException):
raise result
merged.update(result)
return merged

async def _lookup_brands_chunk(
self, domains: list[str]
) -> dict[str, ResolvedBrand | None]:
"""Resolve a single chunk of brand domains (max 100)."""
client = await self._get_client()
try:
response = await client.post(
f"{self._base_url}/api/brands/resolve/bulk",
json={"domains": domains},
headers={"User-Agent": self._user_agent},
timeout=self._timeout,
)
if response.status_code != 200:
raise RegistryError(
f"Bulk brand lookup failed: HTTP {response.status_code}",
status_code=response.status_code,
)
data = response.json()
results_raw = data.get("results", {})
results: dict[str, ResolvedBrand | None] = {d: None for d in domains}
for domain, brand_data in results_raw.items():
if brand_data is not None:
results[domain] = ResolvedBrand.model_validate(brand_data)
return results
except RegistryError:
raise
except httpx.TimeoutException as e:
raise RegistryError(
f"Bulk brand lookup timed out after {self._timeout}s"
) from e
except httpx.HTTPError as e:
raise RegistryError(f"Bulk brand lookup failed: {e}") from e
except (ValidationError, ValueError) as e:
raise RegistryError(f"Bulk brand lookup failed: invalid response: {e}") from e

async def lookup_property(self, domain: str) -> ResolvedProperty | None:
"""Resolve a publisher domain to its property info.

Args:
domain: Publisher domain to resolve (e.g., "nytimes.com").

Returns:
ResolvedProperty if found, None if the domain is not in the registry.

Raises:
RegistryError: On HTTP or parsing errors.
"""
client = await self._get_client()
try:
response = await client.get(
f"{self._base_url}/api/properties/resolve",
params={"domain": domain},
headers={"User-Agent": self._user_agent},
timeout=self._timeout,
)
if response.status_code == 404:
return None
if response.status_code != 200:
raise RegistryError(
f"Property lookup failed: HTTP {response.status_code}",
status_code=response.status_code,
)
data = response.json()
if data is None:
return None
return ResolvedProperty.model_validate(data)
except RegistryError:
raise
except httpx.TimeoutException as e:
raise RegistryError(
f"Property lookup timed out after {self._timeout}s"
) from e
except httpx.HTTPError as e:
raise RegistryError(f"Property lookup failed: {e}") from e
except (ValidationError, ValueError) as e:
raise RegistryError(f"Property lookup failed: invalid response: {e}") from e

async def lookup_properties(
self, domains: list[str]
) -> dict[str, ResolvedProperty | None]:
"""Bulk resolve publisher domains to property info.

Automatically chunks requests exceeding 100 domains.

Args:
domains: List of publisher domains to resolve.

Returns:
Dict mapping each domain to its ResolvedProperty, or None if not found.

Raises:
RegistryError: On HTTP or parsing errors.
"""
if not domains:
return {}

chunks = [
domains[i : i + MAX_BULK_DOMAINS]
for i in range(0, len(domains), MAX_BULK_DOMAINS)
]

chunk_results = await asyncio.gather(
*[self._lookup_properties_chunk(chunk) for chunk in chunks],
return_exceptions=True,
)

merged: dict[str, ResolvedProperty | None] = {}
for result in chunk_results:
if isinstance(result, BaseException):
raise result
merged.update(result)
return merged

async def _lookup_properties_chunk(
self, domains: list[str]
) -> dict[str, ResolvedProperty | None]:
"""Resolve a single chunk of property domains (max 100)."""
client = await self._get_client()
try:
response = await client.post(
f"{self._base_url}/api/properties/resolve/bulk",
json={"domains": domains},
headers={"User-Agent": self._user_agent},
timeout=self._timeout,
)
if response.status_code != 200:
raise RegistryError(
f"Bulk property lookup failed: HTTP {response.status_code}",
status_code=response.status_code,
)
data = response.json()
results_raw = data.get("results", {})
results: dict[str, ResolvedProperty | None] = {d: None for d in domains}
for domain, prop_data in results_raw.items():
if prop_data is not None:
results[domain] = ResolvedProperty.model_validate(prop_data)
return results
except RegistryError:
raise
except httpx.TimeoutException as e:
raise RegistryError(
f"Bulk property lookup timed out after {self._timeout}s"
) from e
except httpx.HTTPError as e:
raise RegistryError(f"Bulk property lookup failed: {e}") from e
except (ValidationError, ValueError) as e:
raise RegistryError(
f"Bulk property lookup failed: invalid response: {e}"
) from e
11 changes: 10 additions & 1 deletion src/adcp/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,14 @@
# Re-export core types (not in generated, but part of public API)
# Note: We don't import TaskStatus here to avoid shadowing GeneratedTaskStatus
# Users should import TaskStatus from adcp.types.core directly if they need the core enum
from adcp.types.core import AgentConfig, Protocol, TaskResult, WebhookMetadata
from adcp.types.core import (
AgentConfig,
Protocol,
ResolvedBrand,
ResolvedProperty,
TaskResult,
WebhookMetadata,
)

# Re-export webhook payload type for webhook handling
from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload
Expand Down Expand Up @@ -637,6 +644,8 @@
# Core types
"AgentConfig",
"Protocol",
"ResolvedBrand",
"ResolvedProperty",
"TaskResult",
"WebhookMetadata",
# Webhook types
Expand Down
Loading