Skip to content

karenina.schemas.entities

entities

Core business entity models.

This module contains the fundamental entities used throughout Karenina:

- BaseAnswer: Base class for answer templates
- Question: Benchmark question definition
- Rubric: Rubric traits for qualitative evaluation
- VerifiedField: Declarative field verification for answer templates
- Primitives: Verification primitives (ExactMatch, BooleanMatch, etc.)
- Composition: Strategy nodes for combining field results (AllOf, AnyOf, etc.)

Classes

AgenticRubricTrait

Bases: BaseModel

Rubric trait evaluated by an agent with tools.

Unlike LLMRubricTrait (single LLM call), this trait launches an agent that can investigate the response and workspace using tools before producing a score. Supports boolean, score, literal, and template kinds.

When kind is a BaseModel subclass (template kind), the agent produces structured output matching that schema instead of a scalar score. Template kinds require higher_is_better=None because directionality is not meaningful for structured results.

Source code in src/karenina/schemas/entities/rubric.py
class AgenticRubricTrait(BaseModel):
    """Rubric trait evaluated by an agent with tools.

    Unlike LLMRubricTrait (single LLM call), this trait launches an agent
    that can investigate the response and workspace using tools before
    producing a score. Supports boolean, score, literal, and template kinds.

    When ``kind`` is a ``BaseModel`` subclass (template kind), the agent
    produces structured output matching that schema instead of a scalar
    score. Template kinds require ``higher_is_better=None`` because
    directionality is not meaningful for structured results.
    """

    # arbitrary_types_allowed is needed so ``kind`` can hold a BaseModel
    # *class* object (type[BaseModel]) rather than only plain values.
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    name: str = Field(..., min_length=1)
    description: str = Field(..., min_length=1)
    summary: str | None = Field(None, description="Short concept label for dynamic rubric presence check")
    # Scalar kinds are string literals; a BaseModel subclass selects template kind.
    kind: Literal["boolean", "score", "literal"] | type[BaseModel]
    higher_is_better: bool | None = Field(
        ...,
        description="Whether higher values indicate better performance. "
        "For boolean: True means True is good. "
        "For score: True means higher scores are better. "
        "For literal: True means higher indices (later classes) are better. "
        "Must be None for template kind.",
    )
    min_score: int | None = Field(1, description="Lower bound for score traits (default: 1). Auto-derived for literal.")
    max_score: int | None = Field(5, description="Upper bound for score traits (default: 5). Auto-derived for literal.")
    # Class label -> description mapping; required for literal kind
    # (enforced in validate_kind_fields).
    classes: dict[str, str] | None = None
    context_mode: Literal["workspace_only", "trace_and_workspace", "trace_only"] = "trace_and_workspace"
    materialize_trace: bool = Field(
        False,
        description=(
            "Write the agent trace to a file in the workspace instead of "
            "including it in the prompt. The agent receives the file path "
            "and can use grep/search tools on it."
        ),
    )
    persist_trace: bool = Field(
        False,
        description=(
            "When True, the materialized trace file is kept after evaluation. "
            "When False (default), cleaned up after evaluation."
        ),
    )
    max_turns: int = Field(15, gt=0)
    timeout_seconds: int = Field(120, gt=0)
    model_override: "ModelConfig | None" = None

    @field_validator("kind", mode="before")
    @classmethod
    def validate_kind(cls, v: Any) -> Any:
        """Accept string literals, BaseModel subclasses, or serialized template dicts."""
        # Scalar kinds pass through; the Literal annotation checks the value.
        if isinstance(v, str):
            return v
        # Template kind supplied as a class: validate its fields up front.
        if isinstance(v, type) and issubclass(v, BaseModel):
            _validate_template_fields(v)
            return v
        # Round-trip form produced by serialize_kind: rebuild the class
        # from the embedded JSON Schema.
        if isinstance(v, dict) and v.get("type") == "template":
            schema = v.get("schema")
            if schema is None:
                raise ValueError("Template kind dict must include a 'schema' key")
            return _reconstruct_model_from_schema(schema)
        raise ValueError(f"kind must be a string literal, BaseModel subclass, or template dict, got {type(v)}")

    @field_serializer("kind")
    def serialize_kind(self, value: Any, _info: Any) -> Any:
        """Serialize BaseModel subclass to a template dict with JSON Schema."""
        # Classes are not JSON-serializable; emit a tagged schema dict instead.
        if isinstance(value, type) and issubclass(value, BaseModel):
            return {"type": "template", "schema": value.model_json_schema()}
        return value

    @model_validator(mode="before")
    @classmethod
    def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Set default for higher_is_better when loading legacy data.

        Skips the legacy default (True) when kind is a template, because
        template kinds require higher_is_better=None.
        """
        # Non-dict payloads (e.g. already-built instances) pass through untouched.
        if not isinstance(values, dict):
            return values
        kind = values.get("kind")
        # Template kind (non-string): do not inject legacy default
        if not isinstance(kind, str):
            return values
        # Legacy data predates higher_is_better; default it to True.
        if "higher_is_better" not in values or values.get("higher_is_better") is None:
            values["higher_is_better"] = True
        return values

    @field_validator("higher_is_better", mode="after")
    @classmethod
    def validate_higher_is_better(cls, v: bool | None, info: Any) -> bool | None:
        """Enforce higher_is_better=None for template kind."""
        # info.data holds fields validated so far; "kind" is declared first,
        # so it is available here when it validated successfully.
        kind = info.data.get("kind")
        if isinstance(kind, type) and issubclass(kind, BaseModel) and v is not None:
            raise ValueError("higher_is_better must be None for template kind")
        return v

    @model_validator(mode="after")
    def validate_kind_fields(self) -> "AgenticRubricTrait":
        """Validate kind-specific field constraints."""
        # materialize_trace writes the trace to a file, which is pointless
        # when the context mode excludes the trace entirely.
        if self.materialize_trace and self.context_mode == "workspace_only":
            raise ValueError(
                "materialize_trace=True requires a trace, but context_mode='workspace_only' "
                "excludes the trace. Use 'trace_only' or 'trace_and_workspace'."
            )
        # Template kinds (BaseModel subclasses) have no scalar bounds to derive.
        if not isinstance(self.kind, str):
            return self
        if self.kind == "literal":
            if not self.classes:
                raise ValueError("classes field is required for literal kind")
            # Automatically derive min_score and max_score from classes
            # (scores are 0-based indices). object.__setattr__ writes the
            # attribute directly — presumably to avoid re-triggering
            # validation inside this validator; confirm.
            object.__setattr__(self, "min_score", 0)
            object.__setattr__(self, "max_score", len(self.classes) - 1)
        return self

    @model_validator(mode="after")
    def validate_model_override_supports_agents(self) -> "AgenticRubricTrait":
        """Validate that model_override supports agent creation (if set)."""
        if self.model_override is not None:
            # Imported lazily — presumably to avoid a circular import; confirm.
            from karenina.adapters.registry import AdapterRegistry

            spec = AdapterRegistry.get_spec(self.model_override.interface)
            # Only "deep_agent" adapters can run the tool-using evaluation agent.
            if spec is None or spec.agent_tier != "deep_agent":
                tier = spec.agent_tier if spec else "unknown"
                raise ValueError(
                    f"model_override interface '{self.model_override.interface}' "
                    f"has agent_tier='{tier}'; agentic traits require "
                    f"agent_tier='deep_agent'."
                )
        return self

    def validate_score(self, value: int | bool) -> bool:
        """Validate that a given score is valid for this trait."""
        # Template kinds produce structured output, not a scalar score.
        if self.is_template_kind:
            return True
        if self.kind == "boolean":
            return isinstance(value, bool)
        else:
            # bool is a subclass of int in Python — reject it explicitly
            # for numeric kinds.
            if isinstance(value, bool):
                return False
            if not isinstance(value, int):
                return False
            min_val = self.min_score if self.min_score is not None else 0
            max_val = self.max_score if self.max_score is not None else 5
            # -1 appears to be an out-of-band sentinel accepted for literal
            # traits (e.g. "no class matched") — TODO confirm.
            if self.kind == "literal" and value == -1:
                return True
            return min_val <= value <= max_val

    @property
    def is_template_kind(self) -> bool:
        """Return True if kind is a BaseModel subclass (template kind)."""
        return isinstance(self.kind, type) and issubclass(self.kind, BaseModel)
Attributes
is_template_kind property
is_template_kind: bool

Return True if kind is a BaseModel subclass (template kind).

Functions
serialize_kind
serialize_kind(value: Any, _info: Any) -> Any

Serialize BaseModel subclass to a template dict with JSON Schema.

Source code in src/karenina/schemas/entities/rubric.py
@field_serializer("kind")
def serialize_kind(self, value: Any, _info: Any) -> Any:
    """Serialize BaseModel subclass to a template dict with JSON Schema."""
    # Template kinds (classes) are not JSON-serializable directly, so emit a
    # tagged {"type": "template", "schema": ...} dict that validate_kind
    # can rebuild on load. String kinds pass through unchanged.
    if isinstance(value, type) and issubclass(value, BaseModel):
        return {"type": "template", "schema": value.model_json_schema()}
    return value
set_legacy_defaults classmethod
set_legacy_defaults(
    values: dict[str, Any],
) -> dict[str, Any]

Set default for higher_is_better when loading legacy data.

Skips the legacy default (True) when kind is a template, because template kinds require higher_is_better=None.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="before")
@classmethod
def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Set default for higher_is_better when loading legacy data.

    Skips the legacy default (True) when kind is a template, because
    template kinds require higher_is_better=None.
    """
    # Non-dict payloads (e.g. already-constructed instances) pass through.
    if not isinstance(values, dict):
        return values
    kind = values.get("kind")
    # Template kind (non-string): do not inject legacy default — it must stay None.
    if not isinstance(kind, str):
        return values
    # Legacy data predates higher_is_better; default missing/None to True.
    if "higher_is_better" not in values or values.get("higher_is_better") is None:
        values["higher_is_better"] = True
    return values
validate_higher_is_better classmethod
validate_higher_is_better(
    v: bool | None, info: Any
) -> bool | None

Enforce higher_is_better=None for template kind.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("higher_is_better", mode="after")
@classmethod
def validate_higher_is_better(cls, v: bool | None, info: Any) -> bool | None:
    """Enforce higher_is_better=None for template kind."""
    # info.data holds the fields validated so far; "kind" is declared before
    # higher_is_better, so it is present here when it validated successfully.
    kind = info.data.get("kind")
    if isinstance(kind, type) and issubclass(kind, BaseModel) and v is not None:
        raise ValueError("higher_is_better must be None for template kind")
    return v
validate_kind classmethod
validate_kind(v: Any) -> Any

Accept string literals, BaseModel subclasses, or serialized template dicts.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("kind", mode="before")
@classmethod
def validate_kind(cls, v: Any) -> Any:
    """Accept string literals, BaseModel subclasses, or serialized template dicts."""
    # 1) Scalar kinds pass straight through; the Literal annotation performs
    #    the actual value check.
    if isinstance(v, str):
        return v
    # 2) Template kind supplied as a class: validate its fields up front.
    if isinstance(v, type) and issubclass(v, BaseModel):
        _validate_template_fields(v)
        return v
    # 3) Round-trip form produced by serialize_kind: rebuild the model class
    #    from the embedded JSON Schema.
    if isinstance(v, dict) and v.get("type") == "template":
        schema = v.get("schema")
        if schema is None:
            raise ValueError("Template kind dict must include a 'schema' key")
        return _reconstruct_model_from_schema(schema)
    raise ValueError(f"kind must be a string literal, BaseModel subclass, or template dict, got {type(v)}")
validate_kind_fields
validate_kind_fields() -> AgenticRubricTrait

Validate kind-specific field constraints.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_kind_fields(self) -> "AgenticRubricTrait":
    """Validate kind-specific field constraints."""
    # materialize_trace writes the trace to a file, which is pointless when
    # the context mode excludes the trace entirely.
    if self.materialize_trace and self.context_mode == "workspace_only":
        raise ValueError(
            "materialize_trace=True requires a trace, but context_mode='workspace_only' "
            "excludes the trace. Use 'trace_only' or 'trace_and_workspace'."
        )
    # Template kinds (BaseModel subclasses) have no scalar bounds to derive.
    if not isinstance(self.kind, str):
        return self
    if self.kind == "literal":
        if not self.classes:
            raise ValueError("classes field is required for literal kind")
        # Automatically derive min_score and max_score from classes
        # (scores are 0-based class indices). object.__setattr__ writes
        # directly — presumably to avoid re-triggering validation from
        # inside this validator; confirm.
        object.__setattr__(self, "min_score", 0)
        object.__setattr__(self, "max_score", len(self.classes) - 1)
    return self
validate_model_override_supports_agents
validate_model_override_supports_agents() -> (
    AgenticRubricTrait
)

Validate that model_override supports agent creation (if set).

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_model_override_supports_agents(self) -> "AgenticRubricTrait":
    """Validate that model_override supports agent creation (if set)."""
    if self.model_override is not None:
        # Imported lazily — presumably to avoid a circular import; confirm.
        from karenina.adapters.registry import AdapterRegistry

        spec = AdapterRegistry.get_spec(self.model_override.interface)
        # Only adapters advertising the "deep_agent" tier can run the
        # tool-using evaluation agent this trait requires.
        if spec is None or spec.agent_tier != "deep_agent":
            tier = spec.agent_tier if spec else "unknown"
            raise ValueError(
                f"model_override interface '{self.model_override.interface}' "
                f"has agent_tier='{tier}'; agentic traits require "
                f"agent_tier='deep_agent'."
            )
    return self
validate_score
validate_score(value: int | bool) -> bool

Validate that a given score is valid for this trait.

Source code in src/karenina/schemas/entities/rubric.py
def validate_score(self, value: int | bool) -> bool:
    """Validate that a given score is valid for this trait."""
    if self.is_template_kind:
        return True
    if self.kind == "boolean":
        return isinstance(value, bool)
    else:
        if isinstance(value, bool):
            return False
        if not isinstance(value, int):
            return False
        min_val = self.min_score if self.min_score is not None else 0
        max_val = self.max_score if self.max_score is not None else 5
        if self.kind == "literal" and value == -1:
            return True
        return min_val <= value <= max_val

AllOf

Bases: AllOf

All child conditions must pass (template domain).

Overrides conditions with the discriminated StrategyNode union so nested trees deserialize correctly.

Source code in src/karenina/schemas/entities/composition.py
class AllOf(_GenericAllOf):
    """All child conditions must pass (template domain).

    Overrides conditions with the discriminated StrategyNode union
    so nested trees deserialize correctly.
    """

    # NOTE: a mutable default is safe on a Pydantic model — defaults are
    # copied per instance, not shared like plain class attributes.
    conditions: list[StrategyNode] = []

AnyOf

Bases: AnyOf

At least one child condition must pass (template domain).

Overrides conditions with the discriminated StrategyNode union so nested trees deserialize correctly.

Source code in src/karenina/schemas/entities/composition.py
class AnyOf(_GenericAnyOf):
    """At least one child condition must pass (template domain).

    Overrides conditions with the discriminated StrategyNode union
    so nested trees deserialize correctly.
    """

    # NOTE: a mutable default is safe on a Pydantic model — defaults are
    # copied per instance, not shared like plain class attributes.
    conditions: list[StrategyNode] = []

AtLeastN

Bases: AtLeastN

N or more child conditions must pass (template domain).

Overrides conditions with the discriminated StrategyNode union so nested trees deserialize correctly.

Source code in src/karenina/schemas/entities/composition.py
class AtLeastN(_GenericAtLeastN):
    """N or more child conditions must pass (template domain).

    Overrides conditions with the discriminated StrategyNode union
    so nested trees deserialize correctly.
    """

    # NOTE: a mutable default is safe on a Pydantic model — defaults are
    # copied per instance, not shared like plain class attributes.
    conditions: list[StrategyNode] = []

BaseAnswer

Bases: BaseModel

Base class for all answer templates in Karenina.

This class provides common functionality and configuration for answer validation and processing.

Source code in src/karenina/schemas/entities/answer.py
class BaseAnswer(BaseModel):
    """Base class for all answer templates in Karenina.

    This class provides common functionality and configuration for answer
    validation and processing.
    """

    # extra="allow" lets subclasses carry undeclared attributes
    # (e.g. ``correct`` set in model_post_init) without declaring fields.
    model_config = ConfigDict(extra="allow")

    # Question ID will be set programmatically after class instantiation
    # (via set_question_id).
    id: str | None = None

    # Source code storage (set automatically via __init_subclass__ or manually for exec-created classes)
    # Using ClassVar to prevent Pydantic from treating this as a model field
    _source_code: ClassVar[str | None] = None

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Automatically capture source code when Answer classes are defined.

        This hook is called whenever a class inherits from BaseAnswer.
        It attempts to capture the source code using inspect.getsource().
        For exec-created classes, this will fail and _source_code will be None,
        but can be set manually afterwards.
        """
        super().__init_subclass__(**kwargs)
        try:
            cls._source_code = inspect.getsource(cls)
        except (OSError, TypeError):
            # This happens for exec-created classes or when source isn't available
            # The source code can be set manually after class creation
            cls._source_code = None

        # Bridge: ground_truth(self) -> model_post_init(self, __context)
        # Only when the subclass defines ground_truth but no explicit
        # model_post_init, so user-provided hooks always win.
        if "ground_truth" in cls.__dict__ and "model_post_init" not in cls.__dict__:
            original_gt = cls.__dict__["ground_truth"]

            # Closure captures original_gt so the user's ground_truth runs at
            # Pydantic post-init time; the __context argument is discarded.
            def _bridged_model_post_init(self: Any, __context: Any) -> None:
                original_gt(self)

            cls.model_post_init = _bridged_model_post_init  # type: ignore[method-assign]

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """Auto-assign verify/verify_granular for VerifiedField templates.

        This hook runs after Pydantic has fully built the model, so
        model_fields is populated and _get_verified_fields() can inspect
        json_schema_extra. Only templates with at least one VerifiedField
        get the auto-generated methods; classic templates are left alone.
        """
        super().__pydantic_init_subclass__(**kwargs)

        # Reject reserved field names that would collide with internal attributes
        reserved_conflicts = _RESERVED_FIELD_NAMES & set(cls.model_fields.keys())
        if reserved_conflicts:
            raise TypeError(
                f"Field name(s) {reserved_conflicts} reserved by BaseAnswer for internal use. "
                f"Please rename your field(s) to avoid collision."
            )

        verified = cls._get_verified_fields()
        if not verified:
            return

        def _has_own_method(target_cls: type, method_name: str) -> bool:
            """Check if any class in MRO (between cls and BaseAnswer) defines method."""
            # Stop at BaseAnswer so its own _auto_* defaults don't count as
            # user-provided overrides.
            for klass in target_cls.__mro__:
                if klass is BaseAnswer:
                    break
                if method_name in klass.__dict__:
                    return True
            return False

        # Install auto-generated methods only where the template (or an
        # intermediate base) hasn't supplied its own.
        if not _has_own_method(cls, "verify"):
            cls.verify = BaseAnswer._auto_verify  # type: ignore[attr-defined]
        if not _has_own_method(cls, "verify_granular"):
            cls.verify_granular = BaseAnswer._auto_verify_granular  # type: ignore[attr-defined]

    @classmethod
    def get_source_code(cls) -> str | None:
        """Return the stored source text of this Answer class, if any.

        File-defined classes have their source captured automatically by
        __init_subclass__ via inspect.getsource(); exec-created classes must
        assign _source_code manually, otherwise this returns None.
        """
        return cls._source_code

    @classmethod
    def model_json_schema(cls, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Generate the JSON schema with all __verification__ metadata removed.

        Ground-truth values live under "__verification__" keys inside
        json_schema_extra; stripping them here guarantees they never reach an
        LLM judge, regardless of which adapter or code path requests the
        schema.

        All args are forwarded to Pydantic's model_json_schema().
        """
        schema = super().model_json_schema(*args, **kwargs)

        # Walk the schema iteratively (explicit stack instead of recursion),
        # dropping every "__verification__" key in nested dicts/lists.
        pending: list[Any] = [schema]
        while pending:
            node = pending.pop()
            if isinstance(node, dict):
                node.pop("__verification__", None)
                pending.extend(node.values())
            elif isinstance(node, list):
                pending.extend(node)

        return schema

    @classmethod
    def set_source_code_from_notebook(cls) -> None:
        """Capture source code from notebook cell history (Jupyter/IPython only).

        This is a convenience method for interactive environments where
        inspect.getsource() doesn't work. It attempts to find the class
        definition in the recent cell execution history.

        Usage in notebook:
            class Answer(BaseAnswer):
                # your class definition
                pass
            Answer.set_source_code_from_notebook()
        """
        try:
            # Try to get IPython instance (works in Jupyter notebooks)
            from IPython import get_ipython  # type: ignore[attr-defined]

            ip = get_ipython()  # type: ignore[no-untyped-call]
            if ip is None:
                print("Warning: Not in an IPython/Jupyter environment")
                return

            # Get recent cell history
            history = list(ip.history_manager.get_range())

            # Look for the class definition in recent history (last 10 cells)
            class_name = cls.__name__
            # Newest cells first, so a re-defined class picks up its latest body.
            for _, _, cell_content in reversed(history[-10:]):
                if f"class {class_name}(" in cell_content:
                    # Extract just the class definition part
                    lines = cell_content.strip().split("\n")
                    class_lines = []
                    in_class = False
                    base_indent = None  # indent column of the "class" line

                    for line in lines:
                        if f"class {class_name}(" in line:
                            in_class = True
                            base_indent = len(line) - len(line.lstrip())
                            class_lines.append(line)
                        elif in_class:
                            # Keep blank lines and anything indented deeper than
                            # the class header; a dedent marks the end of the body.
                            if line.strip() == "" or (
                                base_indent is not None and len(line) - len(line.lstrip()) > base_indent
                            ):
                                class_lines.append(line)
                            else:
                                # End of class definition
                                break

                    if class_lines:
                        cls._source_code = "\n".join(class_lines)
                        print(f"✓ Source code captured for {class_name}")
                        return

            print(f"Warning: Could not find class definition for {class_name} in recent history")

        except ImportError:
            print("Warning: IPython not available. This method only works in Jupyter notebooks.")
        except Exception as e:
            # Best-effort capture: any failure is reported, never raised.
            print(f"Warning: Could not capture source code: {e}")

    def model_post_init(self, __context: Any) -> None:
        """Post-init hook for auto-generating ground truth from VerifiedField metadata.

        For templates using VerifiedField, this automatically populates
        self.correct with {field_name: ground_truth} if not already set.
        Classic templates and templates with custom ground_truth() are unaffected
        because the bridge in __init_subclass__ overrides this method.
        """
        # Only populate when ``correct`` is missing or None — a value already
        # set (e.g. by a subclass) is never overwritten.
        if not hasattr(self, "correct") or getattr(self, "correct", None) is None:
            verified = self.__class__._get_verified_fields()
            if verified:
                self.correct = {name: meta.ground_truth for name, meta in verified.items()}

    def set_question_id(self, question_id: str) -> None:
        """Attach the benchmark question's unique identifier to this answer.

        Args:
            question_id: Identifier of the question this answer relates to.
        """
        self.id = question_id

    @classmethod
    def _get_verified_fields(cls) -> dict[str, Any]:
        """Collect VerificationMeta for every VerifiedField on this template.

        Scans model_fields for entries whose json_schema_extra carries a
        "__verification__" payload and validates each payload into a
        VerificationMeta instance.

        Returns:
            Mapping of field name -> VerificationMeta; empty for classic
            templates with no VerifiedField fields.
        """
        from karenina.schemas.entities.verified_field import VerificationMeta

        found: dict[str, VerificationMeta] = {}
        for field_name, info in cls.model_fields.items():
            extra = info.json_schema_extra
            # Only dict-shaped extras can carry the verification payload.
            if not isinstance(extra, dict) or "__verification__" not in extra:
                continue
            found[field_name] = VerificationMeta.model_validate(extra["__verification__"])
        return found

    def _clear_verification_cache(self) -> None:
        """Drop the memoized _field_results so the next access recomputes.

        Call after mutating field values; _compute_field_results() caches its
        result on first use and would otherwise return stale data.
        """
        if "_field_results" in self.__dict__:
            del self.__dict__["_field_results"]

    def _compute_field_results(self) -> dict[str, bool]:
        """Evaluate all VerifiedField checks and cache in _field_results.

        Results are cached in self.__dict__["_field_results"] after the first
        call. Subsequent calls return the cached value without recomputation.
        Call _clear_verification_cache() to invalidate the cache after
        mutating field values.

        For TracePrimitive fields, reads self._raw_trace and compares
        check_trace() result against bool(meta.ground_truth). For parsed
        fields, calls primitive.check(extracted, ground_truth).

        Returns:
            Mapping of field name to pass/fail boolean.

        Raises:
            ValueError: If a trace field is used but _raw_trace is not set.
        """
        # Memoized: return the previously-computed mapping if present.
        cached = self.__dict__.get("_field_results")
        if cached is not None:
            return cast(dict[str, bool], cached)

        from karenina.schemas.primitives import TracePrimitive
        from karenina.schemas.primitives.registry import _reconstruct_primitive

        verified = self.__class__._get_verified_fields()
        results: dict[str, bool] = {}

        for name, meta in verified.items():
            # verify_with holds a serialized primitive spec; rebuild the object.
            primitive = _reconstruct_primitive(meta.verify_with)

            if isinstance(primitive, TracePrimitive):
                # Trace primitives inspect the raw LLM trace, not a parsed field.
                raw_trace = getattr(self, "_raw_trace", None)
                if raw_trace is None:
                    raise ValueError(f"Field {name!r} uses a TracePrimitive but requires _raw_trace to be set")
                trace_result = primitive.check_trace(raw_trace)
                # ground_truth (coerced to bool) encodes the expected outcome
                # of the trace check.
                results[name] = trace_result == bool(meta.ground_truth)
            else:
                extracted = getattr(self, name)
                results[name] = primitive.check(extracted, meta.ground_truth)

        # Cache for subsequent calls; cleared by _clear_verification_cache().
        self.__dict__["_field_results"] = results
        return results

    def _auto_verify(self) -> bool:
        """Auto-generated verify() for VerifiedField templates.

        If the subclass defines a VerificationStrategy inner class with a
        verify_strategy attribute, uses evaluate_strategy() to combine field
        results. Otherwise, requires all fields to pass.

        Returns:
            True if verification passes.

        Raises:
            NotImplementedError: If the template has no VerifiedField fields
                (classic templates must define their own verify()).
        """
        from karenina.schemas.entities.composition import evaluate_strategy

        verified = self.__class__._get_verified_fields()
        if not verified:
            raise NotImplementedError("No VerifiedField fields found. Define verify() manually for classic templates.")

        field_results = self._compute_field_results()

        # Check for VerificationStrategy inner class; when it declares a
        # verify_strategy node, delegate combination to the composition engine.
        strategy_cls = getattr(self.__class__, "VerificationStrategy", None)
        if strategy_cls is not None:
            strategy = getattr(strategy_cls, "verify_strategy", None)
            if strategy is not None:
                return evaluate_strategy(strategy, field_results)

        # Default (no strategy): every verified field must pass.
        return all(field_results.values())

    def _auto_verify_granular(self) -> float:
        """Auto-generated verify_granular() for VerifiedField templates.

        Without a VerificationStrategy inner class, the score is a flat
        weighted average of field pass/fail results (AllOf-like default).
        With a declared strategy, the computation is delegated to
        _composition_aware_granular(): AnyOf scores the best passing field,
        AtLeastN sums the top-N passing weights, each over the total weight.

        Returns:
            Score between 0.0 and 1.0.

        Raises:
            NotImplementedError: If the template has no VerifiedField fields.
        """
        verified = self.__class__._get_verified_fields()
        if not verified:
            raise NotImplementedError(
                "No VerifiedField fields found. Define verify_granular() manually for classic templates."
            )

        field_results = self._compute_field_results()

        # Honor an explicit composition strategy when declared (issue 133).
        strategy_cls = getattr(self.__class__, "VerificationStrategy", None)
        strategy = getattr(strategy_cls, "verify_strategy", None) if strategy_cls else None
        if strategy is not None:
            return self._composition_aware_granular(strategy, field_results, verified)

        # Default AllOf behavior: weighted fraction of passing fields.
        total = sum(meta.weight for meta in verified.values())
        if total == 0.0:
            return 0.0
        passed = sum(meta.weight for name, meta in verified.items() if field_results.get(name, False))
        return passed / total

    @staticmethod
    def _composition_aware_granular(
        strategy: Any,
        field_results: dict[str, bool],
        verified: dict[str, Any],
    ) -> float:
        """Compute granular score honoring composition strategy.

        Args:
            strategy: Composition strategy node (AllOf, AnyOf, AtLeastN).
            field_results: Per-field pass/fail booleans.
            verified: Per-field VerificationMeta (for weights).

        Returns:
            Score between 0.0 and 1.0.
        """
        from karenina.schemas.entities.composition import AnyOf, AtLeastN

        denominator: float = sum(meta.weight for meta in verified.values())
        if denominator == 0.0:
            return 0.0

        # Weights of the fields that passed, largest first.
        winners: list[float] = [verified[name].weight for name, passed in field_results.items() if passed]
        winners.sort(reverse=True)

        if isinstance(strategy, AnyOf):
            # AnyOf: credit only the single best passing field.
            return float(winners[0]) / denominator if winners else 0.0

        if isinstance(strategy, AtLeastN):
            # AtLeastN: credit the N heaviest passing fields.
            return float(sum(winners[: strategy.n])) / denominator

        # AllOf or unknown strategy: flat weighted average over passing fields.
        return float(sum(winners)) / denominator

    def verify_regex(self, raw_trace: str) -> dict[str, Any]:
        """Verify regex patterns against the raw LLM response trace.

        Args:
            raw_trace: The complete raw response text from the LLM

        Returns:
            Dictionary containing regex validation results with keys:
            - 'success': bool - True if all regex patterns matched successfully
            - 'results': dict - Individual results for each regex pattern
            - 'details': dict - Detailed match information for debugging
        """
        regex_specs = getattr(self, "regex", None)
        if not regex_specs:
            # No patterns configured: vacuously successful.
            return {"success": True, "results": {}, "details": {}}

        per_pattern: dict[str, bool] = {}
        per_details: dict[str, Any] = {}

        for name, spec in regex_specs.items():
            pattern = spec.get("pattern", "")
            expected = spec.get("expected")
            match_type = spec.get("match_type", "exact")

            try:
                outcome = self._verify_single_regex_pattern(raw_trace, pattern, expected, match_type)
            except Exception as e:
                # A broken pattern fails this entry but never aborts the rest.
                per_pattern[name] = False
                per_details[name] = {"error": str(e), "pattern": pattern, "expected": expected, "match_type": match_type}
            else:
                per_pattern[name] = outcome["success"]
                per_details[name] = outcome["details"]

        return {"success": all(per_pattern.values()), "results": per_pattern, "details": per_details}

    def _verify_single_regex_pattern(self, text: str, pattern: str, expected: Any, match_type: str) -> dict[str, Any]:
        """Verify a single regex pattern against text.

        Args:
            text: Text to search in
            pattern: Regex pattern to apply
            expected: Expected result (varies by match_type)
            match_type: Type of matching - 'exact', 'contains', 'count', 'all'

        Returns:
            Dictionary with 'success' boolean and 'details' dict
        """
        matches = re.findall(pattern, text)

        details = {
            "pattern": pattern,
            "expected": expected,
            "match_type": match_type,
            "matches_found": matches,
            "match_count": len(matches),
        }

        if match_type == "exact":
            # Expected is a single string that should match exactly
            if len(matches) == 1 and matches[0] == expected:
                details["success_reason"] = "Single exact match found"
                return {"success": True, "details": details}
            else:
                details["failure_reason"] = (
                    f"Expected exactly one match of '{expected}', got {len(matches)} matches: {matches}"
                )
                return {"success": False, "details": details}

        elif match_type == "contains":
            # Expected is a string that should be found somewhere
            if expected in matches:
                details["success_reason"] = f"Expected pattern '{expected}' found in matches"
                return {"success": True, "details": details}
            else:
                details["failure_reason"] = f"Expected pattern '{expected}' not found in matches: {matches}"
                return {"success": False, "details": details}

        elif match_type == "count":
            # Expected is a number - count of matches should equal this
            if isinstance(expected, int) and len(matches) == expected:
                details["success_reason"] = f"Found exactly {expected} matches as expected"
                return {"success": True, "details": details}
            else:
                details["failure_reason"] = f"Expected {expected} matches, got {len(matches)}"
                return {"success": False, "details": details}

        elif match_type == "all":
            # Expected is a list - all items should be present in matches
            if isinstance(expected, list):
                expected_set = set(expected)
                matches_set = set(matches)
                if expected_set.issubset(matches_set):
                    details["success_reason"] = "All expected items found in matches"
                    return {"success": True, "details": details}
                else:
                    missing = expected_set - matches_set
                    details["failure_reason"] = f"Missing expected items: {list(missing)}"
                    return {"success": False, "details": details}
            else:
                details["failure_reason"] = f"Expected list for 'all' match type, got {type(expected)}"
                return {"success": False, "details": details}

        else:
            details["failure_reason"] = f"Unknown match_type: {match_type}"
            return {"success": False, "details": details}
Functions
get_source_code classmethod
get_source_code() -> str | None

Get the source code of this Answer class.

Returns:

Type Description
str | None

The source code string if available, None otherwise.

For file-based classes, source code is captured automatically. For exec-created classes, source code must be set manually.

Source code in src/karenina/schemas/entities/answer.py
@classmethod
def get_source_code(cls) -> str | None:
    """Get the source code of this Answer class.

    Returns:
        The source code string if available, None otherwise.

    For file-based classes, source code is captured automatically.
    For exec-created classes, source code must be set manually.
    """
    # _source_code is populated elsewhere (file introspection or
    # set_source_code_from_notebook()); None means it was never captured.
    return cls._source_code
model_json_schema classmethod
model_json_schema(
    *args: Any, **kwargs: Any
) -> dict[str, Any]

Generate JSON schema with verification metadata stripped.

Overrides Pydantic's default to ensure verification metadata (containing ground truth values) is never exposed in the schema. This prevents ground truth leakage to LLM judges regardless of which adapter or code path generates the schema.

All args are forwarded to Pydantic's model_json_schema().

Source code in src/karenina/schemas/entities/answer.py
@classmethod
def model_json_schema(cls, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Generate JSON schema with verification metadata stripped.

    Overrides Pydantic's default to ensure __verification__ metadata
    (containing ground truth values) is never exposed in the schema.
    This prevents ground truth leakage to LLM judges regardless of
    which adapter or code path generates the schema.

    All args are forwarded to Pydantic's model_json_schema().
    """
    schema = super().model_json_schema(*args, **kwargs)

    def _strip_verification(obj: Any) -> None:
        # Recursively drop every "__verification__" key at any nesting depth,
        # walking dicts and dicts nested inside lists. Mutates in place.
        if isinstance(obj, dict):
            obj.pop("__verification__", None)
            for value in obj.values():
                _strip_verification(value)
        elif isinstance(obj, list):
            for item in obj:
                _strip_verification(item)

    _strip_verification(schema)
    return schema
model_post_init
model_post_init(__context: Any) -> None

Post-init hook for auto-generating ground truth from VerifiedField metadata.

For templates using VerifiedField, this automatically populates self.correct with {field_name: ground_truth} if not already set. Classic templates and templates with custom ground_truth() are unaffected because the bridge in init_subclass overrides this method.

Source code in src/karenina/schemas/entities/answer.py
def model_post_init(self, __context: Any) -> None:
    """Post-init hook for auto-generating ground truth from VerifiedField metadata.

    For templates using VerifiedField, this automatically populates
    self.correct with {field_name: ground_truth} if not already set.
    Classic templates and templates with custom ground_truth() are unaffected
    because the bridge in __init_subclass__ overrides this method.
    """
    if not hasattr(self, "correct") or getattr(self, "correct", None) is None:
        verified = self.__class__._get_verified_fields()
        if verified:
            self.correct = {name: meta.ground_truth for name, meta in verified.items()}
set_question_id
set_question_id(question_id: str) -> None

Set the question ID programmatically.

Parameters:

Name Type Description Default
question_id str

The unique identifier for the question this answer relates to.

required
Source code in src/karenina/schemas/entities/answer.py
def set_question_id(self, question_id: str) -> None:
    """Set the question ID programmatically.

    Args:
        question_id: The unique identifier for the question this answer relates to.
    """
    # Plain assignment; no validation is performed here.
    self.id = question_id
set_source_code_from_notebook classmethod
set_source_code_from_notebook() -> None

Capture source code from notebook cell history (Jupyter/IPython only).

This is a convenience method for interactive environments where inspect.getsource() doesn't work. It attempts to find the class definition in the recent cell execution history.

Usage in notebook

class Answer(BaseAnswer): # your class definition pass Answer.set_source_code_from_notebook()

Source code in src/karenina/schemas/entities/answer.py
@classmethod
def set_source_code_from_notebook(cls) -> None:
    """Capture source code from notebook cell history (Jupyter/IPython only).

    This is a convenience method for interactive environments where
    inspect.getsource() doesn't work. It attempts to find the class
    definition in the recent cell execution history.

    Best-effort: on any failure (no IPython, class not found in history)
    it prints a warning and returns without raising.

    Usage in notebook:
        class Answer(BaseAnswer):
            # your class definition
            pass
        Answer.set_source_code_from_notebook()
    """
    try:
        # Try to get IPython instance (works in Jupyter notebooks)
        from IPython import get_ipython  # type: ignore[attr-defined]

        ip = get_ipython()  # type: ignore[no-untyped-call]
        if ip is None:
            print("Warning: Not in an IPython/Jupyter environment")
            return

        # Get recent cell history
        history = list(ip.history_manager.get_range())

        # Look for the class definition in recent history (last 10 cells),
        # newest cell first so the latest redefinition wins.
        class_name = cls.__name__
        for _, _, cell_content in reversed(history[-10:]):
            if f"class {class_name}(" in cell_content:
                # Extract just the class definition part
                lines = cell_content.strip().split("\n")
                class_lines = []
                in_class = False
                base_indent = None

                for line in lines:
                    if f"class {class_name}(" in line:
                        in_class = True
                        base_indent = len(line) - len(line.lstrip())
                        class_lines.append(line)
                    elif in_class:
                        # Blank lines and lines indented deeper than the class
                        # header are part of the class body.
                        if line.strip() == "" or (
                            base_indent is not None and len(line) - len(line.lstrip()) > base_indent
                        ):
                            class_lines.append(line)
                        else:
                            # End of class definition
                            break

                if class_lines:
                    cls._source_code = "\n".join(class_lines)
                    print(f"✓ Source code captured for {class_name}")
                    return

        print(f"Warning: Could not find class definition for {class_name} in recent history")

    except ImportError:
        print("Warning: IPython not available. This method only works in Jupyter notebooks.")
    except Exception as e:
        print(f"Warning: Could not capture source code: {e}")
verify_regex
verify_regex(raw_trace: str) -> dict[str, Any]

Verify regex patterns against the raw LLM response trace.

Parameters:

Name Type Description Default
raw_trace str

The complete raw response text from the LLM

required

Returns:

Type Description
dict[str, Any]

Dictionary containing regex validation results with keys:

  • 'success': bool - True if all regex patterns matched successfully
  • 'results': dict - Individual results for each regex pattern
  • 'details': dict - Detailed match information for debugging
Source code in src/karenina/schemas/entities/answer.py
def verify_regex(self, raw_trace: str) -> dict[str, Any]:
    """Verify regex patterns against the raw LLM response trace.

    Args:
        raw_trace: The complete raw response text from the LLM

    Returns:
        Dictionary containing regex validation results with keys:
        - 'success': bool - True if all regex patterns matched successfully
        - 'results': dict - Individual results for each regex pattern
        - 'details': dict - Detailed match information for debugging
    """
    # No patterns configured: vacuously successful.
    if not hasattr(self, "regex") or not self.regex:
        return {"success": True, "results": {}, "details": {}}

    results = {}
    details = {}
    all_success = True

    # Evaluate every configured pattern; one failure never aborts the rest.
    for name, spec in self.regex.items():
        pattern = spec.get("pattern", "")
        expected = spec.get("expected")
        match_type = spec.get("match_type", "exact")

        try:
            result = self._verify_single_regex_pattern(raw_trace, pattern, expected, match_type)
            results[name] = result["success"]
            details[name] = result["details"]

            if not result["success"]:
                all_success = False

        except Exception as e:
            # A broken pattern is recorded as a failure with its error message.
            results[name] = False
            details[name] = {"error": str(e), "pattern": pattern, "expected": expected, "match_type": match_type}
            all_success = False

    return {"success": all_success, "results": results, "details": details}

BooleanMatch

Bases: VerificationPrimitive

Compare extracted bool to ground truth bool.

Both values are coerced to bool before comparison.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class BooleanMatch(VerificationPrimitive):
    """Compare extracted bool to ground truth bool.

    Both values are coerced to bool before comparison.
    """

    def check(self, extracted: Any, expected: Any) -> bool:
        # Truthiness coercion: only the boolean interpretation of each
        # value matters, so any truthy/falsy pair compares equal.
        return (not extracted) == (not expected)

CallableRubricTrait

Bases: BaseModel

Callable-based evaluation trait using custom Python functions.

This trait type serializes and stores custom Python functions using cloudpickle, enabling complex, stateful, or domain-specific validation logic that cannot be expressed as simple regex patterns.

SECURITY WARNING: Deserializing callable code can execute arbitrary Python code. Only load CallableRubricTrait instances from trusted sources. CallableRubricTrait cannot be created via the web API for security reasons.

The trait can return either boolean (pass/fail) or numeric score results, matching LLMRubricTrait behavior.

Examples:

Boolean: - Word count validation: lambda text: len(text.split()) >= 50 - Custom domain logic: checking medical terminology consistency

Score: - Readability score: lambda text: calculate_flesch_kincaid(text) - Custom metric: lambda text: compute_domain_score(text)

Source code in src/karenina/schemas/entities/rubric.py
class CallableRubricTrait(BaseModel):
    """
    Callable-based evaluation trait using custom Python functions.

    This trait type serializes and stores custom Python functions using cloudpickle,
    enabling complex, stateful, or domain-specific validation logic that cannot be
    expressed as simple regex patterns.

    **SECURITY WARNING**: Deserializing callable code can execute arbitrary Python code.
    Only load CallableRubricTrait instances from trusted sources. CallableRubricTrait cannot be
    created via the web API for security reasons.

    The trait can return either boolean (pass/fail) or numeric score results, matching
    LLMRubricTrait behavior.

    Examples:
        Boolean:
        - Word count validation: lambda text: len(text.split()) >= 50
        - Custom domain logic: checking medical terminology consistency

        Score:
        - Readability score: lambda text: calculate_flesch_kincaid(text)
        - Custom metric: lambda text: compute_domain_score(text)
    """

    name: str = Field(..., min_length=1, description="Human readable identifier for the trait")
    description: str | None = Field(None, description="Detailed description of what this trait evaluates")
    summary: str | None = Field(None, description="Short concept label for dynamic rubric presence check")
    kind: TraitKind = Field(..., description="Type of evaluation: 'boolean' for pass/fail, 'score' for numeric")
    # Stored as raw cloudpickle bytes; round-trips through base64 for JSON
    # (see serialize_callable_code / validate_callable_code below).
    callable_code: bytes = Field(..., description="Serialized callable function (cloudpickle)")
    min_score: int | None = Field(None, description="Minimum score value (required if kind='score')")
    max_score: int | None = Field(None, description="Maximum score value (required if kind='score')")
    invert_result: bool = Field(False, description="Whether to invert the boolean result (only for kind='boolean')")

    # Directionality field
    higher_is_better: bool = Field(
        ...,
        description="Whether higher return values indicate better performance. "
        "True: high value = good. False: high value = bad.",
    )

    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    @model_validator(mode="before")
    @classmethod
    def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Set default for higher_is_better when loading legacy data."""
        # Legacy payloads predate this field; default to the optimistic True.
        if isinstance(values, dict) and ("higher_is_better" not in values or values.get("higher_is_better") is None):
            values["higher_is_better"] = True
        return values

    @field_serializer("callable_code")
    def serialize_callable_code(self, value: bytes, _info: Any) -> str:
        """Serialize callable_code bytes to base64 string for JSON export."""
        return base64.b64encode(value).decode("ascii")

    @field_validator("callable_code", mode="before")
    @classmethod
    def validate_callable_code(cls, value: bytes | str) -> bytes:
        """Convert base64 string to bytes if needed."""
        if isinstance(value, bytes):
            return value
        if isinstance(value, str):
            return base64.b64decode(value)
        raise ValueError(f"callable_code must be bytes or base64 string, got {type(value)}")

    @classmethod
    def from_callable(
        cls,
        name: str,
        func: Callable[[str], bool | int],
        kind: TraitKind,
        description: str | None = None,
        summary: str | None = None,
        min_score: int | None = None,
        max_score: int | None = None,
        invert_result: bool = False,
        higher_is_better: bool = True,
    ) -> "CallableRubricTrait":
        """
        Create a CallableRubricTrait from a callable function.

        Args:
            name: Trait name
            func: Function that takes a string (the verification trace/answer text) and returns bool or int
            kind: Type of evaluation - 'boolean' or 'score'
            description: Optional trait description
            summary: Short concept label for dynamic rubric presence check
            min_score: Minimum score (required if kind='score')
            max_score: Maximum score (required if kind='score')
            invert_result: Whether to invert boolean result (only for kind='boolean')
            higher_is_better: Whether higher return values indicate better performance

        Returns:
            CallableRubricTrait instance with serialized function

        Raises:
            ValueError: If function signature is invalid or score parameters are missing
        """
        # Validate function signature
        import inspect

        sig = inspect.signature(func)
        params = list(sig.parameters.keys())

        if len(params) != 1:
            raise ValueError(f"Callable must have exactly one parameter, got {len(params)}")

        # Validate score parameters
        if kind == "score":
            if min_score is None or max_score is None:
                raise ValueError("min_score and max_score are required when kind='score'")
            if min_score >= max_score:
                raise ValueError(f"min_score ({min_score}) must be less than max_score ({max_score})")
        else:  # kind == "boolean"
            if min_score is not None or max_score is not None:
                raise ValueError("min_score and max_score should not be set when kind='boolean'")

        # Serialize the function
        callable_code = cloudpickle.dumps(func)

        return cls(
            name=name,
            description=description,
            summary=summary,
            kind=kind,
            callable_code=callable_code,
            min_score=min_score,
            max_score=max_score,
            invert_result=invert_result,
            higher_is_better=higher_is_better,
        )

    def deserialize_callable(self) -> Callable[[str], bool | int]:
        """
        Deserialize the callable function from stored bytes.

        **SECURITY WARNING**: This executes code that was serialized and may contain
        arbitrary Python code. Only deserialize callables from trusted sources.

        Returns:
            The deserialized callable function

        Raises:
            RuntimeError: If deserialization fails
        """
        try:
            # Surface an explicit warning every time stored code is about to run.
            warnings.warn(
                f"Deserializing callable for trait '{self.name}'. "
                "This executes stored code. Only load from trusted sources.",
                category=UserWarning,
                stacklevel=2,
            )
            callable_func: Callable[[str], bool | int] = cloudpickle.loads(self.callable_code)
            return callable_func
        except Exception as e:
            raise RuntimeError(f"Failed to deserialize callable for trait '{self.name}': {e}") from e

    def evaluate(self, text: str) -> bool | int:
        """
        Evaluate the trait against the provided text.

        Args:
            text: The text to evaluate (verification trace or answer text)

        Returns:
            Boolean result for kind='boolean', numeric score for kind='score'

        Raises:
            RuntimeError: If evaluation fails. Note that the broad except below
                also catches the type/range ValueErrors raised inside this
                method, so callers observe them as RuntimeError with the
                ValueError chained as the cause.
        """
        try:
            func = self.deserialize_callable()
            result = func(text)

            if self.kind == "boolean":
                if not isinstance(result, bool):
                    raise ValueError(f"Callable with kind='boolean' must return bool, got {type(result)}")
                return not result if self.invert_result else result
            else:  # kind == "score"
                if not isinstance(result, int | float):
                    raise ValueError(f"Callable with kind='score' must return int or float, got {type(result)}")

                # Convert to int if float
                score = int(result) if isinstance(result, float) else result

                # Validate score range
                if self.min_score is not None and score < self.min_score:
                    raise ValueError(f"Score {score} is below minimum {self.min_score} for trait '{self.name}'")
                if self.max_score is not None and score > self.max_score:
                    raise ValueError(f"Score {score} is above maximum {self.max_score} for trait '{self.name}'")

                return score
        except Exception as e:
            # Wraps every failure (including the ValueErrors above) uniformly.
            raise RuntimeError(f"Failed to evaluate callable trait '{self.name}': {e}") from e
Functions
deserialize_callable
deserialize_callable() -> Callable[[str], bool | int]

Deserialize the callable function from stored bytes.

SECURITY WARNING: This executes code that was serialized and may contain arbitrary Python code. Only deserialize callables from trusted sources.

Returns:

Type Description
Callable[[str], bool | int]

The deserialized callable function

Raises:

Type Description
RuntimeError

If deserialization fails

Source code in src/karenina/schemas/entities/rubric.py
def deserialize_callable(self) -> Callable[[str], bool | int]:
    """
    Deserialize the callable function from stored bytes.

    **SECURITY WARNING**: This executes code that was serialized and may contain
    arbitrary Python code. Only deserialize callables from trusted sources.

    Returns:
        The deserialized callable function

    Raises:
        RuntimeError: If deserialization fails
    """
    try:
        warnings.warn(
            f"Deserializing callable for trait '{self.name}'. "
            "This executes stored code. Only load from trusted sources.",
            category=UserWarning,
            stacklevel=2,
        )
        callable_func: Callable[[str], bool | int] = cloudpickle.loads(self.callable_code)
        return callable_func
    except Exception as e:
        raise RuntimeError(f"Failed to deserialize callable for trait '{self.name}': {e}") from e
evaluate
evaluate(text: str) -> bool | int

Evaluate the trait against the provided text.

Parameters:

Name Type Description Default
text str

The text to evaluate (verification trace or answer text)

required

Returns:

Type Description
bool | int

Boolean result for kind='boolean', numeric score for kind='score'

Raises:

Type Description
RuntimeError

If evaluation fails

ValueError

If return type doesn't match kind or score is out of range

Source code in src/karenina/schemas/entities/rubric.py
def evaluate(self, text: str) -> bool | int:
    """
    Evaluate the trait against the provided text.

    Args:
        text: The text to evaluate (verification trace or answer text)

    Returns:
        Boolean result for kind='boolean', numeric score for kind='score'

    Raises:
        RuntimeError: If evaluation fails. Note that the broad except below
            also catches the type/range ValueErrors raised inside this
            method, so callers observe them as RuntimeError with the
            ValueError chained as the cause.
    """
    try:
        func = self.deserialize_callable()
        result = func(text)

        if self.kind == "boolean":
            if not isinstance(result, bool):
                raise ValueError(f"Callable with kind='boolean' must return bool, got {type(result)}")
            return not result if self.invert_result else result
        else:  # kind == "score"
            if not isinstance(result, int | float):
                raise ValueError(f"Callable with kind='score' must return int or float, got {type(result)}")

            # Convert to int if float
            score = int(result) if isinstance(result, float) else result

            # Validate score range
            if self.min_score is not None and score < self.min_score:
                raise ValueError(f"Score {score} is below minimum {self.min_score} for trait '{self.name}'")
            if self.max_score is not None and score > self.max_score:
                raise ValueError(f"Score {score} is above maximum {self.max_score} for trait '{self.name}'")

            return score
    except Exception as e:
        # Wraps every failure (including the ValueErrors above) uniformly.
        raise RuntimeError(f"Failed to evaluate callable trait '{self.name}': {e}") from e
from_callable classmethod
from_callable(
    name: str,
    func: Callable[[str], bool | int],
    kind: TraitKind,
    description: str | None = None,
    summary: str | None = None,
    min_score: int | None = None,
    max_score: int | None = None,
    invert_result: bool = False,
    higher_is_better: bool = True,
) -> CallableRubricTrait

Create a CallableRubricTrait from a callable function.

Parameters:

Name Type Description Default
name str

Trait name

required
func Callable[[str], bool | int]

Function that takes a string (the verification trace/answer text) and returns bool or int

required
kind TraitKind

Type of evaluation - 'boolean' or 'score'

required
description str | None

Optional trait description

None
summary str | None

Short concept label for dynamic rubric presence check

None
min_score int | None

Minimum score (required if kind='score')

None
max_score int | None

Maximum score (required if kind='score')

None
invert_result bool

Whether to invert boolean result (only for kind='boolean')

False
higher_is_better bool

Whether higher return values indicate better performance

True

Returns:

Type Description
CallableRubricTrait

CallableRubricTrait instance with serialized function

Raises:

Type Description
ValueError

If function signature is invalid or score parameters are missing

Source code in src/karenina/schemas/entities/rubric.py
@classmethod
def from_callable(
    cls,
    name: str,
    func: Callable[[str], bool | int],
    kind: TraitKind,
    description: str | None = None,
    summary: str | None = None,
    min_score: int | None = None,
    max_score: int | None = None,
    invert_result: bool = False,
    higher_is_better: bool = True,
) -> "CallableRubricTrait":
    """
    Create a CallableRubricTrait from a callable function.

    Args:
        name: Trait name
        func: Function that takes a string (the verification trace/answer text) and returns bool or int
        kind: Type of evaluation - 'boolean' or 'score'
        description: Optional trait description
        summary: Short concept label for dynamic rubric presence check
        min_score: Minimum score (required if kind='score')
        max_score: Maximum score (required if kind='score')
        invert_result: Whether to invert boolean result (only for kind='boolean')
        higher_is_better: Whether higher return values indicate better performance

    Returns:
        CallableRubricTrait instance with serialized function

    Raises:
        ValueError: If function signature is invalid or score parameters are missing
    """
    # Validate function signature: the callable must accept exactly the
    # single text argument it will be handed at evaluation time.
    import inspect

    sig = inspect.signature(func)
    params = list(sig.parameters.keys())

    if len(params) != 1:
        raise ValueError(f"Callable must have exactly one parameter, got {len(params)}")

    # Validate score parameters
    if kind == "score":
        if min_score is None or max_score is None:
            raise ValueError("min_score and max_score are required when kind='score'")
        if min_score >= max_score:
            raise ValueError(f"min_score ({min_score}) must be less than max_score ({max_score})")
    else:  # kind == "boolean"
        if min_score is not None or max_score is not None:
            raise ValueError("min_score and max_score should not be set when kind='boolean'")

    # Serialize the function
    callable_code = cloudpickle.dumps(func)

    return cls(
        name=name,
        description=description,
        summary=summary,
        kind=kind,
        callable_code=callable_code,
        min_score=min_score,
        max_score=max_score,
        invert_result=invert_result,
        higher_is_better=higher_is_better,
    )
serialize_callable_code
serialize_callable_code(value: bytes, _info: Any) -> str

Serialize callable_code bytes to base64 string for JSON export.

Source code in src/karenina/schemas/entities/rubric.py
@field_serializer("callable_code")
def serialize_callable_code(self, value: bytes, _info: Any) -> str:
    """Encode the pickled callable as an ASCII base64 string for JSON export."""
    encoded = base64.b64encode(value)
    return encoded.decode("ascii")
set_legacy_defaults classmethod
set_legacy_defaults(
    values: dict[str, Any],
) -> dict[str, Any]

Set default for higher_is_better when loading legacy data.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="before")
@classmethod
def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Backfill higher_is_better=True when legacy data omits it or sets it to None."""
    # .get() returns None both when the key is absent and when it is explicitly None,
    # so a single check covers both legacy shapes.
    if isinstance(values, dict) and values.get("higher_is_better") is None:
        values["higher_is_better"] = True
    return values
validate_callable_code classmethod
validate_callable_code(value: bytes | str) -> bytes

Convert base64 string to bytes if needed.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("callable_code", mode="before")
@classmethod
def validate_callable_code(cls, value: bytes | str) -> bytes:
    """Coerce callable_code to bytes, decoding base64 strings from JSON imports."""
    if isinstance(value, str):
        return base64.b64decode(value)
    if isinstance(value, bytes):
        return value
    raise ValueError(f"callable_code must be bytes or base64 string, got {type(value)}")

ContainsAll

Bases: VerificationPrimitive

Check that extracted text contains all of the given substrings.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class ContainsAll(VerificationPrimitive):
    """Pass only when every configured substring occurs in the normalized text."""

    substrings: list[str]
    normalize: list[Normalizer] = []

    def check(self, extracted: Any, _expected: Any) -> bool:
        # Run the same normalization pipeline over the haystack and each needle
        # so the comparison is consistent on both sides.
        haystack = apply_normalizers(self.normalize, str(extracted))
        return all(apply_normalizers(self.normalize, needle) in haystack for needle in self.substrings)

ContainsAny

Bases: VerificationPrimitive

Check that extracted text contains at least one of the given substrings.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class ContainsAny(VerificationPrimitive):
    """Pass when at least one configured substring occurs in the normalized text."""

    substrings: list[str]
    normalize: list[Normalizer] = []

    def check(self, extracted: Any, _expected: Any) -> bool:
        # Normalize both sides with the same pipeline before searching.
        haystack = apply_normalizers(self.normalize, str(extracted))
        return any(apply_normalizers(self.normalize, needle) in haystack for needle in self.substrings)

DateMatch

Bases: VerificationPrimitive

Parse and compare dates (format-flexible).

Uses python-dateutil for flexible parsing when no format is specified.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class DateMatch(VerificationPrimitive):
    """Parse and compare dates (format-flexible).

    Uses python-dateutil for flexible parsing when no format is specified.
    """

    format: str | None = None

    def check(self, extracted: Any, expected: Any) -> bool:
        try:
            parsed_extracted = _parse_date(str(extracted), self.format)
            parsed_expected = _parse_date(str(expected), self.format)
        except (ValueError, TypeError):
            logger.warning("Date parsing failed for %r vs %r", extracted, expected)
            return False
        # Compare calendar dates only; time-of-day differences are ignored.
        return parsed_extracted.date() == parsed_expected.date()

DateRange

Bases: VerificationPrimitive

Check that extracted date falls within a range.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class DateRange(VerificationPrimitive):
    """Check that extracted date falls within an (optionally open-ended) range."""

    min: str | None = None
    max: str | None = None

    def check(self, extracted: Any, _expected: Any) -> bool:
        try:
            candidate = _parse_date(str(extracted))
            # Each bound is optional; an unset bound never rejects.
            if self.min is not None and candidate < _parse_date(self.min):
                return False
            if self.max is not None and candidate > _parse_date(self.max):
                return False
            return True
        except (ValueError, TypeError):
            logger.warning("Date parsing failed for %r", extracted)
            return False

DateTolerance

Bases: VerificationPrimitive

Check that extracted date is within tolerance of expected date.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class DateTolerance(VerificationPrimitive):
    """Check that extracted date is within a given tolerance of the expected date."""

    tolerance: int
    unit: Literal["days", "hours", "minutes"] = "days"

    def check(self, extracted: Any, expected: Any) -> bool:
        try:
            parsed_extracted = _parse_date(str(extracted))
            parsed_expected = _parse_date(str(expected))
            if self.unit not in ("days", "hours", "minutes"):
                # Unreachable via the Literal type; kept as a defensive guard.
                # NOTE: like the original, this ValueError is swallowed by the
                # except clause below and reported as a parse failure.
                raise ValueError(f"Unknown unit: {self.unit!r}")
            # The unit name doubles as the timedelta keyword argument.
            window = timedelta(**{self.unit: self.tolerance})
            return abs(parsed_extracted - parsed_expected) <= window
        except (ValueError, TypeError):
            logger.warning("Date parsing failed for %r vs %r", extracted, expected)
            return False

DynamicRubric

Bases: BaseModel

Rubric whose traits are conditionally evaluated based on concept presence.

Unlike a regular Rubric (evaluated unconditionally), a DynamicRubric gates each trait on whether its concept is detected in the response. Every trait must carry either a summary or description so that the presence check prompt can describe the concept to the judge LLM.

Source code in src/karenina/schemas/entities/rubric.py
class DynamicRubric(BaseModel):
    """Rubric whose traits are conditionally evaluated based on concept presence.

    Unlike a regular Rubric (evaluated unconditionally), a DynamicRubric gates
    each trait on whether its concept is detected in the response. Every trait
    must carry either a ``summary`` or ``description`` so that the presence
    check prompt can describe the concept to the judge LLM.
    """

    # Reject unknown keys so typos in serialized rubrics fail loudly.
    model_config = ConfigDict(extra="forbid")

    # One list per trait type; all default to empty so any subset may be used.
    llm_traits: list[LLMRubricTrait] = Field(default_factory=list)
    regex_traits: list[RegexRubricTrait] = Field(default_factory=list)
    callable_traits: list[CallableRubricTrait] = Field(default_factory=list)
    metric_traits: list[MetricRubricTrait] = Field(default_factory=list)
    agentic_traits: list[AgenticRubricTrait] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_trait_names(self) -> "DynamicRubric":
        """Reject duplicate trait names within and across types.

        Mirrors ``Rubric.validate_trait_names``. Both same-type and
        cross-type duplicates are rejected.

        Raises:
            ValueError: On any duplicate trait name (same-type or cross-type).
        """
        type_lists: list[tuple[str, list[Any]]] = [
            ("llm", self.llm_traits),
            ("regex", self.regex_traits),
            ("callable", self.callable_traits),
            ("metric", self.metric_traits),
            ("agentic", self.agentic_traits),
        ]
        # Pass 1: per-type duplicates, reported with the type label for clarity.
        for type_label, traits in type_lists:
            seen: set[str] = set()
            for trait in traits:
                if trait.name in seen:
                    raise ValueError(
                        f"Duplicate {type_label} trait name '{trait.name}' "
                        f"within the same dynamic rubric. Trait names must be "
                        f"unique per type."
                    )
                seen.add(trait.name)

        # Cross-type uniqueness check
        all_names = self.get_trait_names()
        seen_all: set[str] = set()
        for name in all_names:
            if name in seen_all:
                raise ValueError(
                    f"Duplicate trait name '{name}' across different trait types. "
                    f"Trait names must be unique across all types within a dynamic rubric."
                )
            seen_all.add(name)
        return self

    @model_validator(mode="after")
    def validate_concept_text(self) -> "DynamicRubric":
        """Ensure every trait has text usable for concept presence checking.

        Each trait must have at least one of ``summary`` or ``description``.
        If ``summary`` is None but ``description`` exists, a warning is logged
        because ``summary`` is the preferred short label for the presence check
        prompt. If both are None, the trait cannot participate in presence
        checking and a ``ValueError`` is raised.
        """
        # getattr with a default tolerates trait types that lack these fields.
        for trait in self._all_traits():
            has_summary = getattr(trait, "summary", None) is not None
            has_description = getattr(trait, "description", None) is not None

            if not has_summary and not has_description:
                raise ValueError(
                    f"Dynamic rubric trait '{trait.name}' has neither summary nor description. "
                    "At least one is required for concept presence checking."
                )
            if not has_summary and has_description:
                logger.warning(
                    "Dynamic rubric trait '%s' has no summary; falling back to description "
                    "for concept presence text. Consider adding a short summary.",
                    trait.name,
                )
        return self

    def _all_traits(self) -> list[_AnyTrait]:
        """Return a flat list of all traits across every type.

        Order is fixed: llm, regex, callable, metric, agentic — callers
        (e.g. ``get_trait_names``) rely on this ordering.
        """
        result: list[_AnyTrait] = []
        result.extend(self.llm_traits)
        result.extend(self.regex_traits)
        result.extend(self.callable_traits)
        result.extend(self.metric_traits)
        result.extend(self.agentic_traits)
        return result

    def get_trait_names(self) -> list[str]:
        """Return names of all traits in type order: llm, regex, callable, metric, agentic."""
        return [trait.name for trait in self._all_traits()]

    def is_empty(self) -> bool:
        """Return True if this dynamic rubric contains no traits."""
        return len(self._all_traits()) == 0

    def resolve_concept_text(self, trait: _AnyTrait) -> str:
        """Return the text to use for concept presence checking.

        Prefers ``summary`` when set; falls back to ``description``.

        Args:
            trait: A trait instance from this dynamic rubric.

        Returns:
            The concept text string (summary or description).
        """
        summary = getattr(trait, "summary", None)
        if summary is not None:
            return str(summary)
        description = getattr(trait, "description", None)
        if description is not None:
            return str(description)
        # Should not happen if validation passed, but guard defensively
        return trait.name
Functions
get_trait_names
get_trait_names() -> list[str]

Return names of all traits in type order: llm, regex, callable, metric, agentic.

Source code in src/karenina/schemas/entities/rubric.py
def get_trait_names(self) -> list[str]:
    """List every trait name, in fixed type order (llm, regex, callable, metric, agentic)."""
    return [t.name for t in self._all_traits()]
is_empty
is_empty() -> bool

Return True if this dynamic rubric contains no traits.

Source code in src/karenina/schemas/entities/rubric.py
def is_empty(self) -> bool:
    """True when the dynamic rubric holds no traits of any type."""
    return not self._all_traits()
resolve_concept_text
resolve_concept_text(trait: _AnyTrait) -> str

Return the text to use for concept presence checking.

Prefers summary when set; falls back to description.

Parameters:

Name Type Description Default
trait _AnyTrait

A trait instance from this dynamic rubric.

required

Returns:

Type Description
str

The concept text string (summary or description).

Source code in src/karenina/schemas/entities/rubric.py
def resolve_concept_text(self, trait: _AnyTrait) -> str:
    """Pick the presence-check text for a trait.

    Tries ``summary`` first (preferred short label), then ``description``.

    Args:
        trait: A trait instance from this dynamic rubric.

    Returns:
        The concept text string (summary or description).
    """
    for attr in ("summary", "description"):
        text = getattr(trait, attr, None)
        if text is not None:
            return str(text)
    # Unreachable when validate_concept_text passed; defensive fallback.
    return trait.name
validate_concept_text
validate_concept_text() -> DynamicRubric

Ensure every trait has text usable for concept presence checking.

Each trait must have at least one of summary or description. If summary is None but description exists, a warning is logged because summary is the preferred short label for the presence check prompt. If both are None, the trait cannot participate in presence checking and a ValueError is raised.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_concept_text(self) -> "DynamicRubric":
    """Require that each trait carries text for concept presence checking.

    Every trait needs ``summary`` or ``description`` so the presence-check
    prompt can describe its concept to the judge LLM. Missing both is a
    hard error; having only ``description`` is allowed but logged, since
    ``summary`` is the preferred short label.
    """
    for trait in self._all_traits():
        summary = getattr(trait, "summary", None)
        description = getattr(trait, "description", None)

        if summary is None and description is None:
            raise ValueError(
                f"Dynamic rubric trait '{trait.name}' has neither summary nor description. "
                "At least one is required for concept presence checking."
            )
        # Reaching here with summary missing implies description is present.
        if summary is None:
            logger.warning(
                "Dynamic rubric trait '%s' has no summary; falling back to description "
                "for concept presence text. Consider adding a short summary.",
                trait.name,
            )
    return self
validate_trait_names
validate_trait_names() -> DynamicRubric

Reject duplicate trait names within and across types.

Mirrors Rubric.validate_trait_names. Both same-type and cross-type duplicates are rejected.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_trait_names(self) -> "DynamicRubric":
    """Reject duplicate trait names, both within one type and across types.

    Mirrors ``Rubric.validate_trait_names``.
    """
    typed_traits: tuple[tuple[str, list[Any]], ...] = (
        ("llm", self.llm_traits),
        ("regex", self.regex_traits),
        ("callable", self.callable_traits),
        ("metric", self.metric_traits),
        ("agentic", self.agentic_traits),
    )
    # Per-type pass: report duplicates with the type label for clarity.
    for label, traits in typed_traits:
        observed: set[str] = set()
        for trait in traits:
            if trait.name in observed:
                raise ValueError(
                    f"Duplicate {label} trait name '{trait.name}' "
                    f"within the same dynamic rubric. Trait names must be "
                    f"unique per type."
                )
            observed.add(trait.name)

    # Cross-type pass over the flattened name list.
    cross_observed: set[str] = set()
    for name in self.get_trait_names():
        if name in cross_observed:
            raise ValueError(
                f"Duplicate trait name '{name}' across different trait types. "
                f"Trait names must be unique across all types within a dynamic rubric."
            )
        cross_observed.add(name)
    return self

ExactMatch

Bases: VerificationPrimitive

Normalize then compare strings for equality.

Default normalization: lowercase + strip whitespace.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class ExactMatch(VerificationPrimitive):
    """Normalize then compare strings for equality.

    Default normalization: lowercase + strip whitespace.
    """

    normalize: list[Normalizer] = ["lowercase", "strip"]

    def check(self, extracted: Any, expected: Any) -> bool:
        # Both sides go through the same normalization pipeline before comparing.
        left = apply_normalizers(self.normalize, str(extracted))
        right = apply_normalizers(self.normalize, str(expected))
        return left == right

FieldCheck

Bases: BaseModel

Leaf node: references a single field's pass/fail result.

Source code in src/karenina/schemas/entities/composition.py
class FieldCheck(BaseModel):
    """Leaf node: references a single field's pass/fail result."""

    # Discriminator tag for the composition-node union.
    type: Literal["field_check"] = "field_check"
    # Name of the answer-template field whose verification result this node reads.
    field: str

LLMRubricTrait

Bases: BaseModel

LLM-evaluated trait for qualitative assessment.

A trait can be:

- boolean (true/false): Binary pass/fail assessment
- score (1-5 scale): Numeric rating within a range
- literal (categorical): Classification into predefined classes

For kind="literal":

- The classes field is REQUIRED
- min_score is automatically set to 0 (first class index)
- max_score is automatically set to len(classes)-1 (last class index)
- Returns int index (0, 1, 2...) based on class order
- higher_is_better controls ordering interpretation

Deep Judgment Mode (optional): When enabled, provides evidence-based evaluation with:

- Optional excerpt extraction from answer text
- Retry mechanism with validation feedback
- Reasoning generation explaining the score
- Optional search-enhanced hallucination detection

Source code in src/karenina/schemas/entities/rubric.py
class LLMRubricTrait(BaseModel):
    """
    LLM-evaluated trait for qualitative assessment.

    A trait can be:
    - boolean (true/false): Binary pass/fail assessment
    - score (1-5 scale): Numeric rating within a range
    - literal (categorical): Classification into predefined classes

    For kind="literal":
    - The `classes` field is REQUIRED
    - `min_score` is automatically set to 0 (first class index)
    - `max_score` is automatically set to len(classes)-1 (last class index)
    - Returns int index (0, 1, 2...) based on class order
    - `higher_is_better` controls ordering interpretation

    Deep Judgment Mode (optional):
        When enabled, provides evidence-based evaluation with:
        - Optional excerpt extraction from answer text
        - Retry mechanism with validation feedback
        - Reasoning generation explaining the score
        - Optional search-enhanced hallucination detection
    """

    name: str = Field(..., min_length=1, description="Human readable identifier for the trait")
    description: str | None = Field(None, description="Detailed description shown to user/LLM")
    summary: str | None = Field(None, description="Short concept label for dynamic rubric presence check")
    kind: TraitKind = Field(..., description="Type of trait: 'boolean', 'score', or 'literal'")
    min_score: int | None = Field(1, description="Lower bound for score traits (default: 1). Auto-derived for literal.")
    max_score: int | None = Field(5, description="Upper bound for score traits (default: 5). Auto-derived for literal.")

    # Literal-specific field (required when kind="literal")
    classes: dict[str, str] | None = Field(
        None,
        description="Class name → description mapping. Required when kind='literal'. "
        "Order determines indices (0, 1, 2...). Must have 2-20 classes.",
    )

    # Deep Judgment fields
    deep_judgment_enabled: bool = Field(
        False,
        description="Enable deep judgment evaluation for this trait (multi-stage with reasoning)",
    )
    deep_judgment_excerpt_enabled: bool = Field(
        True,
        description="Extract verbatim excerpts from answer as evidence (only if deep_judgment_enabled=True)",
    )
    deep_judgment_max_excerpts: int | None = Field(
        None,
        description="Maximum number of excerpts to extract (overrides global default if set)",
    )
    deep_judgment_fuzzy_match_threshold: float | None = Field(
        None,
        description="Fuzzy matching threshold for excerpt validation 0.0-1.0 (overrides global default if set)",
    )
    deep_judgment_excerpt_retry_attempts: int | None = Field(
        None,
        description="Number of retry attempts for excerpt extraction (overrides global default if set)",
    )
    deep_judgment_search_enabled: bool = Field(
        False,
        description="Enable search-enhanced hallucination detection for excerpts (only if excerpt_enabled=True)",
    )

    # Directionality field
    higher_is_better: bool = Field(
        ...,
        description="Whether higher values indicate better performance. "
        "For boolean: True means True is good. "
        "For score: True means higher scores are better. "
        "For literal: True means higher indices (later classes) are better.",
    )

    # Reject unknown keys so typos in serialized traits fail loudly.
    model_config = ConfigDict(extra="forbid")

    @field_validator("classes")
    @classmethod
    def validate_classes(cls, v: dict[str, str] | None) -> dict[str, str] | None:
        """Validate class definitions when present.

        Raises:
            ValueError: Fewer than 2 or more than 20 classes, empty names or
                descriptions, or case-insensitive duplicate class names.
        """
        if v is None:
            return None
        if len(v) < 2:
            raise ValueError("Literal trait must have at least 2 classes")
        if len(v) > 20:
            raise ValueError("Literal trait cannot have more than 20 classes")

        # Duplicates are detected case-insensitively to avoid near-identical classes.
        seen_names: set[str] = set()
        for class_name, class_desc in v.items():
            if not class_name.strip():
                raise ValueError("Class names cannot be empty")
            if not class_desc.strip():
                raise ValueError(f"Description for class '{class_name}' cannot be empty")
            lower_name = class_name.lower()
            if lower_name in seen_names:
                raise ValueError(f"Duplicate class name (case-insensitive): '{class_name}'")
            seen_names.add(lower_name)
        return v

    @model_validator(mode="before")
    @classmethod
    def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Set default for higher_is_better when loading legacy data."""
        # Covers both a missing key and an explicit None in older payloads.
        if isinstance(values, dict) and ("higher_is_better" not in values or values.get("higher_is_better") is None):
            values["higher_is_better"] = True
        return values

    @model_validator(mode="after")
    def validate_kind_fields(self) -> "LLMRubricTrait":
        """Validate and set kind-specific fields."""
        if self.kind == "literal":
            if self.classes is None:
                raise ValueError("classes field is required when kind='literal'")
            # Automatically derive min_score and max_score from classes
            # (object.__setattr__ avoids re-triggering pydantic assignment validation).
            object.__setattr__(self, "min_score", 0)
            object.__setattr__(self, "max_score", len(self.classes) - 1)
        return self

    def get_class_names(self) -> list[str]:
        """Get list of valid class names (preserves dict order). Only for kind='literal'."""
        if self.kind != "literal" or self.classes is None:
            return []
        return list(self.classes.keys())

    def get_class_index(self, class_name: str) -> int:
        """Get numeric index for a class name. Returns -1 if invalid. Only for kind='literal'."""
        class_names = self.get_class_names()
        try:
            return class_names.index(class_name)
        except ValueError:
            return -1

    def validate_score(self, value: int | bool) -> bool:
        """Validate that a given score is valid for this trait."""
        if self.kind == "boolean":
            return isinstance(value, bool)
        else:  # kind == "score" or kind == "literal"
            # Both use min_score/max_score (literal derives them from classes)
            # Reject boolean values explicitly (bool is a subclass of int in Python)
            if isinstance(value, bool):
                return False
            if not isinstance(value, int):
                return False
            # Use explicit None checks to allow min_score=0
            min_val = self.min_score if self.min_score is not None else 0
            max_val = self.max_score if self.max_score is not None else 5
            # For literal, also allow -1 as error state
            if self.kind == "literal" and value == -1:
                return True
            return min_val <= value <= max_val
Functions
get_class_index
get_class_index(class_name: str) -> int

Get numeric index for a class name. Returns -1 if invalid. Only for kind='literal'.

Source code in src/karenina/schemas/entities/rubric.py
def get_class_index(self, class_name: str) -> int:
    """Map a class name to its numeric index; -1 when unknown. Only for kind='literal'."""
    names = self.get_class_names()
    if class_name not in names:
        return -1
    return names.index(class_name)
get_class_names
get_class_names() -> list[str]

Get list of valid class names (preserves dict order). Only for kind='literal'.

Source code in src/karenina/schemas/entities/rubric.py
def get_class_names(self) -> list[str]:
    """Return the valid class names in dict order; empty unless kind='literal'."""
    if self.kind == "literal" and self.classes is not None:
        return list(self.classes.keys())
    return []
set_legacy_defaults classmethod
set_legacy_defaults(
    values: dict[str, Any],
) -> dict[str, Any]

Set default for higher_is_better when loading legacy data.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="before")
@classmethod
def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Default higher_is_better to True for legacy payloads that omit it (or set None)."""
    # .get() is None handles both the missing-key and explicit-None cases.
    if isinstance(values, dict) and values.get("higher_is_better") is None:
        values["higher_is_better"] = True
    return values
validate_classes classmethod
validate_classes(
    v: dict[str, str] | None,
) -> dict[str, str] | None

Validate class definitions when present.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("classes")
@classmethod
def validate_classes(cls, v: dict[str, str] | None) -> dict[str, str] | None:
    """Check literal-class definitions: count bounds, non-empty text, unique names."""
    if v is None:
        return None
    if len(v) < 2:
        raise ValueError("Literal trait must have at least 2 classes")
    if len(v) > 20:
        raise ValueError("Literal trait cannot have more than 20 classes")

    # Track lowercased names so duplicates are caught case-insensitively.
    lowered: set[str] = set()
    for name, desc in v.items():
        if not name.strip():
            raise ValueError("Class names cannot be empty")
        if not desc.strip():
            raise ValueError(f"Description for class '{name}' cannot be empty")
        key = name.lower()
        if key in lowered:
            raise ValueError(f"Duplicate class name (case-insensitive): '{name}'")
        lowered.add(key)
    return v
validate_kind_fields
validate_kind_fields() -> LLMRubricTrait

Validate and set kind-specific fields.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_kind_fields(self) -> "LLMRubricTrait":
    """Enforce and derive kind-specific fields (literal traits only)."""
    if self.kind != "literal":
        return self
    if self.classes is None:
        raise ValueError("classes field is required when kind='literal'")
    # Derive score bounds from the class list; object.__setattr__ avoids
    # re-triggering pydantic assignment validation inside the validator.
    object.__setattr__(self, "min_score", 0)
    object.__setattr__(self, "max_score", len(self.classes) - 1)
    return self
validate_score
validate_score(value: int | bool) -> bool

Validate that a given score is valid for this trait.

Source code in src/karenina/schemas/entities/rubric.py
def validate_score(self, value: int | bool) -> bool:
    """Validate that a given score is valid for this trait."""
    if self.kind == "boolean":
        return isinstance(value, bool)
    else:  # kind == "score" or kind == "literal"
        # Both use min_score/max_score (literal derives them from classes)
        # Reject boolean values explicitly (bool is a subclass of int in Python)
        if isinstance(value, bool):
            return False
        if not isinstance(value, int):
            return False
        # Use explicit None checks to allow min_score=0
        min_val = self.min_score if self.min_score is not None else 0
        max_val = self.max_score if self.max_score is not None else 5
        # For literal, also allow -1 as error state
        if self.kind == "literal" and value == -1:
            return True
        return min_val <= value <= max_val

LiteralMatch

Bases: VerificationPrimitive

Exact equality for Literal-typed fields.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class LiteralMatch(VerificationPrimitive):
    """Exact equality for Literal-typed fields."""

    def check(self, extracted: Any, expected: Any) -> bool:
        # bool(...) coerces any truthy/falsy __eq__ result into a plain bool.
        result = extracted == expected
        return bool(result)

MetricRubricTrait

Bases: BaseModel

Metric evaluation trait using instruction-level confusion matrix analysis.

Two evaluation modes are supported:

1. TP-only mode (evaluation_mode="tp_only"):
   - User defines: TP instructions (what should be present)
   - System evaluates:
     - TP: Instructions found in answer
     - FN: Instructions missing from answer
     - FP: Extra content in answer not matching TP instructions
     - TN: Cannot be computed (no explicit negative set)
   - Available metrics: precision, recall, f1

2. Full matrix mode (evaluation_mode="full_matrix"):
   - User defines: TP instructions (should be present) + TN instructions (should NOT be present)
   - System evaluates:
     - TP: TP instructions found in answer
     - FN: TP instructions missing from answer
     - TN: TN instructions correctly absent
     - FP: TN instructions incorrectly present
   - Available metrics: precision, recall, specificity, accuracy, f1

The trait returns confusion matrix counts/lists and computed metric values.

Source code in src/karenina/schemas/entities/rubric.py
class MetricRubricTrait(BaseModel):
    """
    Metric evaluation trait using instruction-level confusion matrix analysis.

    Two evaluation modes are supported:

    1. TP-only mode (evaluation_mode="tp_only"):
       - User defines: TP instructions (what should be present)
       - System evaluates:
         * TP: Instructions found in answer
         * FN: Instructions missing from answer
         * FP: Extra content in answer not matching TP instructions
         * TN: Cannot be computed (no explicit negative set)
       - Available metrics: precision, recall, f1

    2. Full matrix mode (evaluation_mode="full_matrix"):
       - User defines: TP instructions (should be present) + TN instructions (should NOT be present)
       - System evaluates:
         * TP: TP instructions found in answer
         * FN: TP instructions missing from answer
         * TN: TN instructions correctly absent
         * FP: TN instructions incorrectly present
       - Available metrics: precision, recall, specificity, accuracy, f1

    The trait returns confusion matrix counts/lists and computed metric values.
    """

    name: str = Field(..., min_length=1, description="Human readable identifier for the trait")
    description: str | None = Field(None, description="Detailed description of what this trait evaluates")
    summary: str | None = Field(None, description="Short concept label for dynamic rubric presence check")
    evaluation_mode: Literal["tp_only", "full_matrix"] = Field(
        "tp_only", description="Evaluation mode: tp_only (only TP defined) or full_matrix (TP+TN defined)"
    )
    metrics: list[str] = Field(
        ...,
        min_length=1,
        description="List of metrics to compute (mode-dependent: see VALID_METRICS_TP_ONLY and VALID_METRICS_FULL_MATRIX)",
    )
    tp_instructions: list[str] = Field(
        default_factory=list,
        description="Instructions for what should be present in the answer",
    )
    tn_instructions: list[str] = Field(
        default_factory=list,
        description="Instructions for what should NOT be present in the answer (required in full_matrix mode, ignored in tp_only mode)",
    )
    # NOTE(review): repeated_extraction is not consulted by the validators
    # below; presumably it is applied by the evaluation pipeline — confirm.
    repeated_extraction: bool = Field(
        True, description="Whether to deduplicate repeated excerpts/instructions (case-insensitive exact match)"
    )

    # Reject unknown keys so typos in trait definitions fail loudly.
    model_config = ConfigDict(extra="forbid")

    @field_validator("metrics")
    @classmethod
    def validate_metric_names(cls, v: list[str]) -> list[str]:
        """Validate that all requested metrics are valid."""
        if not v:
            raise ValueError("At least one metric must be specified")

        invalid_metrics = set(v) - VALID_METRICS
        if invalid_metrics:
            raise ValueError(f"Invalid metric names: {invalid_metrics}. Valid metrics are: {VALID_METRICS}")

        return v

    @model_validator(mode="after")
    def validate_metric_computability(self) -> "MetricRubricTrait":
        """Validate that requested metrics are compatible with the evaluation mode and provided instructions."""
        # Validate TP instructions are always provided
        if not self.tp_instructions:
            raise ValueError("TP instructions must be provided (define what should be present in the answer)")

        # Mode-specific validation
        if self.evaluation_mode == "tp_only":
            # TP-only mode: TN instructions should be empty, validate metrics
            valid_metrics_for_mode = VALID_METRICS_TP_ONLY
            invalid_for_mode = set(self.metrics) - valid_metrics_for_mode
            if invalid_for_mode:
                raise ValueError(
                    f"Metrics {invalid_for_mode} are not available in tp_only mode. "
                    f"Available metrics: {valid_metrics_for_mode}. "
                    f"Use full_matrix mode for specificity and accuracy."
                )
            # In tp_only mode, we can compute: TP, FN, FP (but not TN)
            available_buckets = {"tp", "fn", "fp"}

        elif self.evaluation_mode == "full_matrix":
            # Full matrix mode: Both TP and TN instructions required
            if not self.tn_instructions:
                raise ValueError(
                    "TN instructions must be provided in full_matrix mode "
                    "(define what should NOT be present in the answer)"
                )
            valid_metrics_for_mode = VALID_METRICS_FULL_MATRIX
            invalid_for_mode = set(self.metrics) - valid_metrics_for_mode
            if invalid_for_mode:
                raise ValueError(
                    f"Metrics {invalid_for_mode} are not valid. Available metrics: {valid_metrics_for_mode}"
                )
            # In full_matrix mode, we can compute all four: TP, FN, TN, FP
            available_buckets = {"tp", "fn", "tn", "fp"}

        # evaluation_mode is a Literal of exactly these two values, so one of
        # the branches above always binds available_buckets.
        # Check each requested metric can be computed
        uncomputable_metrics: list[str] = []
        for metric in self.metrics:
            required_buckets = METRIC_REQUIREMENTS[metric]
            if not required_buckets.issubset(available_buckets):
                missing = required_buckets - available_buckets
                uncomputable_metrics.append(f"{metric} (needs {missing})")

        if uncomputable_metrics:
            raise ValueError(
                f"Cannot compute the following metrics with current mode ({self.evaluation_mode}): "
                f"{', '.join(uncomputable_metrics)}. "
                f"Available buckets: {available_buckets}"
            )

        return self

    def get_required_buckets(self) -> set[str]:
        """Get the set of confusion matrix buckets that will be computed for this trait."""
        if self.evaluation_mode == "tp_only":
            return {"tp", "fn", "fp"}  # TN cannot be computed
        else:  # full_matrix
            return {"tp", "fn", "tn", "fp"}  # All four can be computed
Functions
get_required_buckets
get_required_buckets() -> set[str]

Get the set of confusion matrix buckets that will be computed for this trait.

Source code in src/karenina/schemas/entities/rubric.py
def get_required_buckets(self) -> set[str]:
    """Get the set of confusion matrix buckets that will be computed for this trait."""
    # TN requires an explicit negative instruction set, which only
    # full_matrix mode provides.
    buckets = {"tp", "fn", "fp"}
    if self.evaluation_mode != "tp_only":
        buckets.add("tn")
    return buckets
validate_metric_computability
validate_metric_computability() -> MetricRubricTrait

Validate that requested metrics are compatible with the evaluation mode and provided instructions.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_metric_computability(self) -> "MetricRubricTrait":
    """Validate that requested metrics are compatible with the evaluation mode and provided instructions."""
    # Validate TP instructions are always provided
    if not self.tp_instructions:
        raise ValueError("TP instructions must be provided (define what should be present in the answer)")

    # Mode-specific validation
    if self.evaluation_mode == "tp_only":
        # TP-only mode: TN instructions should be empty, validate metrics
        valid_metrics_for_mode = VALID_METRICS_TP_ONLY
        invalid_for_mode = set(self.metrics) - valid_metrics_for_mode
        if invalid_for_mode:
            raise ValueError(
                f"Metrics {invalid_for_mode} are not available in tp_only mode. "
                f"Available metrics: {valid_metrics_for_mode}. "
                f"Use full_matrix mode for specificity and accuracy."
            )
        # In tp_only mode, we can compute: TP, FN, FP (but not TN)
        available_buckets = {"tp", "fn", "fp"}

    elif self.evaluation_mode == "full_matrix":
        # Full matrix mode: Both TP and TN instructions required
        if not self.tn_instructions:
            raise ValueError(
                "TN instructions must be provided in full_matrix mode "
                "(define what should NOT be present in the answer)"
            )
        valid_metrics_for_mode = VALID_METRICS_FULL_MATRIX
        invalid_for_mode = set(self.metrics) - valid_metrics_for_mode
        if invalid_for_mode:
            raise ValueError(
                f"Metrics {invalid_for_mode} are not valid. Available metrics: {valid_metrics_for_mode}"
            )
        # In full_matrix mode, we can compute all four: TP, FN, TN, FP
        available_buckets = {"tp", "fn", "tn", "fp"}

    # evaluation_mode is a Literal of exactly these two values, so one of
    # the branches above always binds available_buckets.
    # Check each requested metric can be computed
    uncomputable_metrics: list[str] = []
    for metric in self.metrics:
        required_buckets = METRIC_REQUIREMENTS[metric]
        if not required_buckets.issubset(available_buckets):
            missing = required_buckets - available_buckets
            uncomputable_metrics.append(f"{metric} (needs {missing})")

    if uncomputable_metrics:
        raise ValueError(
            f"Cannot compute the following metrics with current mode ({self.evaluation_mode}): "
            f"{', '.join(uncomputable_metrics)}. "
            f"Available buckets: {available_buckets}"
        )

    return self
validate_metric_names classmethod
validate_metric_names(v: list[str]) -> list[str]

Validate that all requested metrics are valid.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("metrics")
@classmethod
def validate_metric_names(cls, v: list[str]) -> list[str]:
    """Reject any requested metric name outside the valid set."""
    if not v:
        raise ValueError("At least one metric must be specified")

    unknown = set(v) - VALID_METRICS
    if unknown:
        raise ValueError(f"Invalid metric names: {unknown}. Valid metrics are: {VALID_METRICS}")

    return v

NumericExact

Bases: VerificationPrimitive

Exact numeric equality after float coercion.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class NumericExact(VerificationPrimitive):
    """Exact numeric equality after float coercion."""

    def check(self, extracted: Any, expected: Any) -> bool:
        # Coerce both sides so e.g. "3" compares equal to 3.0.
        lhs = float(extracted)
        rhs = float(expected)
        return lhs == rhs

NumericRange

Bases: VerificationPrimitive

Check that extracted number falls within a range.

Either min or max can be None for open-ended ranges. Boundaries are inclusive by default. Set exclusive_min or exclusive_max to True for strict inequality on that side.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class NumericRange(VerificationPrimitive):
    """Check that extracted number falls within a range.

    Either min or max can be None for open-ended ranges.
    Boundaries are inclusive by default. Set ``exclusive_min`` or
    ``exclusive_max`` to True for strict inequality on that side.
    """

    min: float | None = None
    max: float | None = None
    exclusive_min: bool = False
    exclusive_max: bool = False

    def check(self, extracted: Any, _expected: Any) -> bool:
        value = float(extracted)
        lower = self.min
        upper = self.max
        if lower is not None:
            # Exclusive lower bound rejects equality; inclusive accepts it.
            below = value <= lower if self.exclusive_min else value < lower
            if below:
                return False
        if upper is not None:
            above = value >= upper if self.exclusive_max else value > upper
            if above:
                return False
        return True

NumericTolerance

Bases: VerificationPrimitive

Check that extracted number is within tolerance of expected.

Modes: - "relative": |extracted - expected| / |expected| <= tolerance - "absolute": |extracted - expected| <= tolerance

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class NumericTolerance(VerificationPrimitive):
    """Check that extracted number is within tolerance of expected.

    Modes:
    - "relative": |extracted - expected| / |expected| <= tolerance
    - "absolute": |extracted - expected| <= tolerance
    """

    tolerance: float
    mode: Literal["relative", "absolute"] = "relative"

    def check(self, extracted: Any, expected: Any) -> bool:
        target = float(expected)
        delta = abs(float(extracted) - target)
        if self.mode == "absolute":
            return delta <= self.tolerance
        # Relative mode: guard the division when the expected value is zero —
        # only an exact match passes in that case.
        if target == 0:
            return delta == 0
        return delta / abs(target) <= self.tolerance

OrderedMatch

Bases: VerificationPrimitive

Compare lists element-by-element after normalization.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class OrderedMatch(VerificationPrimitive):
    """Compare lists element-by-element after normalization."""

    normalize: list[Normalizer] = ["lowercase", "strip"]

    def check(self, extracted: Any, expected: Any) -> bool:
        # Lengths must agree before any pairwise comparison.
        if len(extracted) != len(expected):
            return False
        pairs = zip(extracted, expected, strict=False)
        return all(
            apply_normalizers(self.normalize, str(a)) == apply_normalizers(self.normalize, str(b))
            for a, b in pairs
        )

Question

Bases: BaseModel

Represents a self-contained benchmark question with its metadata.

This class defines the structure and validation rules for questions in the benchmark, including unique identifiers, question text, categorization keywords, and intrinsic metadata.

Backward compatibility: the legacy tags key is accepted during construction and automatically converted to keywords.

Source code in src/karenina/schemas/entities/question.py
class Question(BaseModel):
    """Represents a self-contained benchmark question with its metadata.

    This class defines the structure and validation rules for questions
    in the benchmark, including unique identifiers, question text,
    categorization keywords, and intrinsic metadata.

    Backward compatibility: the legacy ``tags`` key is accepted during
    construction and automatically converted to ``keywords``.
    """

    model_config = ConfigDict(populate_by_name=True, extra="forbid")

    question: str = Field(description="Question text", min_length=1)
    raw_answer: str = Field(description="Raw answer text", min_length=1)
    keywords: list[str] = Field(default_factory=list, description="Keywords for the question")
    few_shot_examples: list[dict[str, str]] | None = Field(
        default=None, description="Optional few-shot examples as question-answer pairs"
    )

    # Intrinsic metadata
    # NOTE(review): timestamps are naive local time (datetime.now() carries
    # no tzinfo) — confirm consumers do not assume UTC.
    date_created: str = Field(default_factory=lambda: datetime.now().isoformat())
    date_modified: str = Field(default_factory=lambda: datetime.now().isoformat())
    answer_template: str | None = None
    answer_notes: str | None = Field(
        default=None,
        description=(
            "Free-text notes about how the answer should be interpreted. "
            "Used by the template generation pipeline to guide field type "
            "selection and extraction behavior (e.g., overriding the "
            "boolean-first default). Stored in checkpoints and accepted "
            "by the API. Not consumed by verification pipeline stages."
        ),
    )
    author: dict[str, Any] | None = None
    sources: list[dict[str, Any]] | None = None
    custom_metadata: dict[str, Any] | None = None
    question_rubric: dict[str, Any] | None = None
    question_dynamic_rubric: dict[str, Any] | None = None
    workspace_path: str | None = Field(
        default=None,
        description=(
            "Relative path from workspace_root to this question's working "
            "directory. For coding benchmarks, this points to the pre-existing "
            "folder containing starter code, tests, or other artifacts for this "
            "task (e.g., 'task_01'). Resolved as workspace_root / workspace_path."
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def _convert_legacy_tags(cls, data: Any) -> Any:
        """Convert legacy ``tags`` key to ``keywords`` and strip computed fields.

        The computed field ``id`` appears in serialized output but is not an
        input field. Strip it so that round-tripping via model_dump/model_validate
        works with extra="forbid".
        """
        if isinstance(data, dict):
            # NOTE(review): these pops mutate the caller-supplied dict in
            # place — confirm callers do not reuse the input mapping.
            data.pop("id", None)
            if "tags" in data:
                tags = data.pop("tags")
                if "keywords" not in data:
                    # Filter out None values from legacy tags list
                    data["keywords"] = [t for t in (tags or []) if t is not None]
        return data

    @computed_field  # type: ignore[prop-decorator]
    @property
    def id(self) -> str:
        """Auto-generated MD5 hash of the question text."""
        # MD5 serves as a stable content fingerprint for identity, not
        # as a security hash.
        return hashlib.md5(self.question.encode("utf-8")).hexdigest()
Attributes
id property
id: str

Auto-generated MD5 hash of the question text.

QuestionRegistryEntry

Bases: BaseModel

Tracks benchmark-level state for a question.

This is separate from the Question model because finished status and benchmark-level timestamps are properties of the question's membership in a benchmark, not intrinsic to the question itself.

Source code in src/karenina/schemas/entities/question.py
class QuestionRegistryEntry(BaseModel):
    """Tracks benchmark-level state for a question.

    This is separate from the Question model because ``finished`` status
    and benchmark-level timestamps are properties of the question's
    membership in a benchmark, not intrinsic to the question itself.
    """

    model_config = ConfigDict(extra="forbid")

    # Whether this question has been marked complete within the benchmark.
    finished: bool = False
    # ISO-8601 timestamps in naive local time (datetime.now() has no tzinfo).
    date_added: str = Field(default_factory=lambda: datetime.now().isoformat())
    date_modified: str = Field(default_factory=lambda: datetime.now().isoformat())

RegexMatch

Bases: VerificationPrimitive

Check that extracted text matches a regex pattern.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class RegexMatch(VerificationPrimitive):
    """Check that extracted text matches a regex pattern.

    ``flags`` lists ``re`` flag attribute names (e.g. "IGNORECASE",
    "MULTILINE"); names that do not resolve to a flag are logged and
    ignored.
    """

    pattern: str
    flags: list[str] = []

    def check(self, extracted: Any, _expected: Any) -> bool:
        flag_value = 0
        for f in self.flags:
            resolved = getattr(re, f, None)
            # Guard with isinstance(int): getattr would also resolve
            # non-flag attributes of ``re`` (e.g. "search"), and OR-ing
            # those into the mask would raise TypeError. RegexFlag is an
            # IntFlag, so real flags pass this check.
            if not isinstance(resolved, int):
                logger.warning("Unknown regex flag %r, ignoring", f)
            else:
                flag_value |= resolved
        return bool(re.search(self.pattern, str(extracted), flag_value))

RegexRubricTrait

Bases: BaseModel

Regex-based evaluation trait for deterministic pattern matching.

This trait type uses regular expressions to perform simple text matching against answers. It always returns a boolean result.

Examples:

  • Email format validation: r"\S+@\S+"
  • Keyword presence: r"\bmachine learning\b"
  • URL detection: r"https?://[^\s]+"
Source code in src/karenina/schemas/entities/rubric.py
class RegexRubricTrait(BaseModel):
    """
    Regex-based evaluation trait for deterministic pattern matching.

    This trait type uses regular expressions to perform simple text matching
    against answers. It always returns a boolean result.

    Examples:
        - Email format validation: r"\\S+@\\S+"
        - Keyword presence: r"\\bmachine learning\\b"
        - URL detection: r"https?://[^\\s]+"
    """

    name: str = Field(..., min_length=1, description="Human readable identifier for the trait")
    description: str | None = Field(None, description="Detailed description of what this trait evaluates")
    summary: str | None = Field(None, description="Short concept label for dynamic rubric presence check")
    pattern: str = Field(..., description="Regex pattern to match against text")
    case_sensitive: bool = Field(True, description="Whether pattern matching should be case sensitive")
    invert_result: bool = Field(False, description="Whether to invert the boolean result (for negative matching)")

    # Directionality field
    # NOTE(review): higher_is_better is scoring metadata only — evaluate()
    # below does not read it; confirm downstream scorers apply it.
    higher_is_better: bool = Field(
        ...,
        description="Whether a regex match indicates a positive outcome. True: match = good. False: match = bad.",
    )

    model_config = ConfigDict(extra="forbid")

    @model_validator(mode="before")
    @classmethod
    def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Set default for higher_is_better when loading legacy data."""
        # Treats "key missing" and "explicitly None" identically.
        if isinstance(values, dict) and ("higher_is_better" not in values or values.get("higher_is_better") is None):
            values["higher_is_better"] = True
        return values

    @field_validator("pattern")
    @classmethod
    def validate_regex_pattern(cls, v: str) -> str:
        """Validate that pattern is a valid regex."""
        try:
            re.compile(v)
        except re.error as e:
            raise ValueError(f"Invalid regex pattern: {e}") from e
        return v

    def evaluate(self, text: str) -> bool:
        """
        Evaluate the trait against the provided text.

        Args:
            text: The text to evaluate

        Returns:
            Boolean evaluation result

        Raises:
            RuntimeError: If evaluation fails
        """
        try:
            flags = 0 if self.case_sensitive else re.IGNORECASE
            match = re.search(self.pattern, text, flags)
            result = match is not None
            # invert_result flips the outcome for "must NOT match" traits.
            return not result if self.invert_result else result
        except Exception as e:
            raise RuntimeError(f"Failed to evaluate regex trait '{self.name}': {e}") from e
Functions
evaluate
evaluate(text: str) -> bool

Evaluate the trait against the provided text.

Parameters:

Name Type Description Default
text str

The text to evaluate

required

Returns:

Type Description
bool

Boolean evaluation result

Raises:

Type Description
RuntimeError

If evaluation fails

Source code in src/karenina/schemas/entities/rubric.py
def evaluate(self, text: str) -> bool:
    """
    Evaluate this regex trait against the provided text.

    Args:
        text: The text to evaluate

    Returns:
        Boolean evaluation result

    Raises:
        RuntimeError: If evaluation fails
    """
    try:
        flags = re.IGNORECASE if not self.case_sensitive else 0
        found = re.search(self.pattern, text, flags) is not None
        # invert_result flips the outcome for "must NOT match" traits.
        if self.invert_result:
            return not found
        return found
    except Exception as e:
        raise RuntimeError(f"Failed to evaluate regex trait '{self.name}': {e}") from e
set_legacy_defaults classmethod
set_legacy_defaults(
    values: dict[str, Any],
) -> dict[str, Any]

Set default for higher_is_better when loading legacy data.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="before")
@classmethod
def set_legacy_defaults(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Backfill higher_is_better=True for legacy payloads that omit it."""
    if isinstance(values, dict):
        # .get() treats "key missing" and "explicitly None" identically.
        if values.get("higher_is_better") is None:
            values["higher_is_better"] = True
    return values
validate_regex_pattern classmethod
validate_regex_pattern(v: str) -> str

Validate that pattern is a valid regex.

Source code in src/karenina/schemas/entities/rubric.py
@field_validator("pattern")
@classmethod
def validate_regex_pattern(cls, v: str) -> str:
    """Validate that pattern is a valid regex."""
    # re.compile is the authoritative syntax check; re-raise as ValueError
    # so pydantic reports it as an ordinary field validation error.
    try:
        re.compile(v)
    except re.error as e:
        raise ValueError(f"Invalid regex pattern: {e}") from e
    return v

Rubric

Bases: BaseModel

Collection of evaluation traits applied to all question-answer pairs.

A rubric defines the qualitative criteria used to evaluate LLM responses beyond basic correctness checking. Supports LLM-based, regex, callable, and metric traits.

Source code in src/karenina/schemas/entities/rubric.py
class Rubric(BaseModel):
    """
    Collection of evaluation traits applied to all question-answer pairs.

    A rubric defines the qualitative criteria used to evaluate LLM responses
    beyond basic correctness checking. Supports LLM-based, regex, callable, and metric traits.
    """

    llm_traits: list[LLMRubricTrait] = Field(default_factory=list, description="List of LLM-based evaluation traits")
    regex_traits: list[RegexRubricTrait] = Field(
        default_factory=list, description="List of regex-based evaluation traits"
    )
    callable_traits: list[CallableRubricTrait] = Field(
        default_factory=list, description="List of callable function-based evaluation traits"
    )
    metric_traits: list[MetricRubricTrait] = Field(
        default_factory=list, description="List of metric-based evaluation traits (confusion-matrix analysis)"
    )
    agentic_traits: list[AgenticRubricTrait] = Field(
        default_factory=list,
        description="List of agent-investigated evaluation traits",
    )

    model_config = ConfigDict(extra="forbid")

    @model_validator(mode="after")
    def validate_trait_names(self) -> "Rubric":
        """Reject duplicate trait names (within and across types) and dots in agentic names.

        Each trait type list must have unique names. Cross-type name overlaps
        are also rejected because downstream consumers (DataFrames, result
        dicts) use trait names as keys without type prefixes.

        Dots in agentic trait names are rejected because template kind traits
        produce dot-expanded keys (``trait.field``). A trait named ``"foo.bar"``
        would be ambiguous.
        """
        type_lists: list[tuple[str, list[Any]]] = [
            ("llm", self.llm_traits),
            ("regex", self.regex_traits),
            ("callable", self.callable_traits),
            ("metric", self.metric_traits),
            ("agentic", self.agentic_traits),
        ]
        for type_label, traits in type_lists:
            seen: set[str] = set()
            for trait in traits:
                if trait.name in seen:
                    raise ValueError(
                        f"Duplicate {type_label} trait name '{trait.name}' "
                        f"within the same rubric. Trait names must be unique "
                        f"per type."
                    )
                seen.add(trait.name)

        # Cross-type uniqueness check
        all_names = self.get_trait_names()
        seen_all: set[str] = set()
        for name in all_names:
            if name in seen_all:
                raise ValueError(
                    f"Duplicate trait name '{name}' across different trait types. "
                    f"Trait names must be unique across all types within a rubric."
                )
            seen_all.add(name)

        for trait in self.agentic_traits:
            if "." in trait.name:
                raise ValueError(
                    f"Agentic trait name '{trait.name}' contains '.', "
                    f"which would collide with dot-notation keys from "
                    f"template-kind traits."
                )
        return self

    def get_trait_names(self) -> list[str]:
        """Get list of all trait names in this rubric (LLM, regex, callable, metric, and agentic)."""
        llm_names = [trait.name for trait in self.llm_traits]
        regex_names = [trait.name for trait in self.regex_traits]
        callable_names = [trait.name for trait in self.callable_traits]
        metric_names = [trait.name for trait in self.metric_traits]
        agentic_names = [trait.name for trait in self.agentic_traits]
        return llm_names + regex_names + callable_names + metric_names + agentic_names

    def get_llm_trait_names(self) -> list[str]:
        """Get list of LLM trait names only."""
        return [trait.name for trait in self.llm_traits]

    def get_regex_trait_names(self) -> list[str]:
        """Get list of regex trait names only."""
        return [trait.name for trait in self.regex_traits]

    def get_callable_trait_names(self) -> list[str]:
        """Get list of callable trait names only."""
        return [trait.name for trait in self.callable_traits]

    def get_metric_trait_names(self) -> list[str]:
        """Get list of metric trait names only."""
        return [trait.name for trait in self.metric_traits]

    def get_agentic_trait_names(self) -> list[str]:
        """Get list of agentic trait names only."""
        return [trait.name for trait in self.agentic_traits]

    def get_trait_max_scores(self) -> dict[str, int]:
        """Get max_score for all score-based traits (LLM and callable).

        Returns:
            Dict mapping trait name to max_score for traits with kind='score' or 'literal'.
            Boolean traits and metric traits are not included.
            For literal traits, max_score is len(classes)-1.
        """
        max_scores: dict[str, int] = {}

        for llm_trait in self.llm_traits:
            if llm_trait.kind in ("score", "literal") and llm_trait.max_score is not None:
                max_scores[llm_trait.name] = llm_trait.max_score

        for callable_trait in self.callable_traits:
            if callable_trait.kind == "score" and callable_trait.max_score is not None:
                max_scores[callable_trait.name] = callable_trait.max_score

        # Agentic template kinds (BaseModel subclasses) never equal these
        # string literals, so they are skipped here by construction.
        for agentic_trait in self.agentic_traits:
            if agentic_trait.kind in ("score", "literal") and agentic_trait.max_score is not None:
                max_scores[agentic_trait.name] = agentic_trait.max_score

        return max_scores

    def get_trait_directionalities(self) -> dict[str, bool | None]:
        """Get higher_is_better for LLM, regex, callable, and agentic traits.

        Note: MetricRubricTraits are excluded as metrics (precision/recall/F1)
        are inherently 'higher is better'.

        Returns:
            Dict mapping trait name to higher_is_better value. Template kind
            agentic traits map to None because directionality is not meaningful
            for structured results.
        """
        directionalities: dict[str, bool | None] = {}

        llm_trait: LLMRubricTrait
        for llm_trait in self.llm_traits:
            directionalities[llm_trait.name] = llm_trait.higher_is_better

        regex_trait: RegexRubricTrait
        for regex_trait in self.regex_traits:
            directionalities[regex_trait.name] = regex_trait.higher_is_better

        callable_trait: CallableRubricTrait
        for callable_trait in self.callable_traits:
            directionalities[callable_trait.name] = callable_trait.higher_is_better

        for agentic_trait in self.agentic_traits:
            directionalities[agentic_trait.name] = agentic_trait.higher_is_better

        # MetricRubricTraits always have higher_is_better=True (implicit)
        return directionalities

    def validate_evaluation(self, evaluation: dict[str, int | bool]) -> bool:
        """
        Validate that an evaluation result matches this rubric structure.

        Note: This validates LLM, regex, callable, and agentic trait scores. Metric traits
        are stored separately in VerificationResult fields (metric_trait_confusion_lists
        and metric_trait_metrics) and don't participate in this validation.

        Template kind agentic traits produce dot-expanded keys (e.g.
        ``"trait_name.field_name"``), so the expected names set and per-key
        validation logic account for this notation.
        """
        # Get trait names excluding metric traits (they're validated separately)
        llm_names = set(self.get_llm_trait_names())
        regex_names = set(self.get_regex_trait_names())
        callable_names = set(self.get_callable_trait_names())

        # For agentic traits, expand template kinds to dot-notation keys
        agentic_expected: set[str] = set()
        for trait in self.agentic_traits:
            if trait.is_template_kind:
                assert isinstance(trait.kind, type)  # narrows for mypy
                for field_name in trait.kind.model_fields:
                    agentic_expected.add(f"{trait.name}.{field_name}")
            else:
                agentic_expected.add(trait.name)

        expected_names = llm_names | regex_names | callable_names | agentic_expected

        eval_names = set(evaluation.keys())

        # Check that all expected trait names are present
        if expected_names != eval_names:
            return False

        # Check that each score is valid for its trait
        llm_trait_map = {trait.name: trait for trait in self.llm_traits}
        regex_trait_map = {trait.name: trait for trait in self.regex_traits}
        callable_trait_map = {trait.name: trait for trait in self.callable_traits}
        agentic_trait_map = {trait.name: trait for trait in self.agentic_traits}

        for key, value in evaluation.items():
            # Dot-notation keys come from template-kind agentic traits;
            # the prefix before the first '.' is the trait name.
            trait_name = key.split(".")[0] if "." in key else key
            if trait_name in llm_trait_map:
                if not llm_trait_map[trait_name].validate_score(value):
                    return False
            elif trait_name in agentic_trait_map:
                trait = agentic_trait_map[trait_name]
                if trait.is_template_kind:
                    continue  # Template fields not individually validated
                if not trait.validate_score(value):
                    return False
            elif trait_name in regex_trait_map or trait_name in callable_trait_map:
                # Regex and callable traits always return boolean
                if not isinstance(value, bool):
                    return False
            else:
                # Unknown trait name
                return False

        return True
Functions
get_agentic_trait_names
get_agentic_trait_names() -> list[str]

Get list of agentic trait names only.

Source code in src/karenina/schemas/entities/rubric.py
def get_agentic_trait_names(self) -> list[str]:
    """Return the names of the agentic traits only."""
    names: list[str] = []
    for agentic in self.agentic_traits:
        names.append(agentic.name)
    return names
get_callable_trait_names
get_callable_trait_names() -> list[str]

Get list of callable trait names only.

Source code in src/karenina/schemas/entities/rubric.py
def get_callable_trait_names(self) -> list[str]:
    """Return the names of the callable traits only."""
    names: list[str] = []
    for trait in self.callable_traits:
        names.append(trait.name)
    return names
get_llm_trait_names
get_llm_trait_names() -> list[str]

Get list of LLM trait names only.

Source code in src/karenina/schemas/entities/rubric.py
def get_llm_trait_names(self) -> list[str]:
    """Return the names of the LLM traits only."""
    names: list[str] = []
    for llm_trait in self.llm_traits:
        names.append(llm_trait.name)
    return names
get_metric_trait_names
get_metric_trait_names() -> list[str]

Get list of metric trait names only.

Source code in src/karenina/schemas/entities/rubric.py
def get_metric_trait_names(self) -> list[str]:
    """Return the names of the metric traits only."""
    names: list[str] = []
    for metric in self.metric_traits:
        names.append(metric.name)
    return names
get_regex_trait_names
get_regex_trait_names() -> list[str]

Get list of regex trait names only.

Source code in src/karenina/schemas/entities/rubric.py
def get_regex_trait_names(self) -> list[str]:
    """Return the names of the regex traits only."""
    names: list[str] = []
    for regex_trait in self.regex_traits:
        names.append(regex_trait.name)
    return names
get_trait_directionalities
get_trait_directionalities() -> dict[str, bool | None]

Get higher_is_better for LLM, regex, callable, and agentic traits.

Note: MetricRubricTraits are excluded as metrics (precision/recall/F1) are inherently 'higher is better'.

Returns:

Type Description
dict[str, bool | None]

Dict mapping trait name to higher_is_better value. Template kind agentic traits map to None because directionality is not meaningful for structured results.

Source code in src/karenina/schemas/entities/rubric.py
def get_trait_directionalities(self) -> dict[str, bool | None]:
    """Get higher_is_better for LLM, regex, callable, and agentic traits.

    Note: MetricRubricTraits are excluded as metrics (precision/recall/F1)
    are inherently 'higher is better'.

    Returns:
        Dict mapping trait name to higher_is_better value. Template kind
        agentic traits map to None because directionality is not meaningful
        for structured results.
    """
    directionalities: dict[str, bool | None] = {}

    llm_trait: LLMRubricTrait
    for llm_trait in self.llm_traits:
        directionalities[llm_trait.name] = llm_trait.higher_is_better

    regex_trait: RegexRubricTrait
    for regex_trait in self.regex_traits:
        directionalities[regex_trait.name] = regex_trait.higher_is_better

    callable_trait: CallableRubricTrait
    for callable_trait in self.callable_traits:
        directionalities[callable_trait.name] = callable_trait.higher_is_better

    for agentic_trait in self.agentic_traits:
        directionalities[agentic_trait.name] = agentic_trait.higher_is_better

    # MetricRubricTraits always have higher_is_better=True (implicit)
    return directionalities
get_trait_max_scores
get_trait_max_scores() -> dict[str, int]

Get max_score for all score-based traits (LLM and callable).

Returns:

Type Description
dict[str, int]

Dict mapping trait name to max_score for traits with kind='score' or 'literal'. Boolean traits and metric traits are not included. For literal traits, max_score is len(classes)-1.

Source code in src/karenina/schemas/entities/rubric.py
def get_trait_max_scores(self) -> dict[str, int]:
    """Collect max_score for every score-based trait.

    Returns:
        Dict mapping trait name to max_score for traits whose kind is
        'score' (or 'literal' where supported) and whose max_score is set.
        Boolean traits and metric traits are omitted. For literal traits,
        max_score is len(classes)-1.
    """
    max_scores: dict[str, int] = {}

    # Each family pairs its trait list with the kinds that carry a
    # max_score: callable traits only support 'score'; LLM and agentic
    # traits additionally support 'literal'.
    sources = (
        (self.llm_traits, ("score", "literal")),
        (self.callable_traits, ("score",)),
        (self.agentic_traits, ("score", "literal")),
    )
    for traits, scored_kinds in sources:
        for trait in traits:
            if trait.kind in scored_kinds and trait.max_score is not None:
                max_scores[trait.name] = trait.max_score

    return max_scores
get_trait_names
get_trait_names() -> list[str]

Get list of all trait names in this rubric (LLM, regex, callable, metric, and agentic).

Source code in src/karenina/schemas/entities/rubric.py
def get_trait_names(self) -> list[str]:
    """Return every trait name in this rubric (LLM, regex, callable, metric, and agentic), in family order."""
    names: list[str] = []
    # Concatenation order matters downstream (e.g. duplicate detection),
    # so the families are walked in the same fixed order as before.
    for group in (
        self.llm_traits,
        self.regex_traits,
        self.callable_traits,
        self.metric_traits,
        self.agentic_traits,
    ):
        names.extend(trait.name for trait in group)
    return names
validate_evaluation
validate_evaluation(
    evaluation: dict[str, int | bool],
) -> bool

Validate that an evaluation result matches this rubric structure.

Note: This validates LLM, regex, callable, and agentic trait scores. Metric traits are stored separately in VerificationResult fields (metric_trait_confusion_lists and metric_trait_metrics) and don't participate in this validation.

Template kind agentic traits produce dot-expanded keys (e.g. "trait_name.field_name"), so the expected names set and per-key validation logic account for this notation.

Source code in src/karenina/schemas/entities/rubric.py
def validate_evaluation(self, evaluation: dict[str, int | bool]) -> bool:
    """
    Validate that an evaluation result matches this rubric structure.

    Note: This validates LLM, regex, callable, and agentic trait scores. Metric traits
    are stored separately in VerificationResult fields (metric_trait_confusion_lists
    and metric_trait_metrics) and don't participate in this validation.

    Template kind agentic traits produce dot-expanded keys (e.g.
    ``"trait_name.field_name"``), so the expected names set and per-key
    validation logic account for this notation.
    """
    # Get trait names excluding metric traits (they're validated separately)
    llm_names = set(self.get_llm_trait_names())
    regex_names = set(self.get_regex_trait_names())
    callable_names = set(self.get_callable_trait_names())

    # For agentic traits, expand template kinds to dot-notation keys
    agentic_expected: set[str] = set()
    for trait in self.agentic_traits:
        if trait.is_template_kind:
            assert isinstance(trait.kind, type)  # narrows for mypy
            for field_name in trait.kind.model_fields:
                agentic_expected.add(f"{trait.name}.{field_name}")
        else:
            agentic_expected.add(trait.name)

    expected_names = llm_names | regex_names | callable_names | agentic_expected

    eval_names = set(evaluation.keys())

    # Check that all expected trait names are present
    if expected_names != eval_names:
        return False

    # Check that each score is valid for its trait
    llm_trait_map = {trait.name: trait for trait in self.llm_traits}
    regex_trait_map = {trait.name: trait for trait in self.regex_traits}
    callable_trait_map = {trait.name: trait for trait in self.callable_traits}
    agentic_trait_map = {trait.name: trait for trait in self.agentic_traits}

    for key, value in evaluation.items():
        trait_name = key.split(".")[0] if "." in key else key
        if trait_name in llm_trait_map:
            if not llm_trait_map[trait_name].validate_score(value):
                return False
        elif trait_name in agentic_trait_map:
            trait = agentic_trait_map[trait_name]
            if trait.is_template_kind:
                continue  # Template fields not individually validated
            if not trait.validate_score(value):
                return False
        elif trait_name in regex_trait_map or trait_name in callable_trait_map:
            # Regex and callable traits always return boolean
            if not isinstance(value, bool):
                return False
        else:
            # Unknown trait name
            return False

    return True
validate_trait_names
validate_trait_names() -> Rubric

Reject duplicate trait names (within and across types) and dots in agentic names.

Each trait type list must have unique names. Cross-type name overlaps are also rejected because downstream consumers (DataFrames, result dicts) use trait names as keys without type prefixes.

Dots in agentic trait names are rejected because template kind traits produce dot-expanded keys (trait.field). A trait named "foo.bar" would be ambiguous.

Source code in src/karenina/schemas/entities/rubric.py
@model_validator(mode="after")
def validate_trait_names(self) -> "Rubric":
    """Reject duplicate trait names (within and across types) and dots in agentic names.

    Names must be unique within each trait type list, and also across
    types: downstream consumers (DataFrames, result dicts) key on bare
    trait names with no type prefix, so any collision is ambiguous.

    Agentic trait names may not contain '.', because template kind traits
    emit dot-expanded keys (``trait.field``) and a name like ``"foo.bar"``
    could not be distinguished from such a key.
    """
    # Per-type uniqueness, checked family by family.
    for type_label, traits in (
        ("llm", self.llm_traits),
        ("regex", self.regex_traits),
        ("callable", self.callable_traits),
        ("metric", self.metric_traits),
        ("agentic", self.agentic_traits),
    ):
        seen_in_type: set[str] = set()
        for trait in traits:
            if trait.name in seen_in_type:
                raise ValueError(
                    f"Duplicate {type_label} trait name '{trait.name}' "
                    f"within the same rubric. Trait names must be unique "
                    f"per type."
                )
            seen_in_type.add(trait.name)

    # Cross-type uniqueness over the full concatenated name list.
    seen_any: set[str] = set()
    for name in self.get_trait_names():
        if name in seen_any:
            raise ValueError(
                f"Duplicate trait name '{name}' across different trait types. "
                f"Trait names must be unique across all types within a rubric."
            )
        seen_any.add(name)

    # Dots are reserved for template-kind key expansion.
    for agentic in self.agentic_traits:
        if "." in agentic.name:
            raise ValueError(
                f"Agentic trait name '{agentic.name}' contains '.', "
                f"which would collide with dot-notation keys from "
                f"template-kind traits."
            )
    return self

RubricEvaluation

Bases: BaseModel

Result of applying a rubric to a specific question-answer pair.

Source code in src/karenina/schemas/entities/rubric.py
class RubricEvaluation(BaseModel):
    """
    Result of applying a rubric to a specific question-answer pair.
    """

    # Per-trait results keyed by trait name: int for score-based traits,
    # bool for boolean-style traits.
    trait_scores: dict[str, int | bool] = Field(..., description="Scores for each trait")

    # Unknown keys are rejected so malformed payloads fail fast.
    model_config = ConfigDict(extra="forbid")

SemanticMatch

Bases: VerificationPrimitive

Check embedding similarity between extracted and expected text.

Requires an embedding model to be configured at runtime.

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class SemanticMatch(VerificationPrimitive):
    """Check embedding similarity between extracted and expected text.

    Requires an embedding model to be configured at runtime.
    """

    # Minimum similarity required for a match (presumably cosine similarity
    # in [0, 1] — confirm against the embedding_check pipeline stage).
    threshold: float = 0.85

    def check(self, extracted: Any, expected: Any) -> bool:
        # Embedding lookups cannot run in-process here; the pipeline's
        # embedding_check stage evaluates this primitive instead.
        raise NotImplementedError(
            "SemanticMatch.check() requires embedding infrastructure. Use the embedding_check pipeline stage instead."
        )

SetContainment

Bases: VerificationPrimitive

Compare lists as sets with configurable containment mode.

Modes: - "exact": extracted and expected contain the same elements - "subset": extracted is a subset of expected - "superset": extracted is a superset of expected - "overlap": at least min_overlap elements in common

Source code in src/karenina/schemas/primitives/comparisons.py
@_register_primitive
class SetContainment(VerificationPrimitive):
    """Set-based comparison of two collections with a configurable mode.

    Modes:
    - "exact": extracted and expected contain the same elements
    - "subset": extracted is a subset of expected
    - "superset": extracted is a superset of expected
    - "overlap": at least min_overlap elements in common
    """

    mode: str = "exact"
    min_overlap: int | None = None

    def check(self, extracted: Any, expected: Any) -> bool:
        # Duplicates and ordering are deliberately ignored by converting
        # both sides to sets before comparing.
        extracted_set = set(extracted)
        expected_set = set(expected)
        if self.mode == "overlap":
            # NOTE(review): `or 1` also maps min_overlap=0 to 1 — confirm intended.
            return len(extracted_set & expected_set) >= (self.min_overlap or 1)
        if self.mode == "exact":
            return extracted_set == expected_set
        if self.mode == "subset":
            return extracted_set.issubset(expected_set)
        if self.mode == "superset":
            return extracted_set.issuperset(expected_set)
        raise ValueError(f"Unknown mode: {self.mode!r}")

SynonymMap

Bases: BaseModel

Maps known synonyms to canonical forms before comparison.

Source code in src/karenina/schemas/primitives/normalizers.py
class SynonymMap(BaseModel):
    """Maps known synonyms to canonical forms before comparison."""

    # Keys are synonym strings; values are the canonical replacements.
    # apply_normalizer() looks up the whole input string (exact match)
    # and passes unknown text through unchanged.
    mapping: dict[str, str]

TemplateFieldSpec

Bases: BaseModel

Specification for a single template field.

Maps to a VerifiedField declaration in the Python template code.

Source code in src/karenina/schemas/entities/template_spec.py
class TemplateFieldSpec(BaseModel):
    """Specification for a single template field.

    Maps to a VerifiedField declaration in the Python template code.
    """

    # Unknown keys are rejected so malformed interchange payloads fail fast.
    model_config = ConfigDict(extra="forbid")

    name: str = Field(..., description="Python identifier for the field.")
    type: str = Field(
        ...,
        description=("Field type: 'bool', 'str', 'int', 'float', 'list_str', 'literal', 'date'."),
    )
    description: str = Field(..., description="Field description for the judge LLM.")
    extraction_hint: str | None = Field(
        None,
        description="Optional hint about normalization or formatting for the judge.",
    )
    ground_truth: Any = Field(..., description="Expected correct value.")
    # Only meaningful when type == 'literal'.
    literal_values: list[str] | None = Field(
        None,
        description="Allowed values when type is 'literal'.",
    )
    verify_with: dict[str, Any] = Field(
        ...,
        description=(
            "Serialized verification primitive. Must include a 'type' key matching a registered primitive name."
        ),
    )
    weight: float = Field(1.0, description="Weight for verify_granular() scoring.")
    # Trace fields are checked against the raw response rather than parsed output.
    is_trace: bool = Field(
        False,
        description=("If True, this field uses a trace primitive and is excluded from judge parsing."),
    )

TemplateSpec

Bases: BaseModel

Complete specification for a VerifiedField answer template.

This is the JSON interchange format between the visual builder GUI and the Python template code. When verify_strategy is None, the default AllOf-all-fields strategy is used.

Source code in src/karenina/schemas/entities/template_spec.py
class TemplateSpec(BaseModel):
    """Complete specification for a VerifiedField answer template.

    This is the JSON interchange format between the visual builder GUI
    and the Python template code. When verify_strategy is None, the
    default AllOf-all-fields strategy is used.
    """

    # Unknown keys are rejected so malformed interchange payloads fail fast.
    model_config = ConfigDict(extra="forbid")

    fields: list[TemplateFieldSpec] = Field(..., description="Ordered list of template fields.")
    verify_strategy: VerifyStrategySpec | None = Field(
        None,
        description=("Custom composition strategy. None means default: AllOf with all fields."),
    )
    class_name: str = Field(
        "Answer",
        description="Name for the generated Python class.",
    )

TraceContains

Bases: TracePrimitive

Check for substring in raw LLM response.

Source code in src/karenina/schemas/primitives/trace.py
@_register_primitive
class TraceContains(TracePrimitive):
    """Pass when a fixed substring occurs in the raw LLM response."""

    # Literal (case-sensitive) text to search for.
    substring: str

    def check_trace(self, raw_trace: str) -> bool:
        # str.find() returns -1 when absent; equivalent to the `in` operator.
        return raw_trace.find(self.substring) != -1

TraceLength

Bases: TracePrimitive

Check length of raw LLM response.

Source code in src/karenina/schemas/primitives/trace.py
@_register_primitive
class TraceLength(TracePrimitive):
    """Check that the raw LLM response length falls within optional bounds."""

    min: int | None = None
    max: int | None = None
    # "words" counts whitespace-separated tokens; anything else counts characters.
    unit: str = "chars"

    def check_trace(self, raw_trace: str) -> bool:
        if self.unit == "words":
            measured = len(raw_trace.split())
        else:
            measured = len(raw_trace)
        # Unset bounds are simply not enforced.
        if self.min is not None and measured < self.min:
            return False
        if self.max is not None and measured > self.max:
            return False
        return True

TracePrimitive

Bases: VerificationPrimitive

Base class for primitives that operate on raw LLM response text.

Fields using TracePrimitive are excluded from the judge's parsing schema. The pipeline evaluates them directly against the raw response.

Source code in src/karenina/schemas/primitives/trace.py
class TracePrimitive(VerificationPrimitive):
    """Base class for primitives that operate on raw LLM response text.

    Fields using TracePrimitive are excluded from the judge's parsing
    schema. The pipeline evaluates them directly against the raw response.
    """

    def check(self, extracted: Any, expected: Any) -> bool:
        """Not used for trace primitives. Use check_trace() instead."""
        # Fail loudly so a trace primitive is never used as a value comparator.
        raise NotImplementedError("TracePrimitive uses check_trace(), not check()")

    def check_trace(self, raw_trace: str) -> bool:
        """Evaluate the raw LLM response.

        Args:
            raw_trace: The raw text response from the answering LLM.

        Returns:
            True if the pattern is found/condition is met.
        """
        # Abstract: concrete subclasses (TraceContains, TraceRegex, TraceLength) override this.
        raise NotImplementedError
Functions
check
check(extracted: Any, expected: Any) -> bool

Not used for trace primitives. Use check_trace() instead.

Source code in src/karenina/schemas/primitives/trace.py
def check(self, extracted: Any, expected: Any) -> bool:
    """Not used for trace primitives. Use check_trace() instead."""
    # Fail loudly so a trace primitive is never used as a value comparator.
    raise NotImplementedError("TracePrimitive uses check_trace(), not check()")
check_trace
check_trace(raw_trace: str) -> bool

Evaluate the raw LLM response.

Parameters:

Name Type Description Default
raw_trace str

The raw text response from the answering LLM.

required

Returns:

Type Description
bool

True if the pattern is found/condition is met.

Source code in src/karenina/schemas/primitives/trace.py
def check_trace(self, raw_trace: str) -> bool:
    """Evaluate the raw LLM response.

    Args:
        raw_trace: The raw text response from the answering LLM.

    Returns:
        True if the pattern is found/condition is met.
    """
    # Abstract: concrete trace primitives override this.
    raise NotImplementedError

TraceRegex

Bases: TracePrimitive

Check for regex pattern in raw LLM response.

Returns True if the pattern is found (or count >= count_min).

Source code in src/karenina/schemas/primitives/trace.py
@_register_primitive
class TraceRegex(TracePrimitive):
    """Pass when a regex pattern appears in the raw LLM response.

    With count_min unset, a single match suffices; otherwise at least
    count_min matches are required.
    """

    pattern: str
    count_min: int | None = None

    def check_trace(self, raw_trace: str) -> bool:
        hits = len(re.findall(self.pattern, raw_trace))
        # None means "any occurrence", i.e. a threshold of one.
        required = 1 if self.count_min is None else self.count_min
        return hits >= required

VerificationMeta

Bases: BaseModel

Metadata stored on each VerifiedField, not visible to the judge.

Serialized into json_schema_extra["__verification__"] on the Pydantic FieldInfo. The prompt builder strips this key before sending the schema to the judge LLM.

Source code in src/karenina/schemas/entities/verified_field.py
class VerificationMeta(BaseModel):
    """Metadata stored on each VerifiedField, not visible to the judge.

    Serialized into json_schema_extra["__verification__"] on the Pydantic
    FieldInfo. The prompt builder strips this key before sending the
    schema to the judge LLM.
    """

    # Expected correct value the extracted answer is compared against.
    ground_truth: Any
    # model_dump() payload of the primitive plus a "type" key naming its class.
    verify_with: dict[str, Any]  # Serialized primitive (type + params)
    # Relative weight used by verify_granular() scoring.
    weight: float = 1.0
    # Optional formatting/normalization guidance shown to the judge.
    extraction_hint: str | None = None

VerificationPrimitive

Bases: BaseModel

Base class for all verification primitives.

Subclasses implement check() to compare an extracted value against an expected value. The primitive type determines whether the field is included in the judge's parsing schema.

Source code in src/karenina/schemas/primitives/comparisons.py
class VerificationPrimitive(BaseModel):
    """Base class for all verification primitives.

    Subclasses implement check() to compare an extracted value against
    an expected value. The primitive type determines whether the field
    is included in the judge's parsing schema.
    """

    def check(self, extracted: Any, expected: Any) -> bool:
        """Compare extracted value against expected value.

        Args:
            extracted: Value extracted by the judge LLM.
            expected: Ground truth value from VerifiedField.

        Returns:
            True if the values match according to this primitive's rules.
        """
        # Abstract: concrete primitives (e.g. ExactMatch, BooleanMatch) override this.
        raise NotImplementedError
Functions
check
check(extracted: Any, expected: Any) -> bool

Compare extracted value against expected value.

Parameters:

Name Type Description Default
extracted Any

Value extracted by the judge LLM.

required
expected Any

Ground truth value from VerifiedField.

required

Returns:

Type Description
bool

True if the values match according to this primitive's rules.

Source code in src/karenina/schemas/primitives/comparisons.py
def check(self, extracted: Any, expected: Any) -> bool:
    """Compare extracted value against expected value.

    Args:
        extracted: Value extracted by the judge LLM.
        expected: Ground truth value from VerifiedField.

    Returns:
        True if the values match according to this primitive's rules.
    """
    # Abstract: concrete primitives override this with their comparison rule.
    raise NotImplementedError

VerifyStrategySpec

Bases: BaseModel

Specification for a composition strategy node.

Maps to AllOf, AnyOf, AtLeastN, or FieldCheck in the Python template.

Source code in src/karenina/schemas/entities/template_spec.py
class VerifyStrategySpec(BaseModel):
    """Specification for a composition strategy node.

    Maps to AllOf, AnyOf, AtLeastN, or FieldCheck in the Python template.
    """

    model_config = ConfigDict(extra="forbid")

    type: str = Field(
        ...,
        description="Strategy type: 'all_of', 'any_of', 'at_least_n', 'field_check'.",
    )
    # Only meaningful when type == 'at_least_n'.
    n: int | None = Field(None, description="Required count for 'at_least_n'.")
    # Only meaningful when type == 'field_check'.
    field_name: str | None = Field(None, description="Field name for 'field_check' leaves.")
    # Recursive children for composite nodes; presumably empty for leaves — confirm.
    conditions: list["VerifyStrategySpec"] = Field(
        default_factory=list,
        description="Child conditions for composite strategies.",
    )

Functions

VerifiedField

VerifiedField(
    description: str,
    ground_truth: Any,
    verify_with: Any,
    weight: float = 1.0,
    extraction_hint: str | None = None,
    **kwargs: Any,
) -> Any

Create a Pydantic Field with verification metadata attached.

Unlike plain Field(), description is mandatory because the judge LLM relies on it to know what to extract.

Parameters:

Name Type Description Default
description
str

What to extract (goes into the JSON schema description).

required
ground_truth
Any

Expected correct value.

required
verify_with
Any

Verification primitive instance (ExactMatch, BooleanMatch, etc.).

required
weight
float

Weight for verify_granular() scoring. Default: 1.0.

1.0
extraction_hint
str | None

Optional formatting guidance for the judge.

None
**kwargs
Any

Additional Pydantic Field arguments.

{}

Returns:

Type Description
Any

Pydantic FieldInfo with verification metadata in json_schema_extra.

Raises:

Type Description
ValueError

If verify_with is None or description is empty/whitespace.

Source code in src/karenina/schemas/entities/verified_field.py
def VerifiedField(
    description: str,
    ground_truth: Any,
    verify_with: Any,
    weight: float = 1.0,
    extraction_hint: str | None = None,
    **kwargs: Any,
) -> Any:
    """Build a Pydantic Field carrying verification metadata.

    Unlike plain Field(), a non-empty description is mandatory: the judge
    LLM depends on it to know what to extract.

    Args:
        description: What to extract (becomes the JSON schema description).
        ground_truth: Expected correct value.
        verify_with: Verification primitive instance (ExactMatch, BooleanMatch, etc.).
        weight: Weight for verify_granular() scoring. Default: 1.0.
        extraction_hint: Optional formatting guidance for the judge.
        **kwargs: Additional Pydantic Field arguments.

    Returns:
        Pydantic FieldInfo with verification metadata in json_schema_extra.

    Raises:
        ValueError: If verify_with is None or description is empty/whitespace.
    """
    # Issue 056: an empty or whitespace-only description is useless to the judge.
    if not description or not description.strip():
        raise ValueError(
            "description is required for VerifiedField: the judge LLM relies on it to know what to extract."
        )

    # Issue 053: fail fast with a clear message instead of a later AttributeError.
    if verify_with is None:
        raise ValueError(
            "verify_with is required: pass a verification primitive instance (e.g., ExactMatch(), BooleanMatch())."
        )

    # Store the primitive as JSON-safe data, tagged with its class name.
    serialized_primitive = verify_with.model_dump(mode="json")
    serialized_primitive["type"] = type(verify_with).__name__

    # Issue 010: surface obvious ground_truth/primitive type mismatches early.
    _warn_ground_truth_mismatch(ground_truth, verify_with)

    verification_meta = VerificationMeta(
        ground_truth=ground_truth,
        verify_with=serialized_primitive,
        weight=weight,
        extraction_hint=extraction_hint,
    )

    # Fold the metadata into any caller-supplied json_schema_extra.
    schema_extra = kwargs.pop("json_schema_extra", None) or {}
    schema_extra["__verification__"] = verification_meta.model_dump(mode="json")

    return Field(description=description, json_schema_extra=schema_extra, **kwargs)

apply_normalizer

apply_normalizer(normalizer: Normalizer, text: str) -> str

Apply a single normalizer to text.

Parameters:

Name Type Description Default
normalizer
Normalizer

A string normalizer name or SynonymMap instance.

required
text
str

The text to normalize.

required

Returns:

Type Description
str

Normalized text.

Raises:

Type Description
ValueError

If normalizer name is not recognized.

Source code in src/karenina/schemas/primitives/normalizers.py
def apply_normalizer(normalizer: Normalizer, text: str) -> str:
    """Apply a single normalizer to text.

    Args:
        normalizer: A string normalizer name or SynonymMap instance.
        text: The text to normalize.

    Returns:
        Normalized text.

    Raises:
        ValueError: If normalizer name is not recognized.
    """
    if isinstance(normalizer, SynonymMap):
        # Whole-string lookup; text without a known synonym passes through.
        return normalizer.mapping.get(text, text)

    if isinstance(normalizer, str):
        # Named transforms, dispatched by normalizer name.
        transforms = {
            "lowercase": str.lower,
            "strip": str.strip,
            "remove_punctuation": lambda s: s.translate(str.maketrans("", "", string.punctuation)),
            "collapse_whitespace": lambda s: re.sub(r"\s+", " ", s).strip(),
        }
        transform = transforms.get(normalizer)
        if transform is not None:
            return transform(text)

    raise ValueError(f"Unknown normalizer: {normalizer!r}")

apply_normalizers

apply_normalizers(
    normalizers: list[Normalizer], text: str
) -> str

Apply a chain of normalizers in sequence.

Parameters:

Name Type Description Default
normalizers
list[Normalizer]

Ordered list of normalizers to apply.

required
text
str

The text to normalize.

required

Returns:

Type Description
str

Text after all normalizers have been applied.

Source code in src/karenina/schemas/primitives/normalizers.py
def apply_normalizers(normalizers: list[Normalizer], text: str) -> str:
    """Run each normalizer over the text, in order.

    Args:
        normalizers: Ordered list of normalizers to apply.
        text: The text to normalize.

    Returns:
        Text after all normalizers have been applied.
    """
    result = text
    for step in normalizers:
        result = apply_normalizer(step, result)
    return result

capture_answer_source

capture_answer_source(answer_class: type) -> type

Decorator/function to automatically capture source code for Answer classes in notebooks.

Usage as decorator:

    @capture_answer_source
    class Answer(BaseAnswer):
        ...

Usage as function:

    class Answer(BaseAnswer):
        ...
    Answer = capture_answer_source(Answer)

Parameters:

Name Type Description Default
answer_class
type

The Answer class to capture source for

required

Returns:

Type Description
type

The same class with source code captured

Source code in src/karenina/schemas/entities/answer.py
def capture_answer_source(answer_class: type) -> type:
    """Capture source code for an Answer class defined in a notebook.

    Works either as a decorator on the class definition or as a plain
    function call after the class is defined; in both cases the class
    itself is returned unchanged.

    Usage as decorator:
        @capture_answer_source
        class Answer(BaseAnswer):
            ...

    Usage as function:
        class Answer(BaseAnswer):
            ...
        Answer = capture_answer_source(Answer)

    Args:
        answer_class: The Answer class to capture source for

    Returns:
        The same class with source code captured
    """
    # Sentinel-based lookup so the hook is invoked even if it happens to be
    # a falsy attribute, and silently skipped only when truly absent.
    missing = object()
    capture_hook = getattr(answer_class, "set_source_code_from_notebook", missing)
    if capture_hook is not missing:
        capture_hook()
    return answer_class

evaluate_strategy

evaluate_strategy(
    node: StrategyNode, field_results: dict[str, bool]
) -> bool

Evaluate a composition strategy tree against field results.

Thin wrapper around evaluate_composition() that supplies the FieldCheck leaf evaluator for the template domain.

Parameters:

Name Type Description Default
node
StrategyNode

The root node of the strategy tree.

required
field_results
dict[str, bool]

Mapping of field names to their pass/fail results.

required

Returns:

Type Description
bool

True if the strategy passes.

Raises:

Type Description
KeyError

If a FieldCheck references a field not in field_results.

Source code in src/karenina/schemas/entities/composition.py
def evaluate_strategy(node: StrategyNode, field_results: dict[str, bool]) -> bool:
    """Evaluate a composition strategy tree against field results.

    Delegates to ``evaluate_composition()``, supplying a FieldCheck leaf
    evaluator that looks each referenced field up in ``field_results``.

    Args:
        node: The root node of the strategy tree.
        field_results: Mapping of field names to their pass/fail results.

    Returns:
        True if the strategy passes.

    Raises:
        KeyError: If a FieldCheck references a field not in field_results.
    """
    return evaluate_composition(node, lambda leaf: field_results[leaf.field])

merge_dynamic_rubrics

merge_dynamic_rubrics(
    global_dynamic: DynamicRubric | None,
    question_dynamic: DynamicRubric | None,
) -> DynamicRubric | None

Merge global and question-specific dynamic rubrics.

Mirrors :func:merge_rubrics for the dynamic rubric variant. Same-type name collisions are rejected; cross-type overlaps are allowed.

Parameters:

Name Type Description Default
global_dynamic
DynamicRubric | None

The global dynamic rubric (applied to all questions).

required
question_dynamic
DynamicRubric | None

Question-specific dynamic rubric.

required

Returns:

Type Description
DynamicRubric | None

Merged DynamicRubric with traits from both sources, or None if both are None.

Raises:

Type Description
ValueError

If a trait name appears in both rubrics within the same trait type.

Source code in src/karenina/schemas/entities/rubric.py
def merge_dynamic_rubrics(
    global_dynamic: "DynamicRubric | None",
    question_dynamic: "DynamicRubric | None",
) -> "DynamicRubric | None":
    """Combine a global dynamic rubric with a question-specific one.

    Dynamic-rubric counterpart of :func:`merge_rubrics`. A trait name may
    appear in both rubrics only when the two occurrences belong to
    different trait types; same-type duplicates raise ``ValueError``.

    Args:
        global_dynamic: The global dynamic rubric (applied to all questions).
        question_dynamic: Question-specific dynamic rubric.

    Returns:
        Merged DynamicRubric with traits from both sources, or None if both are None.

    Raises:
        ValueError: If a trait name appears in both rubrics within the same
            trait type.
    """
    # Short-circuit when either side is absent; None only when both are.
    if not global_dynamic:
        return question_dynamic if question_dynamic else None
    if not question_dynamic:
        return global_dynamic

    labels = ("llm", "regex", "callable", "metric", "agentic")
    trait_attrs = ("llm_traits", "regex_traits", "callable_traits", "metric_traits", "agentic_traits")

    # Same-type name collisions are ambiguous, so collect them all and fail loudly.
    conflicts: list[str] = []
    for label, attr in zip(labels, trait_attrs):
        shared = {t.name for t in getattr(global_dynamic, attr)} & {t.name for t in getattr(question_dynamic, attr)}
        conflicts.extend(f"{label}:{name}" for name in sorted(shared))

    if conflicts:
        raise ValueError(f"Same-type trait name conflicts between global and question dynamic rubrics: {conflicts}")

    # Concatenate each per-type trait list, global traits first.
    merged = {attr: list(getattr(global_dynamic, attr)) + list(getattr(question_dynamic, attr)) for attr in trait_attrs}
    return DynamicRubric(**merged)

merge_rubrics

merge_rubrics(
    global_rubric: Rubric | None,
    question_rubric: Rubric | None,
) -> Rubric | None

Merge global and question-specific rubrics.

Same-type trait name collisions (e.g., both rubrics have an LLM trait named "safety") raise ValueError. Cross-type collisions (e.g., global regex trait "quality" + question LLM trait "quality") are allowed because results are stored in type-segregated dicts.

Parameters:

Name Type Description Default
global_rubric
Rubric | None

The global rubric (applied to all questions).

required
question_rubric
Rubric | None

Question-specific rubric (adds to global).

required

Returns:

Type Description
Rubric | None

Merged rubric, or None if both are None.

Raises:

Type Description
ValueError

If a trait name appears in both rubrics within the same trait type.

Source code in src/karenina/schemas/entities/rubric.py
def merge_rubrics(global_rubric: "Rubric | None", question_rubric: "Rubric | None") -> "Rubric | None":
    """Combine a global rubric with a question-specific one.

    A trait name may appear in both rubrics only when the two occurrences
    belong to different trait types (results are stored in type-segregated
    dicts, so cross-type overlap is unambiguous). Same-type duplicates —
    e.g. both rubrics defining an LLM trait named "safety" — raise
    ``ValueError``.

    Args:
        global_rubric: The global rubric (applied to all questions).
        question_rubric: Question-specific rubric (adds to global).

    Returns:
        Merged rubric, or None if both are None.

    Raises:
        ValueError: If a trait name appears in both rubrics within the same
            trait type.
    """
    # Short-circuit when either side is absent; None only when both are.
    if not global_rubric:
        return question_rubric if question_rubric else None
    if not question_rubric:
        return global_rubric

    labels = ("llm", "regex", "callable", "metric", "agentic")
    trait_attrs = ("llm_traits", "regex_traits", "callable_traits", "metric_traits", "agentic_traits")

    # Same-type name collisions are ambiguous, so collect them all and fail loudly.
    conflicts: list[str] = []
    for label, attr in zip(labels, trait_attrs):
        shared = {t.name for t in getattr(global_rubric, attr)} & {t.name for t in getattr(question_rubric, attr)}
        conflicts.extend(f"{label}:{name}" for name in sorted(shared))

    if conflicts:
        raise ValueError(f"Same-type trait name conflicts between global and question rubrics: {conflicts}")

    # Concatenate each per-type trait list, global traits first.
    merged = {attr: list(getattr(global_rubric, attr)) + list(getattr(question_rubric, attr)) for attr in trait_attrs}
    return Rubric(**merged)