karenina.adapters.langchain_deep_agents

langchain_deep_agents

LangChain Deep Agents adapter for natively agentic evaluation.

This adapter provides AgentPort, LLMPort, and ParserPort implementations using LangChain Deep Agents (create_deep_agent). It enables provider-agnostic agentic evaluation with built-in planning, context management, and subagent orchestration.

Requires: pip install deepagents langchain-mcp-adapters

Adapter classes
  • DeepAgentsAgentAdapter: Agent loops via create_deep_agent with MCP support
  • DeepAgentsLLMAdapter: Simple LLM invocation via single-turn agent
  • DeepAgentsParserAdapter: Structured output parsing
Utilities
  • DeepAgentsMessageConverter: Convert between unified Message and LangGraph types
  • check_deep_agents_available: Check if deepagents is installed
  • convert_mcp_to_tools: Convert MCPServerConfig to LangChain tools
  • extract_deep_agents_usage: Extract UsageMetadata from agent results
  • deep_agents_messages_to_raw_trace: Format messages as raw trace string
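Since deepagents is an optional dependency, check_deep_agents_available presumably follows the usual import-probe pattern. A minimal, hypothetical sketch (the real helper lives in this package and may differ in detail):

```python
from importlib import util


def check_available(module_name: str) -> bool:
    # Probe the import system without actually importing the module;
    # find_spec returns None when the package is not installed.
    return util.find_spec(module_name) is not None


# Stdlib modules are always present; "deepagents" depends on your environment.
print(check_available("json"))
```

Calling this at adapter construction time lets the package raise a clear "pip install deepagents" error instead of a bare ImportError deep inside execution.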

Classes

DeepAgentsAgentAdapter

Agent adapter using LangChain Deep Agents' create_deep_agent.

This adapter implements the AgentPort Protocol for agent execution with built-in planning tools, filesystem operations, subagent delegation, and context management. Uses create_deep_agent() which returns a compiled LangGraph graph.

The adapter handles:
  • Message conversion from unified Message to prompt string
  • Model initialization via init_chat_model
  • Agent creation and invocation via LangGraph
  • Dual trace output (raw_trace string and trace_messages list)
  • Usage metadata extraction from AIMessage response_metadata
  • Recursion limit detection from LangGraph state
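The recursion-limit handling maps AgentConfig.max_turns onto LangGraph's step budget. As the source shows, each agent turn is counted as two graph steps (tool call plus tool response), so the limit is simply doubled:

```python
def to_recursion_limit(max_turns: int) -> int:
    # One agent turn ~= 2 LangGraph steps (tool call + tool response),
    # so the adapter sets recursion_limit to double max_turns.
    return max_turns * 2


print(to_recursion_limit(10))  # AgentConfig(max_turns=10) -> recursion_limit 20
```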

Example

>>> config = ModelConfig(
...     id="test",
...     model_name="claude-sonnet-4-20250514",
...     model_provider="anthropic",
...     interface="langchain_deep_agents",
... )
>>> adapter = DeepAgentsAgentAdapter(config)
>>> result = await adapter.arun(
...     messages=[Message.user("What files are in /tmp?")],
...     config=AgentConfig(max_turns=10),
... )
>>> print(result.final_response)

Source code in src/karenina/adapters/langchain_deep_agents/agent.py
class DeepAgentsAgentAdapter:
    """Agent adapter using LangChain Deep Agents' create_deep_agent.

    This adapter implements the AgentPort Protocol for agent execution with
    built-in planning tools, filesystem operations, subagent delegation,
    and context management. Uses create_deep_agent() which returns a
    compiled LangGraph graph.

    The adapter handles:
    - Message conversion from unified Message to prompt string
    - Model initialization via init_chat_model
    - Agent creation and invocation via LangGraph
    - Dual trace output (raw_trace string and trace_messages list)
    - Usage metadata extraction from AIMessage response_metadata
    - Recursion limit detection from LangGraph state

    Example:
        >>> config = ModelConfig(
        ...     id="test",
        ...     model_name="claude-sonnet-4-20250514",
        ...     model_provider="anthropic",
        ...     interface="langchain_deep_agents",
        ... )
        >>> adapter = DeepAgentsAgentAdapter(config)
        >>> result = await adapter.arun(
        ...     messages=[Message.user("What files are in /tmp?")],
        ...     config=AgentConfig(max_turns=10),
        ... )
        >>> print(result.final_response)
    """

    def __init__(self, model_config: ModelConfig) -> None:
        """Initialize the Deep Agents adapter.

        Args:
            model_config: Configuration specifying model, provider, and interface.
        """
        self._config = model_config
        self._converter = DeepAgentsMessageConverter()

    def _extract_final_response(self, lc_messages: list[Any]) -> str:
        """Extract final text response from the last AIMessage.

        Args:
            lc_messages: List of LangGraph BaseMessage objects.

        Returns:
            The final text response string.
        """
        from langchain_core.messages import AIMessage

        for msg in reversed(lc_messages):
            if isinstance(msg, AIMessage):
                if isinstance(msg.content, str) and msg.content:
                    return msg.content
                if isinstance(msg.content, list):
                    text_parts = []
                    for block in msg.content:
                        if isinstance(block, str):
                            text_parts.append(block)
                        elif isinstance(block, dict) and block.get("type") == "text":
                            text_parts.append(block["text"])
                    if text_parts:
                        return "\n".join(text_parts)

        return "[No final response extracted]"

    def _count_turns(self, lc_messages: list[Any]) -> int:
        """Count the number of agent turns (AIMessage instances).

        Args:
            lc_messages: List of LangGraph BaseMessage objects.

        Returns:
            Number of AIMessage instances in the conversation.
        """
        from langchain_core.messages import AIMessage

        return sum(1 for msg in lc_messages if isinstance(msg, AIMessage))

    async def arun(
        self,
        messages: list[Message],
        tools: list[Tool] | None = None,  # noqa: ARG002 - required by AgentPort protocol
        mcp_servers: dict[str, MCPServerConfig] | None = None,  # noqa: ARG002 - required by AgentPort protocol
        config: AgentConfig | None = None,
    ) -> AgentResult:
        """Execute an agent loop with optional tools and MCP servers.

        Args:
            messages: Initial conversation messages.
            tools: Optional list of Tool definitions the agent can invoke.
            mcp_servers: Optional dict of MCP server configurations.
            config: Optional AgentConfig for execution parameters.

        Returns:
            AgentResult with final response, traces, usage, and metadata.

        Raises:
            AgentExecutionError: If the agent fails during execution.
            AgentTimeoutError: If execution exceeds the timeout.
            AgentResponseError: If the response is malformed or invalid.
        """
        global _create_deep_agent  # noqa: PLW0603
        if _create_deep_agent is None:
            from deepagents import create_deep_agent as _cda

            _create_deep_agent = _cda

        config = config or AgentConfig()

        # Convert messages to prompt string and extract system prompt
        prompt_string = self._converter.to_prompt_string(messages)
        system_prompt = self._converter.extract_system_prompt(messages)

        # Use config system_prompt as fallback
        if not system_prompt and config.system_prompt:
            system_prompt = config.system_prompt
        elif not system_prompt and self._config.system_prompt:
            system_prompt = self._config.system_prompt

        # Initialize model
        chat_model = create_chat_model(self._config)

        # Build agent kwargs
        agent_kwargs: dict[str, Any] = {"model": chat_model}
        if system_prompt:
            agent_kwargs["system_prompt"] = system_prompt

        # Configure backend for real filesystem access when workspace is available
        workspace_path = config.workspace_path
        if workspace_path:
            from deepagents.backends import FilesystemBackend

            agent_kwargs["backend"] = FilesystemBackend(root_dir=str(workspace_path))
            logger.info("Using FilesystemBackend with root_dir=%s", workspace_path)
        else:
            # Default: use FilesystemBackend rooted at cwd for real filesystem access.
            # StateBackend (virtual/in-memory) is NOT suitable for benchmarking because
            # the agent cannot see real files on disk.
            from deepagents.backends import FilesystemBackend

            agent_kwargs["backend"] = FilesystemBackend()
            logger.info("Using FilesystemBackend with default root (cwd)")

        # Pass through extra config to create_deep_agent
        if config.extra:
            for key, value in config.extra.items():
                if key not in ("model", "system_prompt", "backend"):
                    agent_kwargs[key] = value

        # Create the agent
        agent = _create_deep_agent(**agent_kwargs)

        # Build invocation input
        invoke_input: dict[str, Any] = {
            "messages": [{"role": "user", "content": prompt_string}],
        }

        # LangGraph config for recursion limit
        # Each tool call + response = 2 steps, so double max_turns
        langgraph_config: dict[str, Any] = {
            "recursion_limit": config.max_turns * 2,
        }

        # Execute agent
        result: dict[str, Any] = {}
        limit_reached = False

        async def execute_agent() -> None:
            nonlocal result, limit_reached

            result = await agent.ainvoke(invoke_input, config=langgraph_config)

            # Check if limit was reached via state
            if result.get("is_last_step", False):
                limit_reached = True

        try:
            if config.timeout:
                await asyncio.wait_for(execute_agent(), timeout=config.timeout)
            else:
                await execute_agent()

        except TimeoutError as e:
            raise AgentTimeoutError(f"Agent execution timed out after {config.timeout}s") from e
        except Exception as e:
            mapped_error, was_limit = wrap_deep_agents_error(e)
            if was_limit:
                limit_reached = True
                logger.warning("Agent hit turn limit: %s", e)
            else:
                raise mapped_error from e

        # Extract messages from result
        lc_messages: list[Any] = result.get("messages", [])

        if not lc_messages and not limit_reached:
            raise AgentResponseError("No messages received from Deep Agents")

        # If limit was reached but no messages, return a partial result
        if not lc_messages and limit_reached:
            return AgentResult(
                final_response="[Agent hit recursion limit before producing a response]",
                raw_trace="[Note: Recursion limit reached, no response produced]",
                trace_messages=[],
                usage=UsageMetadata(model=self._config.model_name),
                turns=0,
                limit_reached=True,
                session_id=None,
                actual_model=self._config.model_name,
            )

        # Build raw_trace (legacy string format)
        raw_trace = deep_agents_messages_to_raw_trace(lc_messages)
        if limit_reached:
            raw_trace += "\n\n[Note: Recursion limit reached, partial response shown]"

        # Build trace_messages (structured format)
        trace_messages = self._converter.from_provider(lc_messages)

        # Extract final response
        final_response = self._extract_final_response(lc_messages)

        # Extract usage
        usage = extract_deep_agents_usage(lc_messages, model=self._config.model_name)

        # Count turns
        turns = self._count_turns(lc_messages)

        # Extract actual model
        actual_model = extract_actual_model(lc_messages) or self._config.model_name

        return AgentResult(
            final_response=final_response,
            raw_trace=raw_trace,
            trace_messages=trace_messages,
            usage=usage,
            turns=turns,
            limit_reached=limit_reached,
            session_id=None,
            actual_model=actual_model,
        )

    def run(
        self,
        messages: list[Message],
        tools: list[Tool] | None = None,
        mcp_servers: dict[str, MCPServerConfig] | None = None,
        config: AgentConfig | None = None,
    ) -> AgentResult:
        """Synchronous wrapper for arun().

        Args:
            messages: Initial conversation messages.
            tools: Optional list of Tool definitions.
            mcp_servers: Optional MCP server configurations.
            config: Optional AgentConfig for execution parameters.

        Returns:
            AgentResult from the agent execution.

        Raises:
            AgentExecutionError: If the agent fails during execution.
            AgentTimeoutError: If execution exceeds the timeout.
            AgentResponseError: If the response is malformed or invalid.
        """
        from karenina.benchmark.verification.executor import get_async_portal

        portal = get_async_portal()

        if portal is not None:
            return portal.call(self.arun, messages, tools, mcp_servers, config)

        # No portal: check if we're in an async context
        try:
            asyncio.get_running_loop()
            # We're in an async context: use ThreadPoolExecutor

            def run_in_thread() -> AgentResult:
                return asyncio.run(self.arun(messages, tools, mcp_servers, config))

            timeout = config.timeout if config and config.timeout else 600
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(run_in_thread)
                return future.result(timeout=timeout)

        except RuntimeError:
            # No event loop running, safe to use asyncio.run
            return asyncio.run(self.arun(messages, tools, mcp_servers, config))

    async def aclose(self) -> None:
        """Close underlying resources.

        Deep Agents manages its own cleanup via LangGraph's compiled graph,
        so this is a no-op. Provided for interface consistency with other
        adapters that do require cleanup.
        """
Functions
__init__
__init__(model_config: ModelConfig) -> None

Parameters:
  • model_config (ModelConfig, required): Configuration specifying model, provider, and interface.
Source code in src/karenina/adapters/langchain_deep_agents/agent.py
def __init__(self, model_config: ModelConfig) -> None:
    """Initialize the Deep Agents adapter.

    Args:
        model_config: Configuration specifying model, provider, and interface.
    """
    self._config = model_config
    self._converter = DeepAgentsMessageConverter()
aclose async
aclose() -> None

Close underlying resources.

Deep Agents manages its own cleanup via LangGraph's compiled graph, so this is a no-op. Provided for interface consistency with other adapters that do require cleanup.

Source code in src/karenina/adapters/langchain_deep_agents/agent.py
async def aclose(self) -> None:
    """Close underlying resources.

    Deep Agents manages its own cleanup via LangGraph's compiled graph,
    so this is a no-op. Provided for interface consistency with other
    adapters that do require cleanup.
    """
arun async
arun(
    messages: list[Message],
    tools: list[Tool] | None = None,
    mcp_servers: dict[str, MCPServerConfig] | None = None,
    config: AgentConfig | None = None,
) -> AgentResult

Execute an agent loop with optional tools and MCP servers.

Parameters:
  • messages (list[Message], required): Initial conversation messages.
  • tools (list[Tool] | None, default None): Optional list of Tool definitions the agent can invoke.
  • mcp_servers (dict[str, MCPServerConfig] | None, default None): Optional dict of MCP server configurations.
  • config (AgentConfig | None, default None): Optional AgentConfig for execution parameters.

Returns:
  • AgentResult: AgentResult with final response, traces, usage, and metadata.

Raises:
  • AgentExecutionError: If the agent fails during execution.
  • AgentTimeoutError: If execution exceeds the timeout.
  • AgentResponseError: If the response is malformed or invalid.

Source code in src/karenina/adapters/langchain_deep_agents/agent.py
async def arun(
    self,
    messages: list[Message],
    tools: list[Tool] | None = None,  # noqa: ARG002 - required by AgentPort protocol
    mcp_servers: dict[str, MCPServerConfig] | None = None,  # noqa: ARG002 - required by AgentPort protocol
    config: AgentConfig | None = None,
) -> AgentResult:
    """Execute an agent loop with optional tools and MCP servers.

    Args:
        messages: Initial conversation messages.
        tools: Optional list of Tool definitions the agent can invoke.
        mcp_servers: Optional dict of MCP server configurations.
        config: Optional AgentConfig for execution parameters.

    Returns:
        AgentResult with final response, traces, usage, and metadata.

    Raises:
        AgentExecutionError: If the agent fails during execution.
        AgentTimeoutError: If execution exceeds the timeout.
        AgentResponseError: If the response is malformed or invalid.
    """
    global _create_deep_agent  # noqa: PLW0603
    if _create_deep_agent is None:
        from deepagents import create_deep_agent as _cda

        _create_deep_agent = _cda

    config = config or AgentConfig()

    # Convert messages to prompt string and extract system prompt
    prompt_string = self._converter.to_prompt_string(messages)
    system_prompt = self._converter.extract_system_prompt(messages)

    # Use config system_prompt as fallback
    if not system_prompt and config.system_prompt:
        system_prompt = config.system_prompt
    elif not system_prompt and self._config.system_prompt:
        system_prompt = self._config.system_prompt

    # Initialize model
    chat_model = create_chat_model(self._config)

    # Build agent kwargs
    agent_kwargs: dict[str, Any] = {"model": chat_model}
    if system_prompt:
        agent_kwargs["system_prompt"] = system_prompt

    # Configure backend for real filesystem access when workspace is available
    workspace_path = config.workspace_path
    if workspace_path:
        from deepagents.backends import FilesystemBackend

        agent_kwargs["backend"] = FilesystemBackend(root_dir=str(workspace_path))
        logger.info("Using FilesystemBackend with root_dir=%s", workspace_path)
    else:
        # Default: use FilesystemBackend rooted at cwd for real filesystem access.
        # StateBackend (virtual/in-memory) is NOT suitable for benchmarking because
        # the agent cannot see real files on disk.
        from deepagents.backends import FilesystemBackend

        agent_kwargs["backend"] = FilesystemBackend()
        logger.info("Using FilesystemBackend with default root (cwd)")

    # Pass through extra config to create_deep_agent
    if config.extra:
        for key, value in config.extra.items():
            if key not in ("model", "system_prompt", "backend"):
                agent_kwargs[key] = value

    # Create the agent
    agent = _create_deep_agent(**agent_kwargs)

    # Build invocation input
    invoke_input: dict[str, Any] = {
        "messages": [{"role": "user", "content": prompt_string}],
    }

    # LangGraph config for recursion limit
    # Each tool call + response = 2 steps, so double max_turns
    langgraph_config: dict[str, Any] = {
        "recursion_limit": config.max_turns * 2,
    }

    # Execute agent
    result: dict[str, Any] = {}
    limit_reached = False

    async def execute_agent() -> None:
        nonlocal result, limit_reached

        result = await agent.ainvoke(invoke_input, config=langgraph_config)

        # Check if limit was reached via state
        if result.get("is_last_step", False):
            limit_reached = True

    try:
        if config.timeout:
            await asyncio.wait_for(execute_agent(), timeout=config.timeout)
        else:
            await execute_agent()

    except TimeoutError as e:
        raise AgentTimeoutError(f"Agent execution timed out after {config.timeout}s") from e
    except Exception as e:
        mapped_error, was_limit = wrap_deep_agents_error(e)
        if was_limit:
            limit_reached = True
            logger.warning("Agent hit turn limit: %s", e)
        else:
            raise mapped_error from e

    # Extract messages from result
    lc_messages: list[Any] = result.get("messages", [])

    if not lc_messages and not limit_reached:
        raise AgentResponseError("No messages received from Deep Agents")

    # If limit was reached but no messages, return a partial result
    if not lc_messages and limit_reached:
        return AgentResult(
            final_response="[Agent hit recursion limit before producing a response]",
            raw_trace="[Note: Recursion limit reached, no response produced]",
            trace_messages=[],
            usage=UsageMetadata(model=self._config.model_name),
            turns=0,
            limit_reached=True,
            session_id=None,
            actual_model=self._config.model_name,
        )

    # Build raw_trace (legacy string format)
    raw_trace = deep_agents_messages_to_raw_trace(lc_messages)
    if limit_reached:
        raw_trace += "\n\n[Note: Recursion limit reached, partial response shown]"

    # Build trace_messages (structured format)
    trace_messages = self._converter.from_provider(lc_messages)

    # Extract final response
    final_response = self._extract_final_response(lc_messages)

    # Extract usage
    usage = extract_deep_agents_usage(lc_messages, model=self._config.model_name)

    # Count turns
    turns = self._count_turns(lc_messages)

    # Extract actual model
    actual_model = extract_actual_model(lc_messages) or self._config.model_name

    return AgentResult(
        final_response=final_response,
        raw_trace=raw_trace,
        trace_messages=trace_messages,
        usage=usage,
        turns=turns,
        limit_reached=limit_reached,
        session_id=None,
        actual_model=actual_model,
    )
run
run(
    messages: list[Message],
    tools: list[Tool] | None = None,
    mcp_servers: dict[str, MCPServerConfig] | None = None,
    config: AgentConfig | None = None,
) -> AgentResult

Synchronous wrapper for arun().
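The wrapper's fallback logic (used when no async portal is available) can be sketched as a standalone pattern with only the standard library; `arun` below is a stand-in for the adapter's real async entry point:

```python
import asyncio
import concurrent.futures


async def arun(x: int) -> int:
    # Stand-in for the adapter's real async entry point.
    await asyncio.sleep(0)
    return x * 2


def run(x: int) -> int:
    # If an event loop is already running in this thread, asyncio.run()
    # would raise RuntimeError, so delegate to a worker thread that owns
    # its own fresh event loop; otherwise run directly.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(arun(x))  # no loop here: safe to run directly
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, arun(x)).result(timeout=600)


print(run(21))
```

The thread hop matters because asyncio forbids nesting `asyncio.run()` inside a running loop; a separate thread gets its own loop and avoids the conflict.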

Parameters:
  • messages (list[Message], required): Initial conversation messages.
  • tools (list[Tool] | None, default None): Optional list of Tool definitions.
  • mcp_servers (dict[str, MCPServerConfig] | None, default None): Optional MCP server configurations.
  • config (AgentConfig | None, default None): Optional AgentConfig for execution parameters.

Returns:
  • AgentResult: AgentResult from the agent execution.

Raises:
  • AgentExecutionError: If the agent fails during execution.
  • AgentTimeoutError: If execution exceeds the timeout.
  • AgentResponseError: If the response is malformed or invalid.

Source code in src/karenina/adapters/langchain_deep_agents/agent.py
def run(
    self,
    messages: list[Message],
    tools: list[Tool] | None = None,
    mcp_servers: dict[str, MCPServerConfig] | None = None,
    config: AgentConfig | None = None,
) -> AgentResult:
    """Synchronous wrapper for arun().

    Args:
        messages: Initial conversation messages.
        tools: Optional list of Tool definitions.
        mcp_servers: Optional MCP server configurations.
        config: Optional AgentConfig for execution parameters.

    Returns:
        AgentResult from the agent execution.

    Raises:
        AgentExecutionError: If the agent fails during execution.
        AgentTimeoutError: If execution exceeds the timeout.
        AgentResponseError: If the response is malformed or invalid.
    """
    from karenina.benchmark.verification.executor import get_async_portal

    portal = get_async_portal()

    if portal is not None:
        return portal.call(self.arun, messages, tools, mcp_servers, config)

    # No portal: check if we're in an async context
    try:
        asyncio.get_running_loop()
        # We're in an async context: use ThreadPoolExecutor

        def run_in_thread() -> AgentResult:
            return asyncio.run(self.arun(messages, tools, mcp_servers, config))

        timeout = config.timeout if config and config.timeout else 600
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(run_in_thread)
            return future.result(timeout=timeout)

    except RuntimeError:
        # No event loop running, safe to use asyncio.run
        return asyncio.run(self.arun(messages, tools, mcp_servers, config))

DeepAgentsLLMAdapter

LLM adapter using LangChain's init_chat_model for single-turn calls.

This adapter implements the LLMPort Protocol for simple LLM invocation without agent loops. Uses the LangChain model directly for efficiency.

Example

>>> config = ModelConfig(
...     id="test",
...     model_name="claude-sonnet-4-20250514",
...     model_provider="anthropic",
...     interface="langchain_deep_agents",
... )
>>> adapter = DeepAgentsLLMAdapter(config)
>>> response = await adapter.ainvoke([Message.user("Hello!")])
>>> print(response.content)

Source code in src/karenina/adapters/langchain_deep_agents/llm.py
class DeepAgentsLLMAdapter:
    """LLM adapter using LangChain's init_chat_model for single-turn calls.

    This adapter implements the LLMPort Protocol for simple LLM invocation
    without agent loops. Uses the LangChain model directly for efficiency.

    Example:
        >>> config = ModelConfig(
        ...     id="test",
        ...     model_name="claude-sonnet-4-20250514",
        ...     model_provider="anthropic",
        ...     interface="langchain_deep_agents",
        ... )
        >>> adapter = DeepAgentsLLMAdapter(config)
        >>> response = await adapter.ainvoke([Message.user("Hello!")])
        >>> print(response.content)
    """

    def __init__(
        self,
        model_config: ModelConfig,
        *,
        _structured_schema: type[BaseModel] | None = None,
    ) -> None:
        """Initialize the Deep Agents LLM adapter.

        Args:
            model_config: Configuration specifying model, provider, and interface.
            _structured_schema: Internal; schema for structured output mode.
        """
        self._config = model_config
        self._converter = DeepAgentsMessageConverter()
        self._structured_schema = _structured_schema

    @property
    def capabilities(self) -> PortCapabilities:
        """Declare adapter capabilities.

        Returns:
            PortCapabilities with system_prompt=True and structured_output=True.
        """
        return PortCapabilities(
            supports_system_prompt=True,
            supports_structured_output=True,
        )

    async def ainvoke(self, messages: list[Message]) -> LLMResponse:
        """Invoke the LLM asynchronously.

        Converts karenina Messages to LangChain format, invokes the model,
        and converts the response back.

        Args:
            messages: List of messages forming the conversation.

        Returns:
            LLMResponse containing the generated content and usage metadata.
        """
        from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

        # Build LangChain message list
        lc_messages: list[Any] = []
        for msg in messages:
            text = msg.text or ""
            if msg.role.value == "system":
                lc_messages.append(SystemMessage(content=text))
            elif msg.role.value == "user":
                lc_messages.append(HumanMessage(content=text))
            elif msg.role.value == "assistant":
                lc_messages.append(AIMessage(content=text))

        # Create model
        chat_model = create_chat_model(self._config)

        # Apply structured output if configured
        if self._structured_schema is not None:
            chat_model = chat_model.with_structured_output(self._structured_schema)

        # Invoke
        response = await chat_model.ainvoke(lc_messages)

        # Extract content
        if self._structured_schema is not None:
            # Structured output returns a Pydantic model or dict
            if isinstance(response, BaseModel):
                content = response.model_dump_json()
            elif isinstance(response, dict):
                import json

                content = json.dumps(response)
            else:
                content = str(response)
        elif isinstance(response, AIMessage):
            content = response.content if isinstance(response.content, str) else str(response.content)
        else:
            content = str(response)

        # Extract usage
        usage = UsageMetadata(model=self._config.model_name)
        if isinstance(response, AIMessage):
            usage_meta = getattr(response, "usage_metadata", None)
            if usage_meta and isinstance(usage_meta, dict):
                usage = UsageMetadata(
                    input_tokens=usage_meta.get("input_tokens", 0),
                    output_tokens=usage_meta.get("output_tokens", 0),
                    total_tokens=usage_meta.get("input_tokens", 0) + usage_meta.get("output_tokens", 0),
                    model=self._config.model_name,
                )

        return LLMResponse(content=content, usage=usage, raw=response)

    def invoke(self, messages: list[Message]) -> LLMResponse:
        """Invoke the LLM synchronously.

        Args:
            messages: List of messages forming the conversation.

        Returns:
            LLMResponse containing the generated content and usage metadata.
        """
        from karenina.benchmark.verification.executor import get_async_portal

        portal = get_async_portal()
        if portal is not None:
            return portal.call(self.ainvoke, messages)

        try:
            asyncio.get_running_loop()

            def run_in_thread() -> LLMResponse:
                return asyncio.run(self.ainvoke(messages))

            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(run_in_thread)
                return future.result(timeout=600)

        except RuntimeError:
            return asyncio.run(self.ainvoke(messages))

    def with_structured_output(
        self,
        schema: type[BaseModel],
        *,
        max_retries: int | None = None,  # noqa: ARG002
    ) -> DeepAgentsLLMAdapter:
        """Return a new adapter configured for structured output.

        Args:
            schema: A Pydantic model class defining the output structure.
            max_retries: Ignored (LangChain handles retries internally).

        Returns:
            A new DeepAgentsLLMAdapter configured with the schema.
        """
        return DeepAgentsLLMAdapter(
            self._config,
            _structured_schema=schema,
        )
Attributes
capabilities property
capabilities: PortCapabilities

Declare adapter capabilities.

Returns:
  • PortCapabilities: PortCapabilities with system_prompt=True and structured_output=True.

Functions
__init__
__init__(
    model_config: ModelConfig,
    *,
    _structured_schema: type[BaseModel] | None = None,
) -> None

Parameters:
  • model_config (ModelConfig, required): Configuration specifying model, provider, and interface.
  • _structured_schema (type[BaseModel] | None, default None): Internal; schema for structured output mode.
Source code in src/karenina/adapters/langchain_deep_agents/llm.py
def __init__(
    self,
    model_config: ModelConfig,
    *,
    _structured_schema: type[BaseModel] | None = None,
) -> None:
    """Initialize the Deep Agents LLM adapter.

    Args:
        model_config: Configuration specifying model, provider, and interface.
        _structured_schema: Internal; schema for structured output mode.
    """
    self._config = model_config
    self._converter = DeepAgentsMessageConverter()
    self._structured_schema = _structured_schema
ainvoke async
ainvoke(messages: list[Message]) -> LLMResponse

Invoke the LLM asynchronously.

Converts karenina Messages to LangChain format, invokes the model, and converts the response back.
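The conversion is a straightforward dispatch from the unified role to the corresponding LangChain message class. A dependency-free sketch, using class-name strings in place of the real langchain_core.messages types:

```python
# Stand-ins for SystemMessage / HumanMessage / AIMessage from
# langchain_core.messages; the real adapter instantiates those classes.
ROLE_TO_LC = {
    "system": "SystemMessage",
    "user": "HumanMessage",
    "assistant": "AIMessage",
}


def convert(messages: list[dict]) -> list[tuple[str, str]]:
    out = []
    for msg in messages:
        lc_cls = ROLE_TO_LC.get(msg["role"])
        if lc_cls is not None:  # roles outside the map are dropped
            out.append((lc_cls, msg.get("text") or ""))
    return out


print(convert([{"role": "system", "text": "Be brief."},
               {"role": "user", "text": "Hello!"}]))
```

As in the adapter, a message with empty text is carried through as `""` rather than `None`, and unknown roles are silently skipped.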

Parameters:
  • messages (list[Message], required): List of messages forming the conversation.

Returns:
  • LLMResponse: LLMResponse containing the generated content and usage metadata.

Source code in src/karenina/adapters/langchain_deep_agents/llm.py
async def ainvoke(self, messages: list[Message]) -> LLMResponse:
    """Invoke the LLM asynchronously.

    Converts karenina Messages to LangChain format, invokes the model,
    and converts the response back.

    Args:
        messages: List of messages forming the conversation.

    Returns:
        LLMResponse containing the generated content and usage metadata.
    """
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

    # Build LangChain message list
    lc_messages: list[Any] = []
    for msg in messages:
        text = msg.text or ""
        if msg.role.value == "system":
            lc_messages.append(SystemMessage(content=text))
        elif msg.role.value == "user":
            lc_messages.append(HumanMessage(content=text))
        elif msg.role.value == "assistant":
            lc_messages.append(AIMessage(content=text))

    # Create model
    chat_model = create_chat_model(self._config)

    # Apply structured output if configured
    if self._structured_schema is not None:
        chat_model = chat_model.with_structured_output(self._structured_schema)

    # Invoke
    response = await chat_model.ainvoke(lc_messages)

    # Extract content
    if self._structured_schema is not None:
        # Structured output returns a Pydantic model or dict
        if isinstance(response, BaseModel):
            content = response.model_dump_json()
        elif isinstance(response, dict):
            import json

            content = json.dumps(response)
        else:
            content = str(response)
    elif isinstance(response, AIMessage):
        content = response.content if isinstance(response.content, str) else str(response.content)
    else:
        content = str(response)

    # Extract usage
    usage = UsageMetadata(model=self._config.model_name)
    if isinstance(response, AIMessage):
        usage_meta = getattr(response, "usage_metadata", None)
        if usage_meta and isinstance(usage_meta, dict):
            usage = UsageMetadata(
                input_tokens=usage_meta.get("input_tokens", 0),
                output_tokens=usage_meta.get("output_tokens", 0),
                total_tokens=usage_meta.get("input_tokens", 0) + usage_meta.get("output_tokens", 0),
                model=self._config.model_name,
            )

    return LLMResponse(content=content, usage=usage, raw=response)
invoke
invoke(messages: list[Message]) -> LLMResponse

Invoke the LLM synchronously.

Parameters:

Name Type Description Default
messages list[Message]

List of messages forming the conversation.

required

Returns:

Type Description
LLMResponse

LLMResponse containing the generated content and usage metadata.

Source code in src/karenina/adapters/langchain_deep_agents/llm.py
def invoke(self, messages: list[Message]) -> LLMResponse:
    """Invoke the LLM synchronously.

    Args:
        messages: List of messages forming the conversation.

    Returns:
        LLMResponse containing the generated content and usage metadata.
    """
    from karenina.benchmark.verification.executor import get_async_portal

    portal = get_async_portal()
    if portal is not None:
        return portal.call(self.ainvoke, messages)

    try:
        asyncio.get_running_loop()

        def run_in_thread() -> LLMResponse:
            return asyncio.run(self.ainvoke(messages))

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(run_in_thread)
            return future.result(timeout=600)

    except RuntimeError:
        return asyncio.run(self.ainvoke(messages))
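The event-loop handling in the sync wrapper can be sketched standalone using only the standard library: when no loop is running in the calling thread, asyncio.run is used directly; when one is, the coroutine runs on a worker thread with its own fresh loop, mirroring the run_in_thread fallback above (the portal branch is omitted here):

```python
# Minimal sketch of the sync-over-async fallback used by invoke().
import asyncio
import concurrent.futures


async def compute() -> int:
    await asyncio.sleep(0)
    return 42


def invoke_sync() -> int:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() is safe.
        return asyncio.run(compute())

    # A loop is already running: asyncio.run() would raise, so
    # execute the coroutine on a worker thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return executor.submit(lambda: asyncio.run(compute())).result(timeout=600)


print(invoke_sync())  # 42
```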
with_structured_output
with_structured_output(
    schema: type[BaseModel],
    *,
    max_retries: int | None = None,
) -> DeepAgentsLLMAdapter

Return a new adapter configured for structured output.

Parameters:

Name Type Description Default
schema type[BaseModel]

A Pydantic model class defining the output structure.

required
max_retries int | None

Ignored (LangChain handles retries internally).

None

Returns:

Type Description
DeepAgentsLLMAdapter

A new DeepAgentsLLMAdapter configured with the schema.

Source code in src/karenina/adapters/langchain_deep_agents/llm.py
def with_structured_output(
    self,
    schema: type[BaseModel],
    *,
    max_retries: int | None = None,  # noqa: ARG002
) -> DeepAgentsLLMAdapter:
    """Return a new adapter configured for structured output.

    Args:
        schema: A Pydantic model class defining the output structure.
        max_retries: Ignored (LangChain handles retries internally).

    Returns:
        A new DeepAgentsLLMAdapter configured with the schema.
    """
    return DeepAgentsLLMAdapter(
        self._config,
        _structured_schema=schema,
    )

DeepAgentsMessageConverter

Convert between karenina's unified Message and LangGraph message types.

Deep Agents accepts messages as dicts with role/content keys for invocation, and returns LangGraph BaseMessage subclasses in results.

Source code in src/karenina/adapters/langchain_deep_agents/messages.py
class DeepAgentsMessageConverter:
    """Convert between karenina's unified Message and LangGraph message types.

    Deep Agents accepts messages as dicts with role/content keys for invocation,
    and returns LangGraph BaseMessage subclasses in results.
    """

    def to_prompt_string(self, messages: list[Message]) -> str:
        """Convert user/assistant messages to a prompt string.

        System messages are excluded (use extract_system_prompt instead).

        Args:
            messages: List of karenina Message objects.

        Returns:
            Concatenated prompt string from non-system messages.
        """
        parts = []
        for msg in messages:
            if msg.role == Role.SYSTEM:
                continue
            text = msg.text
            if text:
                parts.append(text)
        return "\n\n".join(parts)

    def extract_system_prompt(self, messages: list[Message]) -> str | None:
        """Extract system prompt from messages.

        Args:
            messages: List of karenina Message objects.

        Returns:
            Combined system prompt text, or None if no system messages.
        """
        system_parts = []
        for msg in messages:
            if msg.role == Role.SYSTEM:
                text = msg.text
                if text:
                    system_parts.append(text)
        return "\n\n".join(system_parts) if system_parts else None

    def to_langchain_messages(self, messages: list[Message]) -> list[dict[str, str]]:
        """Convert karenina messages to LangGraph-compatible dicts.

        Args:
            messages: List of karenina Message objects.

        Returns:
            List of message dicts with role and content keys.
        """
        result = []
        for msg in messages:
            if msg.role == Role.SYSTEM:
                continue
            role_map = {
                Role.USER: "user",
                Role.ASSISTANT: "assistant",
                Role.TOOL: "tool",
            }
            role = role_map.get(msg.role, "user")
            result.append({"role": role, "content": msg.text or ""})
        return result

    def from_provider(self, lc_messages: list[Any]) -> list[Message]:
        """Convert LangGraph BaseMessage list to karenina Messages.

        Args:
            lc_messages: List of LangGraph message objects.

        Returns:
            List of karenina Message objects.
        """
        from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage

        result: list[Message] = []

        for msg in lc_messages:
            if isinstance(msg, SystemMessage):
                result.append(Message.system(msg.content if isinstance(msg.content, str) else str(msg.content)))

            elif isinstance(msg, HumanMessage):
                result.append(Message.user(msg.content if isinstance(msg.content, str) else str(msg.content)))

            elif isinstance(msg, AIMessage):
                result.append(self._convert_ai_message(msg))

            elif isinstance(msg, ToolMessage):
                content = msg.content if isinstance(msg.content, str) else str(msg.content)
                tool_call_id = getattr(msg, "tool_call_id", "") or ""
                result.append(
                    Message(
                        role=Role.TOOL,
                        content=[ToolResultContent(tool_use_id=tool_call_id, content=content)],
                    )
                )

            else:
                logger.debug("Skipping unknown message type: %s", type(msg).__name__)

        return result

    def _convert_ai_message(self, msg: Any) -> Message:
        """Convert a LangGraph AIMessage to a karenina Message.

        Handles text content, structured content blocks, and tool calls.

        Args:
            msg: LangGraph AIMessage instance.

        Returns:
            Karenina Message with Role.ASSISTANT.
        """
        content_blocks: list[Content] = []

        if isinstance(msg.content, str) and msg.content:
            content_blocks.append(TextContent(text=msg.content))
        elif isinstance(msg.content, list):
            for block in msg.content:
                if isinstance(block, str):
                    content_blocks.append(TextContent(text=block))
                elif isinstance(block, dict):
                    if block.get("type") == "text":
                        content_blocks.append(TextContent(text=block["text"]))
                    elif block.get("type") == "tool_use":
                        content_blocks.append(
                            ToolUseContent(
                                id=block.get("id", ""),
                                name=block.get("name", ""),
                                input=block.get("input", {}),
                            )
                        )

        if hasattr(msg, "tool_calls") and msg.tool_calls:
            for tc in msg.tool_calls:
                content_blocks.append(
                    ToolUseContent(
                        id=tc.get("id", ""),
                        name=tc.get("name", ""),
                        input=tc.get("args", {}),
                    )
                )

        if content_blocks:
            return Message(role=Role.ASSISTANT, content=content_blocks)
        return Message.assistant("")
Functions
extract_system_prompt
extract_system_prompt(
    messages: list[Message],
) -> str | None

Extract system prompt from messages.

Parameters:

Name Type Description Default
messages list[Message]

List of karenina Message objects.

required

Returns:

Type Description
str | None

Combined system prompt text, or None if no system messages.

Source code in src/karenina/adapters/langchain_deep_agents/messages.py
def extract_system_prompt(self, messages: list[Message]) -> str | None:
    """Extract system prompt from messages.

    Args:
        messages: List of karenina Message objects.

    Returns:
        Combined system prompt text, or None if no system messages.
    """
    system_parts = []
    for msg in messages:
        if msg.role == Role.SYSTEM:
            text = msg.text
            if text:
                system_parts.append(text)
    return "\n\n".join(system_parts) if system_parts else None
from_provider
from_provider(lc_messages: list[Any]) -> list[Message]

Convert LangGraph BaseMessage list to karenina Messages.

Parameters:

Name Type Description Default
lc_messages list[Any]

List of LangGraph message objects.

required

Returns:

Type Description
list[Message]

List of karenina Message objects.

Source code in src/karenina/adapters/langchain_deep_agents/messages.py
def from_provider(self, lc_messages: list[Any]) -> list[Message]:
    """Convert LangGraph BaseMessage list to karenina Messages.

    Args:
        lc_messages: List of LangGraph message objects.

    Returns:
        List of karenina Message objects.
    """
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage

    result: list[Message] = []

    for msg in lc_messages:
        if isinstance(msg, SystemMessage):
            result.append(Message.system(msg.content if isinstance(msg.content, str) else str(msg.content)))

        elif isinstance(msg, HumanMessage):
            result.append(Message.user(msg.content if isinstance(msg.content, str) else str(msg.content)))

        elif isinstance(msg, AIMessage):
            result.append(self._convert_ai_message(msg))

        elif isinstance(msg, ToolMessage):
            content = msg.content if isinstance(msg.content, str) else str(msg.content)
            tool_call_id = getattr(msg, "tool_call_id", "") or ""
            result.append(
                Message(
                    role=Role.TOOL,
                    content=[ToolResultContent(tool_use_id=tool_call_id, content=content)],
                )
            )

        else:
            logger.debug("Skipping unknown message type: %s", type(msg).__name__)

    return result
to_langchain_messages
to_langchain_messages(
    messages: list[Message],
) -> list[dict[str, str]]

Convert karenina messages to LangGraph-compatible dicts.

Parameters:

Name Type Description Default
messages list[Message]

List of karenina Message objects.

required

Returns:

Type Description
list[dict[str, str]]

List of message dicts with role and content keys.

Source code in src/karenina/adapters/langchain_deep_agents/messages.py
def to_langchain_messages(self, messages: list[Message]) -> list[dict[str, str]]:
    """Convert karenina messages to LangGraph-compatible dicts.

    Args:
        messages: List of karenina Message objects.

    Returns:
        List of message dicts with role and content keys.
    """
    result = []
    for msg in messages:
        if msg.role == Role.SYSTEM:
            continue
        role_map = {
            Role.USER: "user",
            Role.ASSISTANT: "assistant",
            Role.TOOL: "tool",
        }
        role = role_map.get(msg.role, "user")
        result.append({"role": role, "content": msg.text or ""})
    return result
to_prompt_string
to_prompt_string(messages: list[Message]) -> str

Convert user/assistant messages to a prompt string.

System messages are excluded (use extract_system_prompt instead).

Parameters:

Name Type Description Default
messages list[Message]

List of karenina Message objects.

required

Returns:

Type Description
str

Concatenated prompt string from non-system messages.

Source code in src/karenina/adapters/langchain_deep_agents/messages.py
def to_prompt_string(self, messages: list[Message]) -> str:
    """Convert user/assistant messages to a prompt string.

    System messages are excluded (use extract_system_prompt instead).

    Args:
        messages: List of karenina Message objects.

    Returns:
        Concatenated prompt string from non-system messages.
    """
    parts = []
    for msg in messages:
        if msg.role == Role.SYSTEM:
            continue
        text = msg.text
        if text:
            parts.append(text)
    return "\n\n".join(parts)

DeepAgentsParserAdapter

Parser adapter using LangChain's structured output for data extraction.

Implements the ParserPort Protocol by using with_structured_output() on the LangChain model. Falls back to JSON extraction from text if structured output is not available.

Example

>>> from pydantic import BaseModel, Field
>>> class Answer(BaseModel):
...     gene: str = Field(description="Gene name")
>>> parser = DeepAgentsParserAdapter(config)
>>> result = await parser.aparse_to_pydantic(messages, Answer)
>>> print(result.parsed.gene)

Source code in src/karenina/adapters/langchain_deep_agents/parser.py
class DeepAgentsParserAdapter:
    """Parser adapter using LangChain's structured output for data extraction.

    Implements the ParserPort Protocol by using with_structured_output()
    on the LangChain model. Falls back to JSON extraction from text if
    structured output is not available.

    Example:
        >>> from pydantic import BaseModel, Field
        >>> class Answer(BaseModel):
        ...     gene: str = Field(description="Gene name")
        >>> parser = DeepAgentsParserAdapter(config)
        >>> result = await parser.aparse_to_pydantic(messages, Answer)
        >>> print(result.parsed.gene)
    """

    def __init__(self, model_config: ModelConfig) -> None:
        """Initialize the Deep Agents Parser adapter.

        Args:
            model_config: Configuration specifying model, provider, and interface.
        """
        self._config = model_config
        self._converter = DeepAgentsMessageConverter()

    @property
    def capabilities(self) -> PortCapabilities:
        """Declare adapter capabilities.

        Returns:
            PortCapabilities with system_prompt=True and structured_output=True.
        """
        return PortCapabilities(
            supports_system_prompt=True,
            supports_structured_output=True,
        )

    async def aparse_to_pydantic(
        self,
        messages: list[Any],
        schema: type[T],
    ) -> ParsePortResult[T]:
        """Parse pre-assembled prompt messages into a Pydantic model.

        Uses LangChain's with_structured_output() to constrain the LLM.
        Falls back to JSON extraction if structured output fails.

        Args:
            messages: Pre-assembled prompt messages (system + user).
            schema: A Pydantic model class defining the expected structure.

        Returns:
            ParsePortResult containing the parsed model and usage metadata.

        Raises:
            ParseError: If the LLM fails to produce valid structured data.
        """
        from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

        # Build LangChain message list
        lc_messages: list[Any] = []
        for msg in messages:
            text = msg.text or ""
            if msg.role.value == "system":
                lc_messages.append(SystemMessage(content=text))
            elif msg.role.value == "user":
                lc_messages.append(HumanMessage(content=text))
            elif msg.role.value == "assistant":
                lc_messages.append(AIMessage(content=text))

        # Create model with structured output
        chat_model = create_chat_model(self._config)

        try:
            # Use include_raw=True to get the AIMessage alongside the parsed output.
            # This is critical for usage tracking: without it, with_structured_output
            # returns only the parsed dict/model, losing the AIMessage.usage_metadata
            # where token counts live.
            structured_model = chat_model.with_structured_output(schema, include_raw=True)
            raw_response = await structured_model.ainvoke(lc_messages)
        except Exception as e:
            logger.warning("Structured output failed, falling back to text extraction: %s", e)
            response = await chat_model.ainvoke(lc_messages)
            return self._extract_from_text(response, schema)

        # include_raw=True returns {"raw": AIMessage, "parsed": dict/model, "parsing_error": ...}
        parsed_output = raw_response.get("parsed") if isinstance(raw_response, dict) else raw_response
        raw_msg = raw_response.get("raw") if isinstance(raw_response, dict) else None
        usage = self._extract_usage_from_response(raw_msg) if raw_msg else UsageMetadata(model=self._config.model_name)

        if isinstance(parsed_output, schema):
            return ParsePortResult(parsed=parsed_output, usage=usage)

        if isinstance(parsed_output, dict):
            try:
                parsed = schema.model_validate(parsed_output)
                return ParsePortResult(parsed=parsed, usage=usage)
            except Exception as e:
                raise ParseError(f"Failed to validate structured output: {e}") from e

        if isinstance(parsed_output, BaseModel):
            try:
                parsed = schema.model_validate(parsed_output.model_dump())
                return ParsePortResult(parsed=parsed, usage=usage)
            except Exception as e:
                raise ParseError(f"Failed to convert structured output to target schema: {e}") from e

        raise ParseError(f"Unexpected response type from structured output: {type(parsed_output).__name__}")

    def _extract_from_text(self, response: Any, schema: type[T]) -> ParsePortResult[T]:
        """Extract structured data from a text response (fallback path).

        Args:
            response: The AIMessage response from the LLM.
            schema: The target Pydantic schema.

        Returns:
            ParsePortResult with the parsed model.

        Raises:
            ParseError: If JSON extraction or validation fails.
        """
        from langchain_core.messages import AIMessage

        content = ""
        if isinstance(response, AIMessage):
            content = response.content if isinstance(response.content, str) else str(response.content)
        else:
            content = str(response)

        usage = self._extract_usage_from_response(response)

        # Try to extract JSON from the text
        try:
            # Look for JSON in code blocks or raw JSON
            json_str = content
            if "```json" in content:
                json_str = content.split("```json")[1].split("```")[0].strip()
            elif "```" in content:
                json_str = content.split("```")[1].split("```")[0].strip()

            data = json.loads(json_str)
            parsed = schema.model_validate(data)
            return ParsePortResult(parsed=parsed, usage=usage)
        except Exception as e:  # json.JSONDecodeError, pydantic ValidationError, etc.
            raise ParseError(f"Failed to extract structured data from text response: {e}") from e

    def _extract_usage_from_response(self, response: Any) -> UsageMetadata:
        """Extract usage metadata from a LangChain response.

        Args:
            response: The response object (AIMessage or other).

        Returns:
            UsageMetadata with token counts if available.
        """
        from langchain_core.messages import AIMessage

        if isinstance(response, AIMessage):
            usage_meta = getattr(response, "usage_metadata", None)
            if usage_meta and isinstance(usage_meta, dict):
                return UsageMetadata(
                    input_tokens=usage_meta.get("input_tokens", 0),
                    output_tokens=usage_meta.get("output_tokens", 0),
                    total_tokens=usage_meta.get("input_tokens", 0) + usage_meta.get("output_tokens", 0),
                    model=self._config.model_name,
                )
        return UsageMetadata(model=self._config.model_name)

    def parse_to_pydantic(
        self,
        messages: list[Any],
        schema: type[T],
    ) -> ParsePortResult[T]:
        """Parse pre-assembled prompt messages (sync wrapper).

        Args:
            messages: Pre-assembled prompt messages.
            schema: A Pydantic model class defining the expected structure.

        Returns:
            ParsePortResult containing the parsed model and usage metadata.
        """
        from karenina.benchmark.verification.executor import get_async_portal

        portal = get_async_portal()
        if portal is not None:
            return portal.call(self.aparse_to_pydantic, messages, schema)

        try:
            asyncio.get_running_loop()

            def run_in_thread() -> ParsePortResult[T]:
                return asyncio.run(self.aparse_to_pydantic(messages, schema))

            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(run_in_thread)
                return future.result(timeout=600)

        except RuntimeError:
            return asyncio.run(self.aparse_to_pydantic(messages, schema))
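The fallback path in _extract_from_text reduces to "find the fenced JSON, then json.loads it". A self-contained sketch of that extraction step, with validation omitted (the fence string is built indirectly so the example nests safely inside documentation):

```python
# Sketch of the code-fence JSON extraction used by the parser fallback.
import json

FENCE = "`" * 3  # the three-backtick fence, written indirectly


def extract_json(content: str) -> dict:
    json_str = content
    marker = FENCE + "json"
    if marker in content:
        # Prefer an explicit ```json block.
        json_str = content.split(marker)[1].split(FENCE)[0].strip()
    elif FENCE in content:
        # Otherwise take the first bare fenced block.
        json_str = content.split(FENCE)[1].split(FENCE)[0].strip()
    return json.loads(json_str)


reply = "Here is the answer:\n" + FENCE + 'json\n{"gene": "BRCA1"}\n' + FENCE
print(extract_json(reply))  # {'gene': 'BRCA1'}
```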
Attributes
capabilities property
capabilities: PortCapabilities

Declare adapter capabilities.

Returns:

Type Description
PortCapabilities

PortCapabilities with system_prompt=True and structured_output=True.

Functions
__init__
__init__(model_config: ModelConfig) -> None

Parameters:

Name Type Description Default
model_config ModelConfig

Configuration specifying model, provider, and interface.

required
Source code in src/karenina/adapters/langchain_deep_agents/parser.py
def __init__(self, model_config: ModelConfig) -> None:
    """Initialize the Deep Agents Parser adapter.

    Args:
        model_config: Configuration specifying model, provider, and interface.
    """
    self._config = model_config
    self._converter = DeepAgentsMessageConverter()
aparse_to_pydantic async
aparse_to_pydantic(
    messages: list[Any], schema: type[T]
) -> ParsePortResult[T]

Parse pre-assembled prompt messages into a Pydantic model.

Uses LangChain's with_structured_output() to constrain the LLM. Falls back to JSON extraction if structured output fails.

Parameters:

Name Type Description Default
messages list[Any]

Pre-assembled prompt messages (system + user).

required
schema type[T]

A Pydantic model class defining the expected structure.

required

Returns:

Type Description
ParsePortResult[T]

ParsePortResult containing the parsed model and usage metadata.

Raises:

Type Description
ParseError

If the LLM fails to produce valid structured data.

Source code in src/karenina/adapters/langchain_deep_agents/parser.py
async def aparse_to_pydantic(
    self,
    messages: list[Any],
    schema: type[T],
) -> ParsePortResult[T]:
    """Parse pre-assembled prompt messages into a Pydantic model.

    Uses LangChain's with_structured_output() to constrain the LLM.
    Falls back to JSON extraction if structured output fails.

    Args:
        messages: Pre-assembled prompt messages (system + user).
        schema: A Pydantic model class defining the expected structure.

    Returns:
        ParsePortResult containing the parsed model and usage metadata.

    Raises:
        ParseError: If the LLM fails to produce valid structured data.
    """
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

    # Build LangChain message list
    lc_messages: list[Any] = []
    for msg in messages:
        text = msg.text or ""
        if msg.role.value == "system":
            lc_messages.append(SystemMessage(content=text))
        elif msg.role.value == "user":
            lc_messages.append(HumanMessage(content=text))
        elif msg.role.value == "assistant":
            lc_messages.append(AIMessage(content=text))

    # Create model with structured output
    chat_model = create_chat_model(self._config)

    try:
        # Use include_raw=True to get the AIMessage alongside the parsed output.
        # This is critical for usage tracking: without it, with_structured_output
        # returns only the parsed dict/model, losing the AIMessage.usage_metadata
        # where token counts live.
        structured_model = chat_model.with_structured_output(schema, include_raw=True)
        raw_response = await structured_model.ainvoke(lc_messages)
    except Exception as e:
        logger.warning("Structured output failed, falling back to text extraction: %s", e)
        response = await chat_model.ainvoke(lc_messages)
        return self._extract_from_text(response, schema)

    # include_raw=True returns {"raw": AIMessage, "parsed": dict/model, "parsing_error": ...}
    parsed_output = raw_response.get("parsed") if isinstance(raw_response, dict) else raw_response
    raw_msg = raw_response.get("raw") if isinstance(raw_response, dict) else None
    usage = self._extract_usage_from_response(raw_msg) if raw_msg else UsageMetadata(model=self._config.model_name)

    if isinstance(parsed_output, schema):
        return ParsePortResult(parsed=parsed_output, usage=usage)

    if isinstance(parsed_output, dict):
        try:
            parsed = schema.model_validate(parsed_output)
            return ParsePortResult(parsed=parsed, usage=usage)
        except Exception as e:
            raise ParseError(f"Failed to validate structured output: {e}") from e

    if isinstance(parsed_output, BaseModel):
        try:
            parsed = schema.model_validate(parsed_output.model_dump())
            return ParsePortResult(parsed=parsed, usage=usage)
        except Exception as e:
            raise ParseError(f"Failed to convert structured output to target schema: {e}") from e

    raise ParseError(f"Unexpected response type from structured output: {type(parsed_output).__name__}")
parse_to_pydantic
parse_to_pydantic(
    messages: list[Any], schema: type[T]
) -> ParsePortResult[T]

Parse pre-assembled prompt messages (sync wrapper).

Parameters:

Name Type Description Default
messages list[Any]

Pre-assembled prompt messages.

required
schema type[T]

A Pydantic model class defining the expected structure.

required

Returns:

Type Description
ParsePortResult[T]

ParsePortResult containing the parsed model and usage metadata.

Source code in src/karenina/adapters/langchain_deep_agents/parser.py
def parse_to_pydantic(
    self,
    messages: list[Any],
    schema: type[T],
) -> ParsePortResult[T]:
    """Parse pre-assembled prompt messages (sync wrapper).

    Args:
        messages: Pre-assembled prompt messages.
        schema: A Pydantic model class defining the expected structure.

    Returns:
        ParsePortResult containing the parsed model and usage metadata.
    """
    from karenina.benchmark.verification.executor import get_async_portal

    portal = get_async_portal()
    if portal is not None:
        return portal.call(self.aparse_to_pydantic, messages, schema)

    try:
        asyncio.get_running_loop()

        def run_in_thread() -> ParsePortResult[T]:
            return asyncio.run(self.aparse_to_pydantic(messages, schema))

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(run_in_thread)
            return future.result(timeout=600)

    except RuntimeError:
        return asyncio.run(self.aparse_to_pydantic(messages, schema))

Functions

check_deep_agents_available

check_deep_agents_available() -> AdapterAvailability

Check if the deepagents package is installed.

Returns:

Type Description
AdapterAvailability

AdapterAvailability with status and installation instructions.

Source code in src/karenina/adapters/langchain_deep_agents/availability.py
def check_deep_agents_available() -> AdapterAvailability:
    """Check if the deepagents package is installed.

    Returns:
        AdapterAvailability with status and installation instructions.
    """
    try:
        import deepagents  # noqa: F401

        return AdapterAvailability(
            available=True,
            reason="deepagents package is installed",
        )
    except ImportError:
        return AdapterAvailability(
            available=False,
            reason=("deepagents package not installed. Install with: pip install deepagents or: uv add deepagents"),
            fallback_interface=None,
        )
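An equivalent probe can be written with importlib.util.find_spec, which checks whether a package is importable without actually importing (and therefore executing) it; this is a sketch, not the adapter's actual implementation:

```python
# Dependency probe in the same spirit as check_deep_agents_available().
import importlib.util


def is_available(package: str) -> bool:
    # find_spec returns None for a missing top-level package
    # and never runs the package's module code.
    return importlib.util.find_spec(package) is not None


print(is_available("json"))        # True (stdlib)
print(is_available("deepagents"))  # False unless deepagents is installed
```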

convert_mcp_to_tools async

convert_mcp_to_tools(
    mcp_servers: dict[str, Any] | None,
) -> list[Any]

Convert MCP server configs to LangChain tools via langchain-mcp-adapters.

Creates a MultiServerMCPClient, connects to all servers, and returns their tools as LangChain BaseTool instances.

Parameters:

Name Type Description Default
mcp_servers
dict[str, Any] | None

Dict mapping server names to MCPServerConfig.

required

Returns:

Type Description
list[Any]

List of LangChain BaseTool instances from all MCP servers.

Source code in src/karenina/adapters/langchain_deep_agents/mcp.py
async def convert_mcp_to_tools(
    mcp_servers: dict[str, Any] | None,
) -> list[Any]:
    """Convert MCP server configs to LangChain tools via langchain-mcp-adapters.

    Creates a MultiServerMCPClient, connects to all servers, and returns
    their tools as LangChain BaseTool instances.

    Args:
        mcp_servers: Dict mapping server names to MCPServerConfig.

    Returns:
        List of LangChain BaseTool instances from all MCP servers.
    """
    server_params = build_mcp_server_params(mcp_servers)
    if not server_params:
        return []

    from langchain_mcp_adapters.client import MultiServerMCPClient

    async with MultiServerMCPClient(server_params) as client:  # type: ignore[arg-type,misc]
        return await client.get_tools()
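
For orientation, the server-params mapping handed to `MultiServerMCPClient` groups each server under a name with a `transport` key; the sketch below shows the two common shapes (a local stdio process and a remote HTTP endpoint) per the langchain-mcp-adapters docs. The server names, script path, and URL are hypothetical placeholders.

```python
# Hedged sketch of the server-params dict that MultiServerMCPClient accepts.
# build_mcp_server_params (see source above) produces a dict of this shape
# from karenina's MCPServerConfig objects. All names/paths are hypothetical.
server_params = {
    "filesystem": {
        "transport": "stdio",  # spawn a local process, speak MCP over stdio
        "command": "python",
        "args": ["./fs_server.py"],  # hypothetical server script
    },
    "search": {
        "transport": "streamable_http",  # connect to a remote MCP endpoint
        "url": "http://localhost:8000/mcp",
    },
}
```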

deep_agents_messages_to_raw_trace

deep_agents_messages_to_raw_trace(
    messages: list[Any], include_user_messages: bool = False
) -> str

Convert LangGraph messages to raw trace string format.

Produces a delimited trace compatible with existing karenina infrastructure (regex highlighting, database storage, backward compatibility).

Parameters:

Name Type Description Default
messages
list[Any]

List of LangGraph BaseMessage objects.

required
include_user_messages
bool

If True, include HumanMessage in trace.

False

Returns:

Type Description
str

Formatted trace string with --- delimiters.

Source code in src/karenina/adapters/langchain_deep_agents/trace.py
def deep_agents_messages_to_raw_trace(
    messages: list[Any],
    include_user_messages: bool = False,
) -> str:
    """Convert LangGraph messages to raw trace string format.

    Produces delimited trace compatible with existing karenina infrastructure
    (regex highlighting, database storage, backward compatibility).

    Args:
        messages: List of LangGraph BaseMessage objects.
        include_user_messages: If True, include HumanMessage in trace.

    Returns:
        Formatted trace string with --- delimiters.
    """
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage

    if not messages:
        return ""

    parts: list[str] = []

    for msg in messages:
        if isinstance(msg, SystemMessage):
            continue

        if isinstance(msg, HumanMessage):
            if include_user_messages:
                parts.append(f"--- Human Message ---\n{msg.content}")
            continue

        if isinstance(msg, AIMessage):
            text = _extract_ai_text(msg)

            if text:
                parts.append(f"--- AI Message ---\n{text}")

            if hasattr(msg, "tool_calls") and msg.tool_calls:
                for tc in msg.tool_calls:
                    tool_name = tc.get("name", "unknown")
                    tool_input = tc.get("args", {})
                    input_str = json.dumps(tool_input, indent=2) if tool_input else "{}"
                    parts.append(f"--- Tool Call ---\nTool: {tool_name}\nInput: {input_str}")

        elif isinstance(msg, ToolMessage):
            content = msg.content if isinstance(msg.content, str) else str(msg.content)
            parts.append(f"--- Tool Result ---\n{content}")

    return "\n\n".join(parts)
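
The delimiter convention is easiest to see on a tiny conversation. The sketch below substitutes lightweight stand-in classes for the `langchain_core` message types and replays the same branching, so the resulting trace string is visible without any dependencies; the real function additionally handles content-block lists via `_extract_ai_text`.

```python
# Stand-in demonstration of the raw-trace format; Human/AI/Tool replace the
# langchain_core message classes purely for illustration.
import json


class Human:
    def __init__(self, content):
        self.content = content


class AI:
    def __init__(self, content, tool_calls=None):
        self.content = content
        self.tool_calls = tool_calls or []


class Tool:
    def __init__(self, content):
        self.content = content


def to_raw_trace(messages, include_user_messages=False):
    parts = []
    for msg in messages:
        if isinstance(msg, Human):
            if include_user_messages:
                parts.append(f"--- Human Message ---\n{msg.content}")
        elif isinstance(msg, AI):
            if msg.content:
                parts.append(f"--- AI Message ---\n{msg.content}")
            for tc in msg.tool_calls:
                input_str = json.dumps(tc.get("args", {}), indent=2)
                parts.append(f"--- Tool Call ---\nTool: {tc['name']}\nInput: {input_str}")
        elif isinstance(msg, Tool):
            parts.append(f"--- Tool Result ---\n{msg.content}")
    return "\n\n".join(parts)


trace = to_raw_trace(
    [
        Human("List /tmp"),
        AI("", tool_calls=[{"name": "ls", "args": {"path": "/tmp"}}]),
        Tool("a.txt"),
        AI("The directory contains a.txt."),
    ]
)
print(trace)
```

Note that the user message is dropped by default (`include_user_messages=False`), and an `AIMessage` with empty text but tool calls contributes only `--- Tool Call ---` sections.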

extract_deep_agents_usage

extract_deep_agents_usage(
    messages: list[Any], model: str | None = None
) -> UsageMetadata

Extract aggregated usage metadata from LangGraph messages.

Sums token counts across all AIMessage instances in the conversation. Token counts come from AIMessage.usage_metadata (preferred) or AIMessage.response_metadata.token_usage (fallback).

Parameters:

Name Type Description Default
messages
list[Any]

List of LangGraph BaseMessage objects.

required
model
str | None

Model name to include in usage metadata.

None

Returns:

Type Description
UsageMetadata

Aggregated UsageMetadata for the entire agent run.

Source code in src/karenina/adapters/langchain_deep_agents/usage.py
def extract_deep_agents_usage(
    messages: list[Any],
    model: str | None = None,
) -> UsageMetadata:
    """Extract aggregated usage metadata from LangGraph messages.

    Sums token counts across all AIMessage instances in the conversation.
    Token counts come from AIMessage.usage_metadata (preferred) or
    AIMessage.response_metadata.token_usage (fallback).

    Args:
        messages: List of LangGraph BaseMessage objects.
        model: Model name to include in usage metadata.

    Returns:
        Aggregated UsageMetadata for the entire agent run.
    """
    from langchain_core.messages import AIMessage

    total_input = 0
    total_output = 0

    for msg in messages:
        if not isinstance(msg, AIMessage):
            continue

        usage_meta = getattr(msg, "usage_metadata", None)
        if usage_meta and isinstance(usage_meta, dict):
            total_input += usage_meta.get("input_tokens", 0)
            total_output += usage_meta.get("output_tokens", 0)
            continue

        resp_meta = getattr(msg, "response_metadata", None)
        if resp_meta and isinstance(resp_meta, dict):
            token_usage = resp_meta.get("token_usage", {})
            if isinstance(token_usage, dict):
                total_input += token_usage.get("prompt_tokens", 0)
                total_output += token_usage.get("completion_tokens", 0)

    return UsageMetadata(
        input_tokens=total_input,
        output_tokens=total_output,
        total_tokens=total_input + total_output,
        model=model,
    )
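
The preferred-vs-fallback aggregation can be sketched with stand-in messages: the first carries the newer `usage_metadata` dict, the second only the older `response_metadata.token_usage` shape, and both contribute to the totals. `FakeAIMessage` is an illustrative stand-in, not a karenina or LangChain class.

```python
# Stand-in demonstration of the token aggregation logic in
# extract_deep_agents_usage (karenina's UsageMetadata type is omitted).
class FakeAIMessage:
    def __init__(self, usage_metadata=None, response_metadata=None):
        self.usage_metadata = usage_metadata
        self.response_metadata = response_metadata or {}


msgs = [
    # Preferred path: usage_metadata
    FakeAIMessage(usage_metadata={"input_tokens": 100, "output_tokens": 20}),
    # Fallback path: response_metadata.token_usage
    FakeAIMessage(
        response_metadata={"token_usage": {"prompt_tokens": 130, "completion_tokens": 15}}
    ),
]

total_in = total_out = 0
for m in msgs:
    if isinstance(m.usage_metadata, dict):
        total_in += m.usage_metadata.get("input_tokens", 0)
        total_out += m.usage_metadata.get("output_tokens", 0)
        continue  # usage_metadata wins; the fallback is not also counted
    tu = m.response_metadata.get("token_usage", {})
    if isinstance(tu, dict):
        total_in += tu.get("prompt_tokens", 0)
        total_out += tu.get("completion_tokens", 0)

print(total_in, total_out, total_in + total_out)
```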

wrap_deep_agents_error

wrap_deep_agents_error(
    error: Exception,
) -> tuple[Exception, bool]

Map a Deep Agents / LangGraph exception to a karenina exception.

Parameters:

Name Type Description Default
error
Exception

The original exception from Deep Agents or LangGraph.

required

Returns:

Type Description
tuple[Exception, bool]

Tuple of (mapped_exception, limit_reached). The limit_reached flag is True when the error indicates the agent hit a recursion or turn limit.

Source code in src/karenina/adapters/langchain_deep_agents/errors.py
def wrap_deep_agents_error(error: Exception) -> tuple[Exception, bool]:
    """Map a Deep Agents / LangGraph exception to a karenina exception.

    Args:
        error: The original exception from Deep Agents or LangGraph.

    Returns:
        Tuple of (mapped_exception, limit_reached). The limit_reached flag
        is True when the error indicates the agent hit a recursion or turn limit.
    """
    error_str = str(error).lower()

    # Check for recursion / turn limit errors
    if "recursion" in error_str or "limit" in error_str or "max_turns" in error_str:
        return (
            AgentExecutionError(f"Agent hit turn limit: {error}"),
            True,
        )

    # Timeout errors
    if isinstance(error, TimeoutError | asyncio.TimeoutError):
        return AgentTimeoutError(f"Agent execution timed out: {error}"), False

    # Output parsing errors
    if "parse" in error_str or ("output" in error_str and "format" in error_str):
        return AgentResponseError(f"Failed to parse agent response: {error}"), False

    # GraphRecursionError from LangGraph
    try:
        from langgraph.errors import GraphRecursionError

        if isinstance(error, GraphRecursionError):
            return (
                AgentExecutionError(f"Agent hit recursion limit: {error}"),
                True,
            )
    except ImportError:
        pass

    # Default: general execution error
    return AgentExecutionError(f"Agent execution failed: {error}"), False
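
Because the classification is largely string-based, its behavior is easiest to see with a small stand-in that returns a category label instead of karenina's exception types (the `GraphRecursionError` isinstance check is omitted here).

```python
# Stand-in sketch of wrap_deep_agents_error's routing logic; category labels
# replace the karenina exception classes purely for illustration.
import asyncio


def classify(error: Exception) -> tuple[str, bool]:
    s = str(error).lower()
    # Limit-like strings are checked first, before exception types.
    if "recursion" in s or "limit" in s or "max_turns" in s:
        return "execution", True  # limit_reached=True flags truncated runs
    if isinstance(error, (TimeoutError, asyncio.TimeoutError)):
        return "timeout", False
    if "parse" in s or ("output" in s and "format" in s):
        return "response", False
    return "execution", False


limit_case = classify(RuntimeError("Recursion limit of 25 reached"))
timeout_case = classify(asyncio.TimeoutError())
parse_case = classify(ValueError("could not parse model output"))
```

Note the check order: because limit-like strings are tested before the `isinstance` timeout check, a timeout whose message happens to contain "limit" would be reported as a limit hit.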