Skip to content

karenina.utils.answer_cache

answer_cache

Thread-safe answer caching for verification pipeline.

This module provides caching infrastructure to prevent duplicate answer generation when multiple judges (parsing models) evaluate the same answering model output.

Classes

AnswerTraceCache

Thread-safe cache for answer traces with non-blocking synchronization.

This cache prevents duplicate answer generation when multiple judges (parsing models) need to evaluate the same answering model output.

Features: - Thread-safe: All operations protected by lock - Non-blocking: Returns immediately with status, no waiting - Race condition prevention: Caller handles requeuing for in-progress answers - Fault tolerance: Failed generations allow retry

Example
cache = AnswerTraceCache()

# Thread 1: Generates answer
status, answer_data = cache.get_or_reserve(key)
if status == "MISS":
    # Generate answer...
    cache.complete(key, answer_data, error=None)

# Thread 2: Gets IN_PROGRESS status
status, answer_data = cache.get_or_reserve(key)
# status == "IN_PROGRESS", caller should requeue task
# Later, after thread 1 completes:
status, answer_data = cache.get_or_reserve(key)
# status == "HIT", answer_data contains the cached answer
Source code in src/karenina/utils/answer_cache.py
class AnswerTraceCache:
    """Thread-safe cache for answer traces with non-blocking synchronization.

    This cache prevents duplicate answer generation when multiple judges
    (parsing models) need to evaluate the same answering model output.

    Features:
    - Thread-safe: All operations protected by lock
    - Non-blocking: Returns immediately with status, no waiting
    - Race condition prevention: Caller handles requeuing for in-progress answers
    - Fault tolerance: Failed generations allow retry

    Example:
        ```python
        cache = AnswerTraceCache()

        # Thread 1: Generates answer
        status, answer_data = cache.get_or_reserve(key)
        if status == "MISS":
            # Generate answer...
            cache.complete(key, answer_data, error=None)

        # Thread 2: Gets IN_PROGRESS status
        status, answer_data = cache.get_or_reserve(key)
        # status == "IN_PROGRESS", caller should requeue task
        # Later, after thread 1 completes:
        status, answer_data = cache.get_or_reserve(key)
        # status == "HIT", answer_data contains the cached answer
        ```
    """

    def __init__(self) -> None:
        """Initialize answer cache."""
        self._cache: dict[str, CacheEntry] = {}
        self._lock = threading.Lock()
        self._stats_hits = 0
        self._stats_misses = 0
        self._stats_waits = 0  # Now tracks IN_PROGRESS returns
        self._stats_timeouts = 0  # Deprecated, kept for backward compatibility

    def get_or_reserve(self, key: str) -> tuple[str, dict[str, Any] | None]:
        """Get cached answer or reserve slot for generation.

        This method implements a three-state cache without blocking:
        1. COMPLETED: Return cached answer
        2. IN_PROGRESS: Return status immediately, caller should requeue
        3. MISSING: Reserve slot and signal caller to generate

        Args:
            key: Cache key (question_id, answering_model_id, replicate)

        Returns:
            Tuple of (status, answer_data):
            - If answer cached: ("HIT", data)
            - If answer in-progress: ("IN_PROGRESS", None) - caller should requeue
            - If missing: ("MISS", None) - caller should generate
        """
        with self._lock:
            entry = self._cache.get(key)

            if entry is None:
                # MISSING: Reserve slot for generation
                self._cache[key] = CacheEntry(is_complete=False)
                self._stats_misses += 1
                logger.debug(f"Cache miss: {key} - reserving slot")
                return "MISS", None

            if entry.is_complete:
                # COMPLETED or FAILED
                if entry.error:
                    # Previous generation failed, allow retry
                    logger.warning(
                        f"Previous answer generation failed for {key} (error: {entry.error}), allowing retry"
                    )
                    del self._cache[key]
                    # Treat as miss to allow retry
                    self._cache[key] = CacheEntry(is_complete=False)
                    self._stats_misses += 1
                    return "MISS", None

                # Success! Return cached answer
                self._stats_hits += 1
                logger.debug(f"Cache hit: {key}")
                return "HIT", entry.answer_data

            # IN_PROGRESS: Return immediately, let caller handle requeuing
            logger.debug(f"Answer in-progress for {key}, returning IN_PROGRESS status")
            self._stats_waits += 1
            return "IN_PROGRESS", None

    def complete(self, key: str, answer_data: dict[str, Any] | None, error: Exception | None = None) -> None:
        """Mark answer generation as complete and notify waiting tasks.

        Args:
            key: Cache key
            answer_data: Generated answer data (None if failed)
            error: Exception if generation failed
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                logger.warning(f"Attempted to complete non-existent cache entry: {key}")
                return

            entry.is_complete = True
            entry.answer_data = answer_data
            entry.error = error

            # Notify all waiting tasks
            entry.event.set()

            if error:
                logger.debug(f"Marked cache entry as failed: {key} (error: {error})")
            else:
                logger.debug(f"Marked cache entry as complete: {key}")

    def wait_for_completion(self, key: str, timeout: float = 5.0) -> bool:
        """Wait for an in-progress answer to complete.

        This method allows callers to efficiently wait for an answer that's
        being generated by another task, rather than polling with sleep.

        Args:
            key: Cache key to wait for
            timeout: Maximum seconds to wait (default 5.0)

        Returns:
            True if the answer completed within timeout, False otherwise
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                # Key doesn't exist - nothing to wait for
                return True
            if entry.is_complete:
                # Already complete
                return True
            # Get the event to wait on (outside the lock)
            event = entry.event

        # Wait outside the lock to avoid blocking other operations
        completed = event.wait(timeout=timeout)
        if not completed:
            self._stats_timeouts += 1
        return completed

    def get_stats(self) -> dict[str, int]:
        """Get cache statistics.

        Returns:
            Dictionary with hit/miss/wait/timeout counts
        """
        return {
            "hits": self._stats_hits,
            "misses": self._stats_misses,
            "waits": self._stats_waits,
            "timeouts": self._stats_timeouts,
        }
Functions
__init__
__init__() -> None
Source code in src/karenina/utils/answer_cache.py
def __init__(self) -> None:
    """Initialize answer cache."""
    self._cache: dict[str, CacheEntry] = {}
    self._lock = threading.Lock()
    self._stats_hits = 0
    self._stats_misses = 0
    self._stats_waits = 0  # Now tracks IN_PROGRESS returns
    self._stats_timeouts = 0  # Deprecated, kept for backward compatibility
complete
complete(
    key: str,
    answer_data: dict[str, Any] | None,
    error: Exception | None = None,
) -> None

Mark answer generation as complete and notify waiting tasks.

Parameters:

Name Type Description Default
key str

Cache key

required
answer_data dict[str, Any] | None

Generated answer data (None if failed)

required
error Exception | None

Exception if generation failed

None
Source code in src/karenina/utils/answer_cache.py
def complete(self, key: str, answer_data: dict[str, Any] | None, error: Exception | None = None) -> None:
    """Mark answer generation as complete and notify waiting tasks.

    Args:
        key: Cache key
        answer_data: Generated answer data (None if failed)
        error: Exception if generation failed
    """
    with self._lock:
        entry = self._cache.get(key)
        if entry is None:
            logger.warning(f"Attempted to complete non-existent cache entry: {key}")
            return

        entry.is_complete = True
        entry.answer_data = answer_data
        entry.error = error

        # Notify all waiting tasks
        entry.event.set()

        if error:
            logger.debug(f"Marked cache entry as failed: {key} (error: {error})")
        else:
            logger.debug(f"Marked cache entry as complete: {key}")
get_or_reserve
get_or_reserve(
    key: str,
) -> tuple[str, dict[str, Any] | None]

Get cached answer or reserve slot for generation.

This method implements a three-state cache without blocking: 1. COMPLETED: Return cached answer 2. IN_PROGRESS: Return status immediately, caller should requeue 3. MISSING: Reserve slot and signal caller to generate

Parameters:

Name Type Description Default
key str

Cache key (question_id, answering_model_id, replicate)

required

Returns:

Type Description
str

Tuple of (status, answer_data):

dict[str, Any] | None
  • If answer cached: ("HIT", data)
tuple[str, dict[str, Any] | None]
  • If answer in-progress: ("IN_PROGRESS", None) - caller should requeue
tuple[str, dict[str, Any] | None]
  • If missing: ("MISS", None) - caller should generate
Source code in src/karenina/utils/answer_cache.py
def get_or_reserve(self, key: str) -> tuple[str, dict[str, Any] | None]:
    """Get cached answer or reserve slot for generation.

    This method implements a three-state cache without blocking:
    1. COMPLETED: Return cached answer
    2. IN_PROGRESS: Return status immediately, caller should requeue
    3. MISSING: Reserve slot and signal caller to generate

    Args:
        key: Cache key (question_id, answering_model_id, replicate)

    Returns:
        Tuple of (status, answer_data):
        - If answer cached: ("HIT", data)
        - If answer in-progress: ("IN_PROGRESS", None) - caller should requeue
        - If missing: ("MISS", None) - caller should generate
    """
    with self._lock:
        entry = self._cache.get(key)

        if entry is None:
            # MISSING: Reserve slot for generation
            self._cache[key] = CacheEntry(is_complete=False)
            self._stats_misses += 1
            logger.debug(f"Cache miss: {key} - reserving slot")
            return "MISS", None

        if entry.is_complete:
            # COMPLETED or FAILED
            if entry.error:
                # Previous generation failed, allow retry
                logger.warning(
                    f"Previous answer generation failed for {key} (error: {entry.error}), allowing retry"
                )
                del self._cache[key]
                # Treat as miss to allow retry
                self._cache[key] = CacheEntry(is_complete=False)
                self._stats_misses += 1
                return "MISS", None

            # Success! Return cached answer
            self._stats_hits += 1
            logger.debug(f"Cache hit: {key}")
            return "HIT", entry.answer_data

        # IN_PROGRESS: Return immediately, let caller handle requeuing
        logger.debug(f"Answer in-progress for {key}, returning IN_PROGRESS status")
        self._stats_waits += 1
        return "IN_PROGRESS", None
get_stats
get_stats() -> dict[str, int]

Get cache statistics.

Returns:

Type Description
dict[str, int]

Dictionary with hit/miss/wait/timeout counts

Source code in src/karenina/utils/answer_cache.py
def get_stats(self) -> dict[str, int]:
    """Get cache statistics.

    Returns:
        Dictionary with hit/miss/wait/timeout counts
    """
    return {
        "hits": self._stats_hits,
        "misses": self._stats_misses,
        "waits": self._stats_waits,
        "timeouts": self._stats_timeouts,
    }
wait_for_completion
wait_for_completion(key: str, timeout: float = 5.0) -> bool

Wait for an in-progress answer to complete.

This method allows callers to efficiently wait for an answer that's being generated by another task, rather than polling with sleep.

Parameters:

Name Type Description Default
key str

Cache key to wait for

required
timeout float

Maximum seconds to wait (default 5.0)

5.0

Returns:

Type Description
bool

True if the answer completed within timeout, False otherwise

Source code in src/karenina/utils/answer_cache.py
def wait_for_completion(self, key: str, timeout: float = 5.0) -> bool:
    """Wait for an in-progress answer to complete.

    This method allows callers to efficiently wait for an answer that's
    being generated by another task, rather than polling with sleep.

    Args:
        key: Cache key to wait for
        timeout: Maximum seconds to wait (default 5.0)

    Returns:
        True if the answer completed within timeout, False otherwise
    """
    with self._lock:
        entry = self._cache.get(key)
        if entry is None:
            # Key doesn't exist - nothing to wait for
            return True
        if entry.is_complete:
            # Already complete
            return True
        # Get the event to wait on (outside the lock)
        event = entry.event

    # Wait outside the lock to avoid blocking other operations
    completed = event.wait(timeout=timeout)
    if not completed:
        self._stats_timeouts += 1
    return completed

CacheEntry

Represents a cache entry for an answer generation task.

Supports three states: - IN_PROGRESS: Answer is being generated by another task - COMPLETED: Answer generation succeeded - FAILED: Answer generation failed

Source code in src/karenina/utils/answer_cache.py
class CacheEntry:
    """Represents a cache entry for an answer generation task.

    Supports three states:
    - IN_PROGRESS: Answer is being generated by another task
    - COMPLETED: Answer generation succeeded
    - FAILED: Answer generation failed
    """

    def __init__(self, is_complete: bool = False) -> None:
        """Initialize cache entry.

        Args:
            is_complete: Whether the answer is already complete
        """
        self.is_complete = is_complete
        self.event = threading.Event()  # Signal completion to waiting tasks
        self.answer_data: dict[str, Any] | None = None
        self.error: Exception | None = None
        self.timestamp = time.time()
Functions
__init__
__init__(is_complete: bool = False) -> None

Parameters:

Name Type Description Default
is_complete bool

Whether the answer is already complete

False
Source code in src/karenina/utils/answer_cache.py
def __init__(self, is_complete: bool = False) -> None:
    """Initialize cache entry.

    Args:
        is_complete: Whether the answer is already complete
    """
    self.is_complete = is_complete
    self.event = threading.Event()  # Signal completion to waiting tasks
    self.answer_data: dict[str, Any] | None = None
    self.error: Exception | None = None
    self.timestamp = time.time()