Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ dependencies = [
]

[project.optional-dependencies]
mcp = [
"mcp>=1.9.0 ; python_version >= '3.10'",
"pydantic-settings>=2.0",
]
mistralai = ["mistralai>=1.0.0"]
openai = ["openai>=1.1.0"]
nltk = ["nltk>=3.8.1,<4"]
Expand Down Expand Up @@ -148,4 +152,3 @@ filterwarnings = [
warn_unused_configs = true
ignore_missing_imports = true
exclude = ["env", "venv", ".venv"]

14 changes: 14 additions & 0 deletions redisvl/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from redisvl.mcp.config import MCPConfig, load_mcp_config
from redisvl.mcp.errors import MCPErrorCode, RedisVLMCPError, map_exception
from redisvl.mcp.server import RedisVLMCPServer
from redisvl.mcp.settings import MCPSettings

__all__ = [
"MCPConfig",
"MCPErrorCode",
"MCPSettings",
"RedisVLMCPError",
"RedisVLMCPServer",
"load_mcp_config",
"map_exception",
]
168 changes: 168 additions & 0 deletions redisvl/mcp/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import yaml
from pydantic import BaseModel, ConfigDict, Field, model_validator

from redisvl.schema.fields import BaseField
from redisvl.schema.schema import IndexInfo, IndexSchema

_ENV_PATTERN = re.compile(r"\$\{([^}:]+)(?::-([^}]*))?\}")


class MCPRuntimeConfig(BaseModel):
"""Runtime limits and validated field mappings for MCP requests."""

index_mode: str = "create_if_missing"
text_field_name: str
vector_field_name: str
default_embed_field: str
default_limit: int = 10
max_limit: int = 100
max_upsert_records: int = 64
skip_embedding_if_present: bool = True
startup_timeout_seconds: int = 30
request_timeout_seconds: int = 60
max_concurrency: int = 16

@model_validator(mode="after")
def _validate_limits(self) -> "MCPRuntimeConfig":
if self.index_mode not in {"validate_only", "create_if_missing"}:
raise ValueError(
"runtime.index_mode must be validate_only or create_if_missing"
)
if self.default_limit <= 0:
raise ValueError("runtime.default_limit must be greater than 0")
if self.max_limit < self.default_limit:
raise ValueError(
"runtime.max_limit must be greater than or equal to runtime.default_limit"
)
if self.max_upsert_records <= 0:
raise ValueError("runtime.max_upsert_records must be greater than 0")
if self.startup_timeout_seconds <= 0:
raise ValueError("runtime.startup_timeout_seconds must be greater than 0")
if self.request_timeout_seconds <= 0:
raise ValueError("runtime.request_timeout_seconds must be greater than 0")
if self.max_concurrency <= 0:
raise ValueError("runtime.max_concurrency must be greater than 0")
return self


class MCPVectorizerConfig(BaseModel):
"""Vectorizer constructor contract loaded from YAML."""

model_config = ConfigDict(populate_by_name=True, extra="allow")

class_name: str = Field(alias="class", min_length=1)
model: str = Field(..., min_length=1)

@property
def extra_kwargs(self) -> Dict[str, Any]:
"""Return vectorizer kwargs other than the normalized `class` and `model`."""
return dict(self.model_extra or {})

def to_init_kwargs(self) -> Dict[str, Any]:
"""Build kwargs suitable for directly instantiating the vectorizer."""
return {"model": self.model, **self.extra_kwargs}


class MCPConfig(BaseModel):
"""Validated MCP server configuration loaded from YAML."""

redis_url: str = Field(..., min_length=1)
index: IndexInfo
fields: Union[List[Dict[str, Any]], Dict[str, Dict[str, Any]]]
vectorizer: MCPVectorizerConfig
runtime: MCPRuntimeConfig

@model_validator(mode="after")
def _validate_runtime_mapping(self) -> "MCPConfig":
"""Ensure runtime field mappings point at explicit schema fields."""
schema = self.to_index_schema()
field_names = set(schema.field_names)

if self.runtime.text_field_name not in field_names:
raise ValueError(
f"runtime.text_field_name '{self.runtime.text_field_name}' not found in schema"
)

if self.runtime.default_embed_field not in field_names:
raise ValueError(
f"runtime.default_embed_field '{self.runtime.default_embed_field}' not found in schema"
)

vector_field = schema.fields.get(self.runtime.vector_field_name)
if vector_field is None:
raise ValueError(
f"runtime.vector_field_name '{self.runtime.vector_field_name}' not found in schema"
)
if vector_field.type != "vector":
raise ValueError(
f"runtime.vector_field_name '{self.runtime.vector_field_name}' must reference a vector field"
)

return self

def to_index_schema(self) -> IndexSchema:
"""Convert the MCP config schema fragment into a reusable `IndexSchema`."""
return IndexSchema.model_validate(
{
"index": self.index.model_dump(mode="python"),
"fields": self.fields,
}
)

@property
def vector_field(self) -> BaseField:
"""Return the configured vector field from the generated index schema."""
return self.to_index_schema().fields[self.runtime.vector_field_name]

@property
def vector_field_dims(self) -> Optional[int]:
"""Return the configured vector dimension when the field exposes one."""
attrs = self.vector_field.attrs
return getattr(attrs, "dims", None)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated schema reconstruction on every property access

Low Severity

The vector_field and vector_field_dims properties each call to_index_schema(), which fully reconstructs and re-validates an IndexSchema via model_validate on every access. The model validator _validate_runtime_mapping also calls to_index_schema(). Any code path accessing vector_field_dims (e.g., _validate_vectorizer_dims during startup) triggers two redundant schema constructions. Caching the result or computing these values once during validation would avoid the repeated work.

Additional Locations (1)
Fix in Cursor Fix in Web



def _substitute_env(value: Any) -> Any:
"""Recursively resolve `${VAR}` and `${VAR:-default}` placeholders."""
if isinstance(value, dict):
return {key: _substitute_env(item) for key, item in value.items()}
if isinstance(value, list):
return [_substitute_env(item) for item in value]
if not isinstance(value, str):
return value

def replace(match: re.Match[str]) -> str:
name = match.group(1)
default = match.group(2)
env_value = os.environ.get(name)
if env_value is not None:
return env_value
if default is not None:
return default
# Fail fast here so startup never proceeds with partially-resolved config.
raise ValueError(f"Missing required environment variable: {name}")

return _ENV_PATTERN.sub(replace, value)


def load_mcp_config(path: str) -> MCPConfig:
"""Load, substitute, and validate the MCP YAML configuration file."""
config_path = Path(path).expanduser()
if not config_path.exists():
raise FileNotFoundError(f"MCP config file {path} does not exist")

try:
with config_path.open("r", encoding="utf-8") as file:
raw_data = yaml.safe_load(file)
except yaml.YAMLError as exc:
raise ValueError(f"Invalid MCP config YAML: {exc}") from exc

if not isinstance(raw_data, dict):
raise ValueError("Invalid MCP config YAML: root document must be a mapping")

substituted = _substitute_env(raw_data)
return MCPConfig.model_validate(substituted)
69 changes: 69 additions & 0 deletions redisvl/mcp/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import ValidationError
from redis.exceptions import RedisError

from redisvl.exceptions import RedisSearchError


class MCPErrorCode(str, Enum):
"""Stable internal error codes exposed by the MCP framework."""

INVALID_REQUEST = "invalid_request"
DEPENDENCY_MISSING = "dependency_missing"
BACKEND_UNAVAILABLE = "backend_unavailable"
INTERNAL_ERROR = "internal_error"


class RedisVLMCPError(Exception):
"""Framework-facing exception carrying a stable MCP error contract."""

def __init__(
self,
message: str,
*,
code: MCPErrorCode,
retryable: bool,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(message)
self.code = code
self.retryable = retryable
self.metadata = metadata or {}


def map_exception(exc: Exception) -> RedisVLMCPError:
"""Map framework exceptions into deterministic MCP-facing exceptions."""
if isinstance(exc, RedisVLMCPError):
return exc

if isinstance(exc, (ValidationError, ValueError, FileNotFoundError)):
return RedisVLMCPError(
str(exc),
code=MCPErrorCode.INVALID_REQUEST,
retryable=False,
)

if isinstance(exc, ImportError):
return RedisVLMCPError(
str(exc),
code=MCPErrorCode.DEPENDENCY_MISSING,
retryable=False,
)

if isinstance(
exc, (TimeoutError, asyncio.TimeoutError, RedisSearchError, RedisError)
):
return RedisVLMCPError(
str(exc),
code=MCPErrorCode.BACKEND_UNAVAILABLE,
retryable=True,
)

return RedisVLMCPError(
str(exc),
code=MCPErrorCode.INTERNAL_ERROR,
retryable=False,
)
Loading
Loading