Skip to content

karenina.utils.mcp

mcp

Shared MCP utilities for Karenina.

This module provides adapter-agnostic MCP utilities using the core mcp package. For adapter-specific MCP functionality: - LangChain tools: karenina.adapters.langchain.mcp - Claude SDK tools: karenina.adapters.claude_tool.mcp

Functions

afetch_tool_descriptions async

afetch_tool_descriptions(
    mcp_urls_dict: dict[str, str],
    tool_filter: list[str] | None = None,
) -> dict[str, str]

Fetch tool descriptions from MCP servers using the core mcp package.

This function connects to MCP servers and retrieves the descriptions for all available tools. Useful for getting seed descriptions for GEPA optimization.

Parameters:

Name Type Description Default
mcp_urls_dict
dict[str, str]

Dictionary mapping server names to MCP server URLs

required
tool_filter
list[str] | None

Optional list of tool names to include

None

Returns:

Type Description
dict[str, str]

Dict mapping tool names to their descriptions

Example

descriptions = await fetch_tool_descriptions( ... {"biocontext": "https://mcp.biocontext.ai/mcp/"} ... ) print(descriptions) {'search_proteins': 'Search for protein information...', ...}

Source code in src/karenina/utils/mcp/tools.py
async def afetch_tool_descriptions(
    mcp_urls_dict: dict[str, str],
    tool_filter: list[str] | None = None,
) -> dict[str, str]:
    """Fetch tool descriptions from MCP servers using the core mcp package.

    This function connects to MCP servers and retrieves the descriptions
    for all available tools. Useful for getting seed descriptions for
    GEPA optimization.

    Args:
        mcp_urls_dict: Dictionary mapping server names to MCP server URLs
        tool_filter: Optional list of tool names to include

    Returns:
        Dict mapping tool names to their descriptions

    Example:
        >>> descriptions = await fetch_tool_descriptions(
        ...     {"biocontext": "https://mcp.biocontext.ai/mcp/"}
        ... )
        >>> print(descriptions)
        {'search_proteins': 'Search for protein information...', ...}
    """
    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client

    descriptions: dict[str, str] = {}

    async with AsyncExitStack() as stack:
        for server_name, url in mcp_urls_dict.items():
            try:
                # Connect to MCP server
                http_transport = await asyncio.wait_for(
                    stack.enter_async_context(streamablehttp_client(url)),
                    timeout=30.0,
                )
                read_stream, write_stream, _ = http_transport

                session = await stack.enter_async_context(ClientSession(read_stream, write_stream))
                await session.initialize()

                # List tools from this server
                tools_response = await session.list_tools()
                mcp_tools = tools_response.tools

                # Extract descriptions
                for tool in mcp_tools:
                    tool_name = tool.name
                    tool_desc = tool.description or ""

                    # Apply filter if provided
                    if tool_filter is not None and tool_name not in tool_filter:
                        continue

                    descriptions[tool_name] = tool_desc

                logger.debug(f"Fetched {len(mcp_tools)} tools from MCP server '{server_name}'")

            except TimeoutError as e:
                raise McpTimeoutError(
                    f"MCP server '{server_name}' connection timed out after 30 seconds.",
                    server_name=server_name,
                    timeout_seconds=30,
                ) from e
            except Exception as e:
                logger.error(f"Failed to fetch tools from MCP server '{server_name}': {e}")
                raise

    logger.debug(f"Fetched descriptions for {len(descriptions)} total tools")
    return descriptions

apply_tool_description_overrides

apply_tool_description_overrides(
    tools: list[Any], overrides: dict[str, str]
) -> list[Any]

Apply description overrides to tools.

Used by GEPA optimization to test different tool descriptions.

Parameters:

Name Type Description Default
tools
list[Any]

List of tool objects (LangChain Tool or MCP Tool)

required
overrides
dict[str, str]

Dict mapping tool names to new descriptions

required

Returns:

Type Description
list[Any]

Modified tools list with updated descriptions

Source code in src/karenina/utils/mcp/tools.py
def apply_tool_description_overrides(
    tools: list[Any],
    overrides: dict[str, str],
) -> list[Any]:
    """Apply description overrides to tools.

    Used by GEPA optimization to test different tool descriptions.

    Args:
        tools: List of tool objects (LangChain Tool or MCP Tool)
        overrides: Dict mapping tool names to new descriptions

    Returns:
        Modified tools list with updated descriptions
    """
    for tool in tools:
        tool_name = getattr(tool, "name", None)
        if tool_name and tool_name in overrides:
            tool.description = overrides[tool_name]
    return tools

connect_all_mcp_servers async

connect_all_mcp_servers(
    exit_stack: AsyncExitStack,
    mcp_servers: dict[str, MCPServerConfig],
) -> dict[str, ClientSession]

Connect to all configured MCP servers.

Parameters:

Name Type Description Default
exit_stack
AsyncExitStack

AsyncExitStack for managing session lifecycles.

required
mcp_servers
dict[str, MCPServerConfig]

Dict mapping server names to their configurations.

required

Returns:

Type Description
dict[str, ClientSession]

Dict mapping server names to initialized ClientSession objects.

Raises:

Type Description
ValueError

If any server config is invalid.

ImportError

If mcp package is not installed.

Source code in src/karenina/utils/mcp/client.py
async def connect_all_mcp_servers(
    exit_stack: AsyncExitStack,
    mcp_servers: dict[str, MCPServerConfig],
) -> dict[str, ClientSession]:
    """Connect to all configured MCP servers.

    Args:
        exit_stack: AsyncExitStack for managing session lifecycles.
        mcp_servers: Dict mapping server names to their configurations.

    Returns:
        Dict mapping server names to initialized ClientSession objects.

    Raises:
        ValueError: If any server config is invalid.
        ImportError: If mcp package is not installed.
    """
    sessions: dict[str, ClientSession] = {}

    for name, config in mcp_servers.items():
        try:
            session = await connect_mcp_session(exit_stack, config)
            sessions[name] = session
            logger.info(f"Connected to MCP server '{name}'")
        except Exception as e:
            logger.error(f"Failed to connect to MCP server '{name}': {e}")
            raise

    return sessions

connect_mcp_session async

connect_mcp_session(
    exit_stack: AsyncExitStack, config: MCPServerConfig
) -> ClientSession

Connect to an MCP server via HTTP/SSE transport.

Creates a new MCP client session and registers it with the exit stack for automatic cleanup when the stack closes.

Parameters:

Name Type Description Default
exit_stack
AsyncExitStack

AsyncExitStack for managing session lifecycle.

required
config
MCPServerConfig

MCP server configuration. Must include 'url' for HTTP/SSE transport. Optional 'headers' dict for authentication.

required

Returns:

Type Description
ClientSession

Initialized ClientSession ready for tool calls.

Raises:

Type Description
ValueError

If config doesn't include required 'url' field.

ImportError

If mcp package is not installed.

Example

async with AsyncExitStack() as stack: ... config = {"url": "https://mcp.example.com/mcp", "type": "http"} ... session = await connect_mcp_session(stack, config) ... tools = await session.list_tools()

Source code in src/karenina/utils/mcp/client.py
async def connect_mcp_session(
    exit_stack: AsyncExitStack,
    config: MCPServerConfig,
) -> ClientSession:
    """Connect to an MCP server via HTTP/SSE transport.

    Creates a new MCP client session and registers it with the exit stack
    for automatic cleanup when the stack closes.

    Args:
        exit_stack: AsyncExitStack for managing session lifecycle.
        config: MCP server configuration. Must include 'url' for HTTP/SSE transport.
            Optional 'headers' dict for authentication.

    Returns:
        Initialized ClientSession ready for tool calls.

    Raises:
        ValueError: If config doesn't include required 'url' field.
        ImportError: If mcp package is not installed.

    Example:
        >>> async with AsyncExitStack() as stack:
        ...     config = {"url": "https://mcp.example.com/mcp", "type": "http"}
        ...     session = await connect_mcp_session(stack, config)
        ...     tools = await session.list_tools()
    """
    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client

    # Extract URL from config
    # Cast to dict for TypedDict access
    config_dict: dict[str, object] = dict(config)
    url = config_dict.get("url")
    if not url or not isinstance(url, str):
        raise ValueError("MCP server config must include 'url' for HTTP/SSE transport")

    headers_raw = config_dict.get("headers", {})
    headers: dict[str, str] = headers_raw if isinstance(headers_raw, dict) else {}

    logger.debug(f"Connecting to MCP server at {url}")

    # Create HTTP transport and enter it into the exit stack
    http_transport = await exit_stack.enter_async_context(streamablehttp_client(url, headers=headers))

    read_stream, write_stream, _ = http_transport

    # Create and initialize session
    session = await exit_stack.enter_async_context(ClientSession(read_stream, write_stream))

    await session.initialize()

    logger.debug(f"Successfully connected to MCP server at {url}")

    return session

fetch_tool_descriptions

fetch_tool_descriptions(
    mcp_urls_dict: dict[str, str],
    tool_filter: list[str] | None = None,
) -> dict[str, str]

Synchronous wrapper for afetch_tool_descriptions.

Uses the same async handling pattern as other sync wrappers in karenina, supporting both the shared BlockingPortal (when available from parallel verification) and fallback to asyncio.run().

Parameters:

Name Type Description Default
mcp_urls_dict
dict[str, str]

Dictionary mapping server names to MCP server URLs

required
tool_filter
list[str] | None

Optional list of tool names to include

None

Returns:

Type Description
dict[str, str]

Dict mapping tool names to their descriptions

Source code in src/karenina/utils/mcp/tools.py
def fetch_tool_descriptions(
    mcp_urls_dict: dict[str, str],
    tool_filter: list[str] | None = None,
) -> dict[str, str]:
    """Synchronous wrapper for afetch_tool_descriptions.

    Uses the same async handling pattern as other sync wrappers in karenina,
    supporting both the shared BlockingPortal (when available from parallel
    verification) and fallback to asyncio.run().

    Args:
        mcp_urls_dict: Dictionary mapping server names to MCP server URLs
        tool_filter: Optional list of tool names to include

    Returns:
        Dict mapping tool names to their descriptions
    """
    # Try to use the shared portal if available (from parallel verification)
    try:
        from karenina.benchmark.verification.executor import get_async_portal

        portal = get_async_portal()
        if portal is not None:
            return portal.call(afetch_tool_descriptions, mcp_urls_dict, tool_filter)
    except ImportError:
        pass

    # Check if we're already in an async context
    try:
        asyncio.get_running_loop()

        # Use ThreadPoolExecutor to avoid nested event loop issues
        def run_in_thread() -> dict[str, str]:
            return asyncio.run(afetch_tool_descriptions(mcp_urls_dict, tool_filter))

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(run_in_thread)
            try:
                return future.result(timeout=45)
            except TimeoutError as e:
                raise McpTimeoutError(
                    "Fetch tool descriptions timed out after 45 seconds",
                    timeout_seconds=45,
                ) from e
    except RuntimeError:
        pass

    # Create new event loop and run
    return asyncio.run(afetch_tool_descriptions(mcp_urls_dict, tool_filter))

get_all_mcp_tools async

get_all_mcp_tools(
    sessions: dict[str, Any],
) -> list[tuple[str, Any, Any]]

Get all tools from connected MCP sessions.

Parameters:

Name Type Description Default
sessions
dict[str, Any]

Dict mapping server names to ClientSession objects.

required

Returns:

Type Description
list[tuple[str, Any, Any]]

List of tuples (server_name, session, mcp_tool) for each tool.

Source code in src/karenina/utils/mcp/client.py
async def get_all_mcp_tools(sessions: dict[str, Any]) -> list[tuple[str, Any, Any]]:
    """Get all tools from connected MCP sessions.

    Args:
        sessions: Dict mapping server names to ClientSession objects.

    Returns:
        List of tuples (server_name, session, mcp_tool) for each tool.
    """
    all_tools: list[tuple[str, Any, Any]] = []

    for server_name, session in sessions.items():
        try:
            tools_response = await session.list_tools()
            mcp_tools = tools_response.tools

            for tool in mcp_tools:
                all_tools.append((server_name, session, tool))

            logger.debug(f"MCP server '{server_name}' provides {len(mcp_tools)} tools: {[t.name for t in mcp_tools]}")

        except Exception as e:
            logger.error(f"Failed to list tools from MCP server '{server_name}': {e}")
            raise

    return all_tools