Skip to content

karenina.benchmark

benchmark

Benchmark module for Karenina verification system.

Classes

Benchmark

Main class for managing Karenina benchmarks in JSON-LD format.

This class provides a high-level API for: creating benchmarks manually or automatically; loading/saving JSON-LD benchmark files; running verification with the existing execution system; and full compatibility with frontend GUI exports.

This is a facade that delegates to specialized manager classes for better maintainability.

Source code in src/karenina/benchmark/benchmark.py
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
class Benchmark:
    """
    Main class for managing Karenina benchmarks in JSON-LD format.

    This class provides a high-level API for:
    - Creating benchmarks manually or automatically
    - Loading/saving JSON-LD benchmark files
    - Running verification with existing execution system
    - Full compatibility with frontend GUI exports

    This is a facade that delegates to specialized manager classes for better maintainability.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        version: str = "0.1.0",
        creator: str = "Karenina Benchmarking System",
        workspace_root: Path | None = None,
    ):
        """
        Initialize a new benchmark.

        Args:
            name: Name of the benchmark.
            description: Description of the benchmark.
            version: Version of the benchmark content.
            creator: Creator name or organization.
            workspace_root: Root directory containing task workspaces.
                Question workspace paths are resolved relative to this root.
                Not persisted in the checkpoint (it is a local filesystem path).
        """
        # Shared state object that every manager operates on.
        base = BenchmarkBase(name, description, version, creator)
        self._base = base
        self._workspace_root = workspace_root
        self._scenarios: dict[str, ScenarioDefinition] = {}

        # Specialized managers; rubric/template managers are created before the
        # verification/export managers, which take them as collaborators.
        rubrics = RubricManager(base)
        templates = TemplateManager(base)
        self._metadata_manager = MetadataManager(base)
        self._question_manager = QuestionManager(base)
        self._rubric_manager = rubrics
        self._template_manager = templates
        self._results_manager = ResultsManager(base)
        self._verification_manager = VerificationManager(base, rubrics)
        self._export_manager = ExportManager(base, templates, rubrics)

    def _init_managers(self) -> None:
        """(Re)create every manager from ``self._base`` (used by load/clone)."""
        # Instances built via __new__ skip __init__, so the scenario cache
        # may not exist yet.
        if not hasattr(self, "_scenarios"):
            self._scenarios = {}
        base = self._base
        self._metadata_manager = MetadataManager(base)
        self._question_manager = QuestionManager(base)
        self._rubric_manager = RubricManager(base)
        self._template_manager = TemplateManager(base)
        self._results_manager = ResultsManager(base)
        self._verification_manager = VerificationManager(base, self._rubric_manager)
        self._export_manager = ExportManager(base, self._template_manager, self._rubric_manager)
        self._rebuild_scenarios()

    def _rebuild_scenarios(self) -> None:
        """Repopulate the ``_scenarios`` cache from the checkpoint's hasPart entries."""
        from ..scenario.checkpoint import schema_org_to_scenario

        parts = self._base._checkpoint.hasPart
        if not parts:
            return

        # Enforce homogeneity: a checkpoint may hold questions or scenarios, never both.
        if self._base._questions_cache:
            raise ValueError(
                "Checkpoint contains both questions and scenarios; this is not supported. "
                "A benchmark must contain either standalone questions or scenarios, not both."
            )

        for entry in parts:
            definition = schema_org_to_scenario(entry)
            self._scenarios[definition.name] = definition

    @property
    def workspace_root(self) -> Path | None:
        """Root directory for task workspaces, or None if not set.

        This is a local filesystem path and is not persisted in the checkpoint.
        """
        return self._workspace_root

    def set_workspace_root(self, path: Path) -> None:
        """Set the root directory for task workspaces.

        Args:
            path: Directory containing task workspace subdirectories.
                Question workspace paths are resolved relative to this root.
                Not persisted in the checkpoint.
        """
        self._workspace_root = path

    @classmethod
    def create(
        cls,
        name: str,
        description: str = "",
        version: str = "0.1.0",
        creator: str = "Karenina Benchmarking System",
        workspace_root: Path | None = None,
    ) -> "Benchmark":
        """Construct a new benchmark; convenience alias for the constructor."""
        return cls(
            name,
            description=description,
            version=version,
            creator=creator,
            workspace_root=workspace_root,
        )

    @classmethod
    def load(cls, path: Path, workspace_root: Path | None = None) -> "Benchmark":
        """Load a benchmark from a JSON-LD file.

        Args:
            path: Path to the JSON-LD benchmark file.
            workspace_root: Optional root directory for task workspaces.
        """
        # Bypass __init__ so the already-loaded base can be attached directly.
        benchmark = cls.__new__(cls)
        benchmark._base = BenchmarkBase.load(path)
        benchmark._workspace_root = workspace_root
        benchmark._init_managers()
        return benchmark

    def save(self, path: Path, save_deep_judgment_config: bool = False) -> None:
        """Save the benchmark to a JSON-LD file.

        Args:
            path: Path where to save the benchmark.
            save_deep_judgment_config: If True, include deep judgment
                configuration in LLM rubric traits. If False (default),
                deep judgment settings are stripped before saving.
        """
        # Delegates entirely to BenchmarkBase, which owns serialization.
        self._base.save(path, save_deep_judgment_config=save_deep_judgment_config)

    def save_to_db(self, storage: str, checkpoint_path: Path | None = None) -> "Benchmark":
        """Persist this benchmark to a database and return the saved instance."""
        from typing import cast

        from ..storage import save_benchmark

        saved = save_benchmark(self, storage, checkpoint_path)
        return cast("Benchmark", saved)

    @classmethod
    def load_from_db(cls, benchmark_name: str, storage: str) -> "Benchmark":
        """Load a benchmark by name from a database."""
        from ..storage import load_benchmark

        return load_benchmark(benchmark_name, storage, load_config=False)  # type: ignore[return-value]

    # ── Question management ──────────────────────────────────────────────

    def add_question(
        self,
        question: Union[str, dict[str, Any], "Question"],
        raw_answer: str | None = None,
        answer_template: str | type | None = None,
        question_id: str | None = None,
        finished: bool | object = _NOT_PROVIDED,
        author: dict[str, Any] | None = None,
        sources: list[dict[str, Any]] | None = None,
        custom_metadata: dict[str, Any] | None = None,
        few_shot_examples: list[dict[str, str]] | None = None,
        answer_notes: str | None = None,
    ) -> str:
        """Add a question to the benchmark.

        Accepts a question string, a Question object, or a dict with keys
        ``question`` and ``raw_answer`` (plus any optional kwargs).

        Returns:
            The ID of the newly added question.

        Raises:
            ValueError: If scenarios already exist (homogeneous enforcement).
        """
        # Guard clause: scenario benchmarks may not hold standalone questions.
        if self._scenarios:
            raise ValueError(
                "Cannot add standalone questions to a scenario benchmark. "
                "Scenarios and standalone questions cannot coexist in the same benchmark."
            )
        manager = self._question_manager
        return manager.add_question(
            question,
            raw_answer,
            answer_template,
            question_id,
            finished,
            author,
            sources,
            custom_metadata,
            few_shot_examples,
            answer_notes=answer_notes,
        )

    def add_questions(self, questions_data: list[dict[str, Any]]) -> list[str]:
        """Add multiple questions at once.

        Each dict is passed to ``add_question()``, so all dict keys supported
        there are accepted here.

        Args:
            questions_data: List of dicts with question data.

        Returns:
            List of question IDs that were created.

        Raises:
            ValueError: If scenarios already exist (homogeneous enforcement).
        """
        # Guard clause: scenario benchmarks may not hold standalone questions.
        if self._scenarios:
            raise ValueError(
                "Cannot add standalone questions to a scenario benchmark. "
                "Scenarios and standalone questions cannot coexist in the same benchmark."
            )
        manager = self._question_manager
        return manager.add_questions(questions_data)

    def get_question_ids(self) -> list[str]:
        """Return the IDs of all questions stored in the benchmark."""
        manager = self._question_manager
        return manager.get_question_ids()

    def get_question(self, question_id: str) -> dict[str, Any]:
        """Return the question with the given ID."""
        manager = self._question_manager
        return manager.get_question(question_id)

    def get_all_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
        """Return every question in the benchmark (IDs only if requested)."""
        manager = self._question_manager
        return manager.get_all_questions(ids_only)

    def get_question_as_object(self, question_id: str) -> "Question":
        """Return the question with the given ID as a Question object."""
        manager = self._question_manager
        return manager.get_question_as_object(question_id)

    def get_all_questions_as_objects(self) -> list["Question"]:
        """Return every question in the benchmark as Question objects."""
        manager = self._question_manager
        return manager.get_all_questions_as_objects()

    def add_question_from_object(self, question_obj: "Question", **metadata: Any) -> str:
        """Add a question to the benchmark from a Question object.

        Args:
            question_obj: The Question object to add.
            **metadata: Optional metadata forwarded to the question manager.

        Returns:
            The ID of the newly added question.

        Raises:
            ValueError: If scenarios already exist (homogeneous enforcement).
        """
        # Consistency fix: enforce the same question/scenario homogeneity guard
        # that add_question()/add_questions() apply, so this path cannot bypass it.
        if self._scenarios:
            raise ValueError(
                "Cannot add standalone questions to a scenario benchmark. "
                "Scenarios and standalone questions cannot coexist in the same benchmark."
            )
        return self._question_manager.add_question_from_object(question_obj, **metadata)

    def update_question_metadata(self, question_id: str, **metadata: Any) -> None:
        """Update metadata fields on the given question."""
        manager = self._question_manager
        manager.update_question_metadata(question_id, **metadata)

    def get_question_metadata(self, question_id: str) -> dict[str, Any]:
        """Return all metadata recorded for the given question."""
        manager = self._question_manager
        return manager.get_question_metadata(question_id)

    def get_question_custom_property(self, question_id: str, name: str) -> Any:
        """Return a named custom property from the question's metadata."""
        manager = self._question_manager
        return manager.get_question_custom_property(question_id, name)

    def set_question_custom_property(self, question_id: str, name: str, value: Any) -> None:
        """Store a named custom property on the question's metadata."""
        manager = self._question_manager
        manager.set_question_custom_property(question_id, name, value)

    def remove_question_custom_property(self, question_id: str, name: str) -> bool:
        """Delete a named custom property from the question's metadata."""
        manager = self._question_manager
        return manager.remove_question_custom_property(question_id, name)

    def get_question_author(self, question_id: str) -> dict[str, Any] | None:
        """Return the author information recorded for a question, if any."""
        manager = self._question_manager
        return manager.get_question_author(question_id)

    def set_question_author(self, question_id: str, author: dict[str, Any] | None) -> None:
        """Record (or clear, with None) author information for a question."""
        manager = self._question_manager
        manager.set_question_author(question_id, author)

    def get_question_sources(self, question_id: str) -> list[dict[str, Any]] | None:
        """Return the source documents recorded for a question, if any."""
        manager = self._question_manager
        return manager.get_question_sources(question_id)

    def set_question_sources(self, question_id: str, sources: list[dict[str, Any]] | None) -> None:
        """Record (or clear, with None) source documents for a question."""
        manager = self._question_manager
        manager.set_question_sources(question_id, sources)

    def get_question_timestamps(self, question_id: str) -> dict[str, str]:
        """Return creation and modification timestamps for a question."""
        manager = self._question_manager
        return manager.get_question_timestamps(question_id)

    def remove_question(self, question_id: str) -> bool:
        """Remove one question from the benchmark; returns whether it existed."""
        manager = self._question_manager
        return manager.remove_question(question_id)

    def clear_questions(self) -> int:
        """Remove every question from the benchmark; returns the removal count."""
        manager = self._question_manager
        return manager.clear_questions()

    def add_questions_batch(self, questions_data: list[dict[str, Any]]) -> list[str]:
        """Add multiple questions at once.

        Args:
            questions_data: List of dicts with question data.

        Returns:
            List of question IDs that were created.

        Raises:
            ValueError: If scenarios already exist (homogeneous enforcement).
        """
        # Consistency fix: enforce the same question/scenario homogeneity guard
        # that add_question()/add_questions() apply, so batch insertion cannot
        # bypass it.
        if self._scenarios:
            raise ValueError(
                "Cannot add standalone questions to a scenario benchmark. "
                "Scenarios and standalone questions cannot coexist in the same benchmark."
            )
        return self._question_manager.add_questions_batch(questions_data)

    def mark_finished(self, question_id: str) -> None:
        """Flag the given question as finished."""
        manager = self._question_manager
        manager.mark_finished(question_id)

    def mark_unfinished(self, question_id: str) -> None:
        """Flag the given question as unfinished."""
        manager = self._question_manager
        manager.mark_unfinished(question_id)

    def mark_finished_batch(self, question_ids: list[str]) -> None:
        """Flag several questions as finished in one call."""
        manager = self._question_manager
        manager.mark_finished_batch(question_ids)

    def mark_unfinished_batch(self, question_ids: list[str]) -> None:
        """Flag several questions as unfinished in one call."""
        manager = self._question_manager
        manager.mark_unfinished_batch(question_ids)

    def toggle_finished(self, question_id: str) -> bool:
        """Flip the finished status of a question; returns the new status."""
        manager = self._question_manager
        return manager.toggle_finished(question_id)

    def get_unfinished_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
        """Return questions not yet marked as finished."""
        manager = self._question_manager
        return manager.get_unfinished_questions(ids_only)

    def get_finished_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
        """Return questions that have been marked as finished."""
        manager = self._question_manager
        return manager.get_finished_questions(ids_only)

    def filter_questions(
        self,
        finished: bool | None = None,
        has_template: bool | None = None,
        has_rubric: bool | None = None,
        author: str | None = None,
        custom_filter: Any = None,
    ) -> list[dict[str, Any]]:
        """Filter questions based on the given criteria."""
        manager = self._question_manager
        return manager.filter_questions(finished, has_template, has_rubric, author, custom_filter)

    def filter_by_metadata(self, field_path: str, value: Any, match_mode: str = "exact") -> list[dict[str, Any]]:
        """Filter questions by a metadata field addressed with dot notation."""
        manager = self._question_manager
        return manager.filter_by_metadata(field_path, value, match_mode)

    def filter_by_custom_metadata(self, match_all: bool = True, **criteria: Any) -> list[dict[str, Any]]:
        """Filter questions on custom metadata fields with AND/OR logic."""
        manager = self._question_manager
        return manager.filter_by_custom_metadata(match_all, **criteria)

    def search_questions(
        self,
        query: str | list[str],
        match_all: bool = True,
        fields: list[str] | None = None,
        case_sensitive: bool = False,
        regex: bool = False,
    ) -> list[dict[str, Any]]:
        """Search for questions containing the query text (unified search entry point)."""
        manager = self._question_manager
        return manager.search_questions(query, match_all, fields, case_sensitive, regex)

    def get_questions_by_author(self, author: str) -> list[dict[str, Any]]:
        """Return the questions created by the given author."""
        manager = self._question_manager
        return manager.get_questions_by_author(author)

    def get_questions_with_rubric(self) -> list[dict[str, Any]]:
        """Return the questions that define question-specific rubrics."""
        manager = self._question_manager
        return manager.get_questions_with_rubric()

    def count_by_field(self, field_path: str, questions: list[dict[str, Any]] | None = None) -> dict[Any, int]:
        """Count questions grouped by a field value addressed with dot notation."""
        manager = self._question_manager
        return manager.count_by_field(field_path, questions)

    # ── Scenario management ─────────────────────────────────────────────

    @property
    def is_scenario_benchmark(self) -> bool:
        """True if this benchmark contains scenarios instead of standalone questions."""
        return bool(self._scenarios)

    @property
    def scenario_count(self) -> int:
        """Number of scenarios currently held by the benchmark."""
        return len(self._scenarios)

    def add_scenario(self, scenario: "ScenarioDefinition | Any") -> None:
        """Add a scenario to the benchmark.

        Accepts either a ScenarioDefinition (frozen) or a Scenario builder
        (which will be validated and frozen automatically).

        Args:
            scenario: A ScenarioDefinition or a Scenario builder instance.

        Raises:
            ValueError: If standalone questions already exist (homogeneous enforcement),
                or if a scenario with the same name already exists.
        """
        # Guard: scenarios may not coexist with standalone questions.
        if self._base._questions_cache:
            raise ValueError(
                "Cannot add scenarios to a benchmark that already contains standalone questions. "
                "Scenarios and standalone questions cannot coexist in the same benchmark."
            )

        # Builders are turned into frozen definitions via validate().
        if not isinstance(scenario, ScenarioDefinition):
            scenario = scenario.validate()

        if scenario.name in self._scenarios:
            raise ValueError(f"Scenario '{scenario.name}' already exists")

        self._scenarios[scenario.name] = scenario

        # The checkpoint is the source of truth; mirror the new scenario there.
        from ..scenario.checkpoint import scenario_to_schema_org
        from ..schemas.checkpoint import SchemaOrgPropertyValue

        checkpoint = self._base._checkpoint
        if checkpoint.hasPart is None:
            checkpoint.hasPart = []
        checkpoint.hasPart.append(scenario_to_schema_org(scenario))

        # Record the benchmark_type flag the first time a scenario is added.
        existing = checkpoint.additionalProperty or []
        if all(p.name != "benchmark_type" for p in existing):
            if checkpoint.additionalProperty is None:
                checkpoint.additionalProperty = []
            checkpoint.additionalProperty.append(
                SchemaOrgPropertyValue(name="benchmark_type", value="scenario")
            )

    def get_scenarios(self) -> list[ScenarioDefinition]:
        """Return every scenario definition in the benchmark.

        Returns:
            List of ScenarioDefinition instances.
        """
        return list(self._scenarios.values())

    def get_scenario(self, name: str) -> ScenarioDefinition:
        """Look up a scenario by name.

        Args:
            name: The scenario name.

        Returns:
            The ScenarioDefinition.

        Raises:
            KeyError: If no scenario with that name exists.
        """
        if name not in self._scenarios:
            raise KeyError(f"Scenario '{name}' not found")
        return self._scenarios[name]

    def remove_scenario(self, name: str) -> None:
        """Remove a scenario by name.

        Args:
            name: The scenario name.

        Raises:
            KeyError: If no scenario with that name exists.
        """
        if name not in self._scenarios:
            raise KeyError(f"Scenario '{name}' not found")
        del self._scenarios[name]

        # Keep the checkpoint (the source of truth) in sync.
        checkpoint = self._base._checkpoint
        if checkpoint.hasPart:
            remaining = [part for part in checkpoint.hasPart if part.name != name]
            checkpoint.hasPart = remaining or None
            if checkpoint.hasPart is None and checkpoint.additionalProperty:
                # Last scenario removed: drop the benchmark_type marker too.
                checkpoint.additionalProperty = [
                    p for p in checkpoint.additionalProperty if p.name != "benchmark_type"
                ]

    # ── Template management ──────────────────────────────────────────────

    def add_answer_template(self, question_id: str, template_code: str) -> None:
        """Add or replace the answer template for a question."""
        manager = self._template_manager
        manager.add_answer_template(question_id, template_code)

    def has_template(self, question_id: str) -> bool:
        """Return whether a question has a non-default template."""
        manager = self._template_manager
        return manager.has_template(question_id)

    def get_template(self, question_id: str) -> str:
        """Return the template code for a question."""
        manager = self._template_manager
        return manager.get_template(question_id)

    def update_template(self, question_id: str, template_code: str | type) -> None:
        """Update a question's existing template.

        Args:
            question_id: The question ID.
            template_code: Python code defining the Answer class, or a BaseAnswer subclass.
        """
        manager = self._template_manager
        manager.update_template(question_id, template_code)

    def copy_template(self, from_id: str, to_id: str) -> None:
        """Copy one question's template onto another question."""
        manager = self._template_manager
        manager.copy_template(from_id, to_id)

    def get_finished_templates(self, question_ids: set[str] | None = None) -> list[FinishedTemplate]:
        """Return all finished templates, for use by verification."""
        manager = self._template_manager
        return manager.get_finished_templates(question_ids=question_ids)

    def get_missing_templates(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
        """Return questions lacking a non-default template."""
        manager = self._template_manager
        return manager.get_missing_templates(ids_only)

    def apply_global_template(self, template_code: str) -> list[str]:
        """Apply a template to every question that does not have one yet."""
        manager = self._template_manager
        return manager.apply_global_template(template_code)

    def validate_templates(self) -> tuple[bool, list[dict[str, str]]]:
        """Check that all templates are valid Python code."""
        manager = self._template_manager
        return manager.validate_templates()

    # ── Template generation (delegated to benchmark_helpers) ─────────────

    def generate_template_for_question(
        self,
        question_id: str,
        model: str = "gemini-2.0-flash",
        model_provider: str = "google_genai",
        temperature: float = 0,
        interface: str = "langchain",
        force_regenerate: bool = False,
        endpoint_base_url: str | None = None,
        endpoint_api_key: str | None = None,
    ) -> dict[str, Any]:
        """Generate an answer template for a specific question using an LLM.

        All arguments are forwarded positionally to
        ``_helpers.generate_template_for_question`` along with this benchmark.

        Args:
            question_id: ID of the question to generate a template for.
            model: LLM model name used for generation.
            model_provider: Provider identifier for the model.
            temperature: Sampling temperature for the LLM call.
            interface: Backend interface used to talk to the model.
            force_regenerate: If True, regenerate even when a template exists
                (presumably; semantics live in the helper — confirm there).
            endpoint_base_url: Optional custom endpoint base URL.
            endpoint_api_key: Optional API key for the custom endpoint.

        Returns:
            A dict describing the generation result (shape defined by the helper).
        """
        return _helpers.generate_template_for_question(
            self,
            question_id,
            model,
            model_provider,
            temperature,
            interface,
            force_regenerate,
            endpoint_base_url,
            endpoint_api_key,
        )

    def generate_templates(
        self,
        question_ids: list[str],
        model: str = "gemini-2.0-flash",
        model_provider: str = "google_genai",
        temperature: float = 0,
        interface: str = "langchain",
        force_regenerate: bool = False,
        progress_callback: Callable[[float, str], None] | None = None,
        endpoint_base_url: str | None = None,
        endpoint_api_key: str | None = None,
    ) -> dict[str, dict[str, Any]]:
        """Generate templates for multiple questions using an LLM.

        All arguments are forwarded positionally to
        ``_helpers.generate_templates`` along with this benchmark.

        Args:
            question_ids: IDs of the questions to generate templates for.
            model: LLM model name used for generation.
            model_provider: Provider identifier for the model.
            temperature: Sampling temperature for the LLM call.
            interface: Backend interface used to talk to the model.
            force_regenerate: If True, regenerate even when templates exist
                (presumably; semantics live in the helper — confirm there).
            progress_callback: Optional callable receiving (fraction, message)
                progress updates.
            endpoint_base_url: Optional custom endpoint base URL.
            endpoint_api_key: Optional API key for the custom endpoint.

        Returns:
            A dict keyed per question describing each generation result
            (shape defined by the helper).
        """
        return _helpers.generate_templates(
            self,
            question_ids,
            model,
            model_provider,
            temperature,
            interface,
            force_regenerate,
            progress_callback,
            endpoint_base_url,
            endpoint_api_key,
        )

    def generate_all_templates(
        self,
        model: str = "gemini-2.0-flash",
        model_provider: str = "google_genai",
        temperature: float = 0,
        interface: str = "langchain",
        force_regenerate: bool = False,
        progress_callback: Callable[[float, str], None] | None = None,
        only_missing: bool = True,
        endpoint_base_url: str | None = None,
        endpoint_api_key: str | None = None,
    ) -> dict[str, dict[str, Any]]:
        """Generate templates for all questions in the benchmark using an LLM.

        All arguments are forwarded positionally to
        ``_helpers.generate_all_templates`` along with this benchmark.

        Args:
            model: LLM model name used for generation.
            model_provider: Provider identifier for the model.
            temperature: Sampling temperature for the LLM call.
            interface: Backend interface used to talk to the model.
            force_regenerate: If True, regenerate even when templates exist
                (presumably; semantics live in the helper — confirm there).
            progress_callback: Optional callable receiving (fraction, message)
                progress updates.
            only_missing: If True (default), restrict generation to questions
                without templates (presumably; confirm in the helper).
            endpoint_base_url: Optional custom endpoint base URL.
            endpoint_api_key: Optional API key for the custom endpoint.

        Returns:
            A dict keyed per question describing each generation result
            (shape defined by the helper).
        """
        return _helpers.generate_all_templates(
            self,
            model,
            model_provider,
            temperature,
            interface,
            force_regenerate,
            progress_callback,
            only_missing,
            endpoint_base_url,
            endpoint_api_key,
        )

    def export_generated_templates(self, file_path: Path) -> None:
        """Export all generated templates to a JSON file at ``file_path``.

        Delegates to ``_helpers.export_generated_templates``.
        """
        _helpers.export_generated_templates(self, file_path)

    def import_generated_templates(self, file_path: Path, force_overwrite: bool = False) -> dict[str, bool]:
        """Import templates from a JSON file produced by ``export_generated_templates``.

        Args:
            file_path: Path to the JSON file to import.
            force_overwrite: If True, overwrite existing templates
                (presumably; semantics live in the helper — confirm there).

        Returns:
            A dict of per-question import outcomes (shape defined by the helper).
        """
        return _helpers.import_generated_templates(self, file_path, force_overwrite)

    # ── Rubric management ────────────────────────────────────────────────

    def add_global_rubric_trait(
        self, trait: LLMRubricTrait | RegexRubricTrait | CallableRubricTrait | MetricRubricTrait | AgenticRubricTrait
    ) -> None:
        """Attach a rubric trait to the benchmark's global rubric."""
        manager = self._rubric_manager
        manager.add_global_rubric_trait(trait)

    def add_question_rubric_trait(
        self,
        question_id: str,
        trait: LLMRubricTrait | RegexRubricTrait | CallableRubricTrait | MetricRubricTrait | AgenticRubricTrait,
    ) -> None:
        """Add a question-specific rubric trait."""
        self._rubric_manager.add_question_rubric_trait(question_id, trait)

    def set_global_rubric(self, rubric: Rubric) -> None:
        """Replace the global rubric with the traits of ``rubric``.

        Any existing global rubric is cleared first; traits are then re-added
        category by category (LLM, regex, callable, metric, agentic), in that
        order.
        """
        self.clear_global_rubric()
        trait_groups = (
            rubric.llm_traits,
            rubric.regex_traits,
            rubric.callable_traits,
            rubric.metric_traits,
            rubric.agentic_traits,
        )
        for group in trait_groups:
            for single_trait in group:
                self.add_global_rubric_trait(single_trait)

    def set_question_rubric(self, question_id: str, rubric: Rubric) -> None:
        """Replace the question-specific rubric with the traits of ``rubric``.

        The question's existing rubric is removed first; traits are then
        re-added category by category (LLM, regex, callable, metric, agentic),
        in that order.
        """
        self.remove_question_rubric(question_id)
        trait_groups = (
            rubric.llm_traits,
            rubric.regex_traits,
            rubric.callable_traits,
            rubric.metric_traits,
            rubric.agentic_traits,
        )
        for group in trait_groups:
            for single_trait in group:
                self.add_question_rubric_trait(question_id, single_trait)

    def get_global_rubric(self) -> Rubric | None:
        """Get the global rubric from the benchmark.

        Returns:
            The global Rubric, or None if none is configured.
        """
        return self._rubric_manager.get_global_rubric()

    def clear_global_rubric(self) -> bool:
        """Remove the global rubric.

        Returns:
            Manager result — presumably whether a rubric existed and was removed.
        """
        return self._rubric_manager.clear_global_rubric()

    def remove_question_rubric(self, question_id: str) -> bool:
        """Remove question-specific rubric.

        Returns:
            Manager result — presumably whether a rubric existed and was removed.
        """
        return self._rubric_manager.remove_question_rubric(question_id)

    def clear_all_rubrics(self) -> int:
        """Remove all rubrics (global and question-specific).

        Returns:
            Count reported by the rubric manager.
        """
        return self._rubric_manager.clear_all_rubrics()

    def validate_rubrics(self) -> tuple[bool, list[str]]:
        """Validate all rubrics are properly configured.

        Returns:
            Tuple of (is_valid, messages) as reported by the rubric manager.
        """
        return self._rubric_manager.validate_rubrics()

    # ── Dynamic rubric management ──────────────────────────────────────

    def get_global_dynamic_rubric(self) -> DynamicRubric | None:
        """Get the global dynamic rubric from the benchmark.

        Returns:
            The global DynamicRubric, or None if none is configured.
        """
        return self._rubric_manager.get_global_dynamic_rubric()

    def set_global_dynamic_rubric(self, dynamic_rubric: DynamicRubric | None) -> None:
        """Set or clear the global dynamic rubric.

        The in-memory value on the base is always updated; the checkpoint is
        kept in sync so the rubric survives save/load cycles.

        Args:
            dynamic_rubric: The DynamicRubric to set, or None to clear.
        """
        self._base._global_dynamic_rubric = dynamic_rubric

        if dynamic_rubric is None:
            # Clearing: strip any persisted dynamic-rubric ratings from the
            # checkpoint so the cleared state round-trips correctly.
            current_ratings = self._base._checkpoint.rating
            if current_ratings:
                self._base._checkpoint.rating = [
                    entry for entry in current_ratings if entry.additionalType != "karenina:GlobalDynamicRubricTrait"
                ]
        else:
            self._rubric_manager.set_global_dynamic_rubric_in_checkpoint(dynamic_rubric)

    def get_merged_dynamic_rubric_for_question(self, question_id: str) -> DynamicRubric | None:
        """Get merged dynamic rubric for a question (global + question-specific).

        Merging semantics live in the rubric manager; this is a pure delegate.

        Args:
            question_id: The question ID.

        Returns:
            Merged DynamicRubric or None if neither global nor question-level exists.
        """
        return self._rubric_manager.get_merged_dynamic_rubric_for_question(question_id)

    # ── Verification ─────────────────────────────────────────────────────

    def run_verification(
        self,
        config: VerificationConfig,
        question_ids: list[str] | None = None,
        run_name: str | None = None,
        async_enabled: bool | None = None,
        progress_callback: Callable[[float, str], None] | None = None,
    ) -> VerificationResultSet:
        """Run verification on the benchmark using existing execution system.

        Standalone question benchmarks are delegated to the VerificationManager.
        Scenario benchmarks are dispatched to ``_run_scenario_verification``,
        which iterates over the scenario x model cross-product.

        NOTE(review): ``question_ids`` is not forwarded on the scenario path —
        confirm whether scenario runs are meant to ignore it.
        """
        if not self.is_scenario_benchmark:
            return self._verification_manager.run_verification(
                config,
                question_ids,
                run_name,
                async_enabled,
                progress_callback,
                workspace_root=self._workspace_root,
            )
        return self._run_scenario_verification(
            config=config,
            run_name=run_name,
            async_enabled=async_enabled,
            progress_callback=progress_callback,
        )

    def _run_scenario_verification(
        self,
        config: VerificationConfig,
        run_name: str | None = None,
        async_enabled: bool | None = None,
        progress_callback: Callable[[float, str], None] | None = None,
    ) -> VerificationResultSet:
        """Run verification for scenario benchmarks.

        Builds the cross-product of scenarios, answering models, and parsing
        models, then executes each combination through a ScenarioManager.
        When ``async_enabled`` is True and more than one combination exists,
        combinations run in parallel via ``_run_scenario_parallel``; otherwise
        they run sequentially.

        Args:
            config: Verification configuration.
            run_name: Optional run name for tracking.
            async_enabled: If True, run combinations in parallel via asyncio.
            progress_callback: Optional callback for progress updates.

        Returns:
            VerificationResultSet containing all per-turn results.
        """
        from ..scenario.manager import ScenarioManager

        scenario_manager = ScenarioManager()
        merged_global_rubric = self._rubric_manager.get_global_rubric()

        # Full cross-product: every scenario paired with every answering and
        # parsing model from the config.
        task_list: list[tuple[Any, Any, Any]] = []
        for scenario_def in self._scenarios.values():
            for ans_model in config.answering_models:
                for parse_model in config.parsing_models:
                    task_list.append((scenario_def, ans_model, parse_model))

        turn_results: list[VerificationResult] = []
        exec_results: list[Any] = []
        failures: list[tuple[str, BaseException]] = []

        if async_enabled and len(task_list) > 1:
            turn_results, exec_results, failures = self._run_scenario_parallel(
                manager=scenario_manager,
                combos=task_list,
                config=config,
                run_name=run_name,
                global_rubric=merged_global_rubric,
                progress_callback=progress_callback,
            )
        else:
            # Sequential path. NOTE: unlike the parallel path, exceptions here
            # propagate to the caller instead of being collected into errors.
            for scenario_def, ans_model, parse_model in task_list:
                single_result = scenario_manager.run(
                    scenario=scenario_def,
                    config=config,
                    base_answering_model=ans_model,
                    base_parsing_model=parse_model,
                    run_name=run_name,
                    global_rubric=merged_global_rubric,
                    progress_callback=progress_callback,
                )
                turn_results.extend(single_result.turn_results)
                exec_results.append(single_result)

        return VerificationResultSet(
            results=turn_results,
            scenario_results=exec_results or None,
            errors=failures or None,
        )

    def _run_scenario_parallel(
        self,
        manager: Any,
        combos: list[tuple[Any, Any, Any]],
        config: VerificationConfig,
        run_name: str | None,
        global_rubric: "Rubric | None",
        progress_callback: Callable[..., None] | None,
    ) -> tuple[list[VerificationResult], list[Any], list[tuple[str, BaseException]]]:
        """Run scenario combinations in parallel via asyncio.gather.

        Each combination is dispatched through ``manager.arun``. Failures are
        captured (``return_exceptions=True``), logged, and returned in the
        errors list rather than raised, so one failing combination does not
        abort the whole batch.

        Args:
            manager: ScenarioManager instance.
            combos: List of (scenario, answering_model, parsing_model) tuples.
            config: Verification configuration.
            run_name: Optional run name.
            global_rubric: Optional global rubric.
            progress_callback: Optional progress callback.

        Returns:
            Tuple of (turn_results, scenario_exec_results, errors).
        """
        import asyncio

        async def _collect() -> tuple[list[VerificationResult], list[Any], list[tuple[str, BaseException]]]:
            pending = [
                manager.arun(
                    scenario=scenario_def,
                    config=config,
                    base_answering_model=ans_model,
                    base_parsing_model=parse_model,
                    run_name=run_name,
                    global_rubric=global_rubric,
                    progress_callback=progress_callback,
                )
                for scenario_def, ans_model, parse_model in combos
            ]
            outcomes = await asyncio.gather(*pending, return_exceptions=True)

            turn_results: list[VerificationResult] = []
            scenario_exec_results: list[Any] = []
            errors: list[tuple[str, BaseException]] = []
            # gather() preserves input order, so combos and outcomes line up.
            for combo, outcome in zip(combos, outcomes):
                if isinstance(outcome, BaseException):
                    desc = f"Scenario '{combo[0].name}' with {combo[1].model_name}/{combo[2].model_name}"
                    logger.error(
                        "Scenario execution failed: %s: %s",
                        desc,
                        outcome,
                    )
                    errors.append((desc, outcome))
                else:
                    turn_results.extend(outcome.turn_results)
                    scenario_exec_results.append(outcome)
            return turn_results, scenario_exec_results, errors

        # asyncio.run() refuses to start inside a running loop; when one is
        # already active, delegate the whole gather to a dedicated thread.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(_collect())

        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, _collect()).result()

    # ── Results management ───────────────────────────────────────────────

    def store_verification_results(
        self,
        results: VerificationResultSet | dict[str, VerificationResult],
        run_name: str | None = None,
    ) -> None:
        """Store verification results in the benchmark metadata.

        Args:
            results: A full result set or a question-id -> result mapping.
            run_name: Optional run name under which to file the results.
        """
        _helpers.store_verification_results(self, results, run_name)

    def get_verification_results(
        self,
        question_ids: list[str] | None = None,
        run_name: str | None = None,
    ) -> dict[str, VerificationResult]:
        """Get verification results for specific questions and/or runs.

        Both filters are optional; None presumably means "all" — see manager.
        """
        return self._results_manager.get_verification_results(question_ids, run_name)

    def get_verification_history(self, question_id: str | None = None) -> dict[str, dict[str, VerificationResult]]:
        """Get verification history organized by run name.

        Returns a run-name -> (question-id -> result) mapping per the annotation.
        """
        return self._results_manager.get_verification_history(question_id)

    def clear_verification_results(
        self,
        question_ids: list[str] | None = None,
        run_name: str | None = None,
    ) -> int:
        """Clear verification results.

        Returns:
            Count reported by the results manager.
        """
        return self._results_manager.clear_verification_results(question_ids, run_name)

    def export_verification_results(
        self,
        question_ids: list[str] | None = None,
        run_name: str | None = None,
        format: str = "json",
        global_rubric: "Rubric | None" = None,
    ) -> str:
        """Export verification results in specified format.

        Returns:
            The serialized results as a string (format defaults to "json").
        """
        return self._results_manager.export_verification_results(question_ids, run_name, format, global_rubric)

    def export_verification_results_to_file(
        self,
        file_path: Path,
        question_ids: list[str] | None = None,
        run_name: str | None = None,
        format: str | None = None,
        global_rubric: "Rubric | None" = None,
    ) -> None:
        """Export verification results directly to a file.

        Note: unlike ``export_verification_results``, ``format`` defaults to
        None here — the manager presumably infers it (e.g. from the file
        extension); confirm in ResultsManager.
        """
        self._results_manager.export_results_to_file(file_path, question_ids, run_name, format, global_rubric)

    def load_verification_results_from_file(
        self,
        file_path: Path,
        run_name: str | None = None,
    ) -> dict[str, VerificationResult]:
        """Load verification results from a previously exported file."""
        return self._results_manager.load_results_from_file(file_path, run_name)

    def get_verification_summary(self, run_name: str | None = None) -> dict[str, Any]:
        """Get summary statistics for verification results."""
        return self._results_manager.get_verification_summary(run_name)

    def get_all_run_names(self) -> list[str]:
        """Get all verification run names."""
        return self._results_manager.get_all_run_names()

    def get_results_statistics_by_run(self) -> dict[str, dict[str, Any]]:
        """Get verification statistics for each run."""
        return self._results_manager.get_results_statistics_by_run()

    # ── GEPA optimization (delegated to benchmark_helpers) ───────────────

    def optimize(
        self,
        targets: list[str],
        config: VerificationConfig | None = None,
        train_ratio: float = 0.8,
        val_ratio: float = 0.2,
        test_ratio: float | None = None,
        seed: int | None = None,
        reflection_model: str = "openai/gpt-4o",
        max_metric_calls: int = 150,
        objective_config: "ObjectiveConfig | None" = None,
        frontier_type: "FrontierType" = "objective",
        seed_prompts: dict[str, str] | None = None,
        tracker_path: Path | str | None = None,
        export_preset_path: Path | str | None = None,
        progress_callback: Callable[[float, str], None] | None = None,
        verbose: bool = False,
    ) -> "KareninaOutput":
        """Optimize text components with GEPA, scored by karenina verification.

        Requires the 'gepa' optional dependency: pip install karenina[gepa]

        Args:
            targets: Components to optimize. Valid values:
                "answering_system_prompt", "parsing_instructions",
                "mcp_tool_descriptions".
            config: Base VerificationConfig; None uses a default minimal config.
            train_ratio: Fraction of questions for training (default 0.8).
            val_ratio: Fraction of questions for validation (default 0.2).
            test_ratio: Optional test fraction; None means no test set.
            seed: Random seed for reproducibility.
            reflection_model: Model for GEPA's reflection LLM (default openai/gpt-4o).
            max_metric_calls: Maximum GEPA optimization iterations (default 150).
            objective_config: Multi-objective optimization dimensions.
            frontier_type: GEPA Pareto frontier tracking strategy.
            seed_prompts: Optional initial prompts; None uses empty strings.
            tracker_path: Optional SQLite file tracking optimization history.
            export_preset_path: Optional path to export optimized config as preset.
            progress_callback: Optional (percentage, message) progress callback.
            verbose: If True, display detailed progress during optimization.

        Returns:
            KareninaOutput with optimized prompts and metrics.

        Example:
            >>> result = benchmark.optimize(
            ...     targets=["answering_system_prompt"],
            ...     reflection_model="openai/gpt-4o",
            ...     max_metric_calls=100,
            ... )
            >>> print(f"Improvement: {result.improvement:.1%}")
        """
        # All parameters are forwarded positionally, in declaration order,
        # to the helper that owns the actual optimization loop.
        forwarded = (
            self,
            targets,
            config,
            train_ratio,
            val_ratio,
            test_ratio,
            seed,
            reflection_model,
            max_metric_calls,
            objective_config,
            frontier_type,
            seed_prompts,
            tracker_path,
            export_preset_path,
            progress_callback,
            verbose,
        )
        return _helpers.run_optimize(*forwarded)

    def optimization_history(
        self,
        tracker_path: Path | str = "~/.karenina/optimization_history.db",
        limit: int = 20,
    ) -> list["OptimizationRun"]:
        """Return recorded optimization runs for this benchmark.

        Requires the optional 'gepa' dependency; when it is not installed,
        an empty list is returned instead of raising.

        Args:
            tracker_path: SQLite tracker file. NOTE(review): '~' expansion is
                assumed to happen inside OptimizationTracker — confirm.
            limit: Maximum number of runs to return.
        """
        try:
            from karenina.integrations.gepa import OptimizationTracker
        except ImportError:
            # GEPA extras are not installed: degrade gracefully.
            return []
        return OptimizationTracker(tracker_path).list_runs(benchmark_name=self.name, limit=limit)

    # ── Metadata management ──────────────────────────────────────────────

    def get_custom_property(self, name: str) -> Any:
        """Get a custom property from benchmark metadata.

        Args:
            name: Property key to look up (delegated to the metadata manager).
        """
        return self._metadata_manager.get_custom_property(name)

    def set_custom_property(self, name: str, value: Any) -> None:
        """Set a custom property in benchmark metadata."""
        self._metadata_manager.set_custom_property(name, value)

    def remove_custom_property(self, name: str) -> bool:
        """Remove a custom property from benchmark metadata.

        Returns:
            Manager result — presumably whether the property existed.
        """
        return self._metadata_manager.remove_custom_property(name)

    def get_all_custom_properties(self) -> dict[str, Any]:
        """Get all custom properties as a dictionary."""
        return self._metadata_manager.get_all_custom_properties()

    def set_multiple_custom_properties(self, properties: dict[str, Any]) -> None:
        """Set multiple custom properties at once."""
        self._metadata_manager.set_multiple_custom_properties(properties)

    # ── Export and reporting ──────────────────────────────────────────────

    def to_dict(self) -> dict[str, Any]:
        """Export benchmark as a plain dictionary (delegated to export manager)."""
        return self._export_manager.to_dict()

    def to_markdown(self) -> str:
        """Export benchmark as markdown document."""
        return self._export_manager.to_markdown()

    def to_csv(self) -> str:
        """Export questions as CSV format."""
        return self._export_manager.to_csv()

    def get_summary(self) -> dict[str, Any]:
        """Get comprehensive benchmark statistics."""
        return self._export_manager.get_summary()

    def get_statistics(self) -> dict[str, Any]:
        """Get detailed statistics about the benchmark."""
        return self._export_manager.get_statistics()

    def check_readiness(self) -> dict[str, Any]:
        """Comprehensive readiness check for verification."""
        return self._export_manager.check_readiness()

    def get_health_report(self) -> dict[str, Any]:
        """Get comprehensive health/status report."""
        return self._export_manager.get_health_report()

    def clone(self) -> "Benchmark":
        """Create a deep copy of the benchmark.

        The base data is deep-copied by the export manager; managers are
        rebuilt against the cloned base; ``workspace_root`` (a local path)
        is shared; and scenario definitions are carried over so a scenario
        benchmark stays intact after cloning.

        Returns:
            A new, independent Benchmark instance.
        """
        cloned_base = self._export_manager.clone()
        instance = Benchmark.__new__(Benchmark)
        instance._base = cloned_base
        instance._workspace_root = self._workspace_root
        instance._init_managers()
        # Fix: `_scenarios` is only set in __init__, which __new__ bypasses.
        # Without this, a cloned scenario benchmark lost its scenarios (or the
        # attribute entirely, if _init_managers does not create it — confirm).
        # Shallow copy of the mapping; the ScenarioDefinition objects are shared.
        instance._scenarios = dict(self._scenarios)
        return instance

    def validate(self) -> tuple[bool, str]:
        """Validate the benchmark structure and all answer templates.

        Returns:
            ``(True, "Benchmark is valid")`` on success, otherwise
            ``(False, <reason>)`` for the first problem found.
        """
        from .verification.utils.validation import validate_answer_template

        base_ok, base_error = self._base.validate()
        if not base_ok:
            return False, base_error

        # Validate every question that actually carries a template.
        for q_id, q_data in self._questions_cache.items():
            template_code = q_data.get("answer_template")
            if template_code is None:
                continue
            template_ok, template_error, _ = validate_answer_template(template_code)
            if not template_ok:
                reason = template_error or "Unknown validation error"
                return False, f"Invalid template for {q_id}: {reason}"

        return True, "Benchmark is valid"

    def set_metadata(self, **metadata: Any) -> None:
        """Set benchmark metadata.

        Args:
            **metadata: Arbitrary metadata fields, forwarded to the base.
        """
        self._base.set_metadata(**metadata)

    # ── Base class property delegation ───────────────────────────────────

    @property
    def _checkpoint(self) -> Any:
        """Get the raw JSON-LD checkpoint data (for backward compatibility)."""
        return self._base._checkpoint

    @property
    def _questions_cache(self) -> dict[str, Any]:
        """Get the questions cache (for backward compatibility)."""
        return self._base._questions_cache

    @property
    def _question_registry(self) -> dict[str, Any]:
        """Get the question registry (for backward compatibility)."""
        return self._base._question_registry

    def _get_item_id(self, item: Any) -> str:
        """Get the ID for a DataFeedItem (for backward compatibility)."""
        return self._base._get_item_id(item)

    def _rebuild_cache(self) -> None:
        """Rebuild the internal questions cache (for backward compatibility).

        Delegates to the base; the base method's return value is forwarded
        (annotated None).
        """
        return self._base._rebuild_cache()

    def _get_merged_rubric_for_question(self, question_id: str) -> Rubric | None:
        """Get merged rubric for a question (for backward compatibility).

        Unlike the other shims here, this delegates to the rubric manager,
        not the base.
        """
        return self._rubric_manager.get_merged_rubric_for_question(question_id)

    @property
    def jsonld_data(self) -> Any:
        """Get the raw JSON-LD benchmark data."""
        return self._base.jsonld_data

    @property
    def name(self) -> str:
        """Get the benchmark name."""
        return self._base.name

    @name.setter
    def name(self, value: str) -> None:
        """Set the benchmark name (stored on the base)."""
        self._base.name = value

    @property
    def description(self) -> str:
        """Get the benchmark description."""
        return self._base.description

    @description.setter
    def description(self, value: str) -> None:
        """Set the benchmark description (stored on the base)."""
        self._base.description = value

    @property
    def version(self) -> str:
        """Get the benchmark version."""
        return self._base.version

    @version.setter
    def version(self, value: str) -> None:
        """Set the benchmark version (stored on the base)."""
        self._base.version = value

    @property
    def creator(self) -> str:
        """Get the benchmark creator."""
        return self._base.creator

    @creator.setter
    def creator(self, value: str) -> None:
        """Set the benchmark creator (stored on the base)."""
        self._base.creator = value

    @property
    def id(self) -> str | None:
        """Get the benchmark ID."""
        return self._base.id

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the benchmark ID (stored on the base)."""
        self._base.id = value

    @property
    def created_at(self) -> str:
        """Get the creation timestamp."""
        return self._base.created_at

    @created_at.setter
    def created_at(self, value: str) -> None:
        """Set the creation timestamp (stored on the base)."""
        self._base.created_at = value

    @property
    def modified_at(self) -> str:
        """Get the last modification timestamp."""
        return self._base.modified_at

    @modified_at.setter
    def modified_at(self, value: str) -> None:
        """Set the last modification timestamp (stored on the base)."""
        self._base.modified_at = value

    @property
    def question_count(self) -> int:
        """Get the total number of questions."""
        return self._base.question_count

    @property
    def finished_count(self) -> int:
        """Get the number of finished questions."""
        return self._base.finished_count

    @property
    def is_empty(self) -> bool:
        """True when the benchmark holds neither questions nor scenarios."""
        # Empty dicts are falsy, so truthiness replaces the explicit len() checks.
        return not self._base._questions_cache and not self._scenarios

    @property
    def is_complete(self) -> bool:
        """Check if all questions have templates and are finished."""
        return self._base.is_complete

    def get_progress(self) -> float:
        """Get completion progress as percentage (0-100), as computed by the base."""
        return self._base.get_progress()

    # ── Magic methods ────────────────────────────────────────────────────

    def __repr__(self) -> str:
        """Developer-friendly representation with detailed statistics."""
        return _helpers.build_repr(self)

    def __str__(self) -> str:
        """String representation (same as repr for developer-friendly output)."""
        # Deliberately routed through __repr__ so subclass overrides apply to both.
        return self.__repr__()

    def __len__(self) -> int:
        """Number of scenarios when any exist, otherwise number of questions."""
        return len(self._scenarios) if self._scenarios else len(self._base)

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate over questions in the benchmark (via the question manager)."""
        return iter(self._question_manager)

    def __contains__(self, question_id: str) -> bool:
        """Check if a question ID exists in the benchmark (via the base)."""
        return question_id in self._base

    def __getitem__(self, key: str | int | slice) -> "SchemaOrgQuestion | list[SchemaOrgQuestion]":
        """Fetch question(s) by ID string, positional index, or slice.

        Raises:
            IndexError: If an integer index is out of range.
            TypeError: If the key is not a str, int, or slice.
        """
        from ..schemas.entities.question import QuestionRegistryEntry

        def _as_schema_org(qid: str) -> "SchemaOrgQuestion":
            # Shared conversion path: raw question data plus its finished flag.
            registry_entry = self._base._question_registry.get(qid, QuestionRegistryEntry())
            return _helpers.convert_to_schema_org_question(self._base[qid], finished=registry_entry.finished)

        if isinstance(key, str):
            return _as_schema_org(key)
        if isinstance(key, int):
            all_ids = self.get_question_ids()
            idx = key if key >= 0 else key + len(all_ids)
            if idx < 0 or idx >= len(all_ids):
                raise IndexError(f"Question index {key} out of range (0-{len(all_ids) - 1})")
            return _as_schema_org(all_ids[idx])
        if isinstance(key, slice):
            return [_as_schema_org(qid) for qid in self.get_question_ids()[key]]
        raise TypeError(f"Invalid key type {type(key)}. Expected str, int, or slice.")

    def _convert_to_schema_org_question(self, question_data: dict[str, Any]) -> "SchemaOrgQuestion":
        """Convert internal question dictionary to SchemaOrgQuestion object.

        The finished flag is looked up from the registry by the dict's "id"
        key (defaulting to a fresh QuestionRegistryEntry when absent).
        """
        from ..schemas.entities.question import QuestionRegistryEntry

        q_id = question_data.get("id", "")
        finished = self._base._question_registry.get(q_id, QuestionRegistryEntry()).finished
        return _helpers.convert_to_schema_org_question(question_data, finished=finished)

    def __eq__(self, other: object) -> bool:
        """Compare two benchmarks for equality (by their base data).

        NOTE(review): defining __eq__ makes instances unhashable unless
        __hash__ is defined elsewhere in this class — confirm that is intended.
        """
        if not isinstance(other, Benchmark):
            return NotImplemented
        return self._base == other._base
Attributes
created_at property writable
created_at: str

Get the creation timestamp.

creator property writable
creator: str

Get the benchmark creator.

description property writable
description: str

Get the benchmark description.

finished_count property
finished_count: int

Get the number of finished questions.

id property writable
id: str | None

Get the benchmark ID.

is_complete property
is_complete: bool

Check if all questions have templates and are finished.

is_empty property
is_empty: bool

Check if the benchmark has no questions and no scenarios.

is_scenario_benchmark property
is_scenario_benchmark: bool

True if this benchmark contains scenarios instead of standalone questions.

jsonld_data property
jsonld_data: Any

Get the raw JSON-LD benchmark data.

modified_at property writable
modified_at: str

Get the last modification timestamp.

name property writable
name: str

Get the benchmark name.

question_count property
question_count: int

Get the total number of questions.

scenario_count property
scenario_count: int

Return the number of scenarios in the benchmark.

version property writable
version: str

Get the benchmark version.

workspace_root property
workspace_root: Path | None

Root directory for task workspaces (not persisted in checkpoint).

Functions
__init__
__init__(
    name: str,
    description: str = "",
    version: str = "0.1.0",
    creator: str = "Karenina Benchmarking System",
    workspace_root: Path | None = None,
)

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `name` | `str` | Name of the benchmark | *required* |
| `description` | `str` | Description of the benchmark | `''` |
| `version` | `str` | Version of the benchmark content | `'0.1.0'` |
| `creator` | `str` | Creator name or organization | `'Karenina Benchmarking System'` |
| `workspace_root` | `Path \| None` | Root directory containing task workspaces. Question workspace paths are resolved relative to this root. Not persisted in the checkpoint (it is a local filesystem path). | `None` |
Source code in src/karenina/benchmark/benchmark.py
def __init__(
    self,
    name: str,
    description: str = "",
    version: str = "0.1.0",
    creator: str = "Karenina Benchmarking System",
    workspace_root: Path | None = None,
):
    """
    Initialize a new benchmark.

    Args:
        name: Name of the benchmark
        description: Description of the benchmark
        version: Version of the benchmark content
        creator: Creator name or organization
        workspace_root: Root directory containing task workspaces.
            Question workspace paths are resolved relative to this root.
            Not persisted in the checkpoint (it is a local filesystem path).
    """
    self._base = BenchmarkBase(name, description, version, creator)
    self._workspace_root = workspace_root
    self._scenarios: dict[str, ScenarioDefinition] = {}
    self._metadata_manager = MetadataManager(self._base)
    self._question_manager = QuestionManager(self._base)
    self._rubric_manager = RubricManager(self._base)
    self._template_manager = TemplateManager(self._base)
    self._results_manager = ResultsManager(self._base)
    self._verification_manager = VerificationManager(self._base, self._rubric_manager)
    self._export_manager = ExportManager(self._base, self._template_manager, self._rubric_manager)
add_answer_template
add_answer_template(
    question_id: str, template_code: str
) -> None

Add or update an answer template for a question.

Source code in src/karenina/benchmark/benchmark.py
def add_answer_template(self, question_id: str, template_code: str) -> None:
    """Add or update an answer template for a question."""
    self._template_manager.add_answer_template(question_id, template_code)
add_global_rubric_trait
add_global_rubric_trait(
    trait: LLMRubricTrait
    | RegexRubricTrait
    | CallableRubricTrait
    | MetricRubricTrait
    | AgenticRubricTrait,
) -> None

Add a global rubric trait to the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def add_global_rubric_trait(
    self, trait: LLMRubricTrait | RegexRubricTrait | CallableRubricTrait | MetricRubricTrait | AgenticRubricTrait
) -> None:
    """Register a rubric trait that applies to every question in the benchmark."""
    rubrics = self._rubric_manager
    rubrics.add_global_rubric_trait(trait)
add_question
add_question(
    question: Union[str, dict[str, Any], Question],
    raw_answer: str | None = None,
    answer_template: str | type | None = None,
    question_id: str | None = None,
    finished: bool | object = _NOT_PROVIDED,
    author: dict[str, Any] | None = None,
    sources: list[dict[str, Any]] | None = None,
    custom_metadata: dict[str, Any] | None = None,
    few_shot_examples: list[dict[str, str]] | None = None,
    answer_notes: str | None = None,
) -> str

Add a question to the benchmark.

Accepts a question string, a Question object, or a dict with keys question and raw_answer (plus any optional kwargs).

Raises:

Type Description
ValueError

If scenarios already exist (homogeneous enforcement).

Source code in src/karenina/benchmark/benchmark.py
def add_question(
    self,
    question: Union[str, dict[str, Any], "Question"],
    raw_answer: str | None = None,
    answer_template: str | type | None = None,
    question_id: str | None = None,
    finished: bool | object = _NOT_PROVIDED,
    author: dict[str, Any] | None = None,
    sources: list[dict[str, Any]] | None = None,
    custom_metadata: dict[str, Any] | None = None,
    few_shot_examples: list[dict[str, str]] | None = None,
    answer_notes: str | None = None,
) -> str:
    """Add a single question to the benchmark.

    Accepts a question string, a Question object, or a dict with keys
    ``question`` and ``raw_answer`` (plus any optional kwargs).

    Raises:
        ValueError: If scenarios already exist (homogeneous enforcement).
    """
    # Benchmarks are homogeneous: standalone questions and scenarios
    # never coexist, so reject the add once any scenario is present.
    if not self._scenarios:
        return self._question_manager.add_question(
            question,
            raw_answer,
            answer_template,
            question_id,
            finished,
            author,
            sources,
            custom_metadata,
            few_shot_examples,
            answer_notes=answer_notes,
        )
    raise ValueError(
        "Cannot add standalone questions to a scenario benchmark. "
        "Scenarios and standalone questions cannot coexist in the same benchmark."
    )
add_question_from_object
add_question_from_object(
    question_obj: Question, **metadata: Any
) -> str

Add a question to the benchmark from a Question object.

Source code in src/karenina/benchmark/benchmark.py
def add_question_from_object(self, question_obj: "Question", **metadata: Any) -> str:
    """Register a Question instance and return its assigned question ID."""
    manager = self._question_manager
    return manager.add_question_from_object(question_obj, **metadata)
add_question_rubric_trait
add_question_rubric_trait(
    question_id: str,
    trait: LLMRubricTrait
    | RegexRubricTrait
    | CallableRubricTrait
    | MetricRubricTrait
    | AgenticRubricTrait,
) -> None

Add a question-specific rubric trait.

Source code in src/karenina/benchmark/benchmark.py
def add_question_rubric_trait(
    self,
    question_id: str,
    trait: LLMRubricTrait | RegexRubricTrait | CallableRubricTrait | MetricRubricTrait | AgenticRubricTrait,
) -> None:
    """Attach a rubric trait that applies to one specific question only."""
    rubrics = self._rubric_manager
    rubrics.add_question_rubric_trait(question_id, trait)
add_questions
add_questions(
    questions_data: list[dict[str, Any]],
) -> list[str]

Add multiple questions at once.

Each dict is passed to add_question(), so all dict keys supported there are accepted here.

Parameters:

Name Type Description Default
questions_data list[dict[str, Any]]

List of dicts with question data.

required

Returns:

Type Description
list[str]

List of question IDs that were created.

Raises:

Type Description
ValueError

If scenarios already exist (homogeneous enforcement).

Source code in src/karenina/benchmark/benchmark.py
def add_questions(self, questions_data: list[dict[str, Any]]) -> list[str]:
    """Bulk-add questions; each dict is forwarded to ``add_question()``.

    Args:
        questions_data: List of dicts with question data.

    Returns:
        List of question IDs that were created.

    Raises:
        ValueError: If scenarios already exist (homogeneous enforcement).
    """
    # Homogeneous enforcement: scenario benchmarks reject standalone questions.
    if not self._scenarios:
        return self._question_manager.add_questions(questions_data)
    raise ValueError(
        "Cannot add standalone questions to a scenario benchmark. "
        "Scenarios and standalone questions cannot coexist in the same benchmark."
    )
add_questions_batch
add_questions_batch(
    questions_data: list[dict[str, Any]],
) -> list[str]

Add multiple questions at once.

Source code in src/karenina/benchmark/benchmark.py
def add_questions_batch(self, questions_data: list[dict[str, Any]]) -> list[str]:
    """Add multiple questions at once.

    Args:
        questions_data: List of dicts with question data.

    Returns:
        List of question IDs that were created.

    Raises:
        ValueError: If scenarios already exist (homogeneous enforcement).
    """
    # Consistency fix: apply the same homogeneous-benchmark guard enforced by
    # add_question()/add_questions(); previously this entry point bypassed it,
    # allowing standalone questions into a scenario benchmark.
    if self._scenarios:
        raise ValueError(
            "Cannot add standalone questions to a scenario benchmark. "
            "Scenarios and standalone questions cannot coexist in the same benchmark."
        )
    return self._question_manager.add_questions_batch(questions_data)
add_scenario
add_scenario(scenario: ScenarioDefinition | Any) -> None

Add a scenario to the benchmark.

Accepts either a ScenarioDefinition (frozen) or a Scenario builder (which will be validated and frozen automatically).

Parameters:

Name Type Description Default
scenario ScenarioDefinition | Any

A ScenarioDefinition or a Scenario builder instance.

required

Raises:

Type Description
ValueError

If standalone questions already exist (homogeneous enforcement), or if a scenario with the same name already exists.

Source code in src/karenina/benchmark/benchmark.py
def add_scenario(self, scenario: "ScenarioDefinition | Any") -> None:
    """Add a scenario to the benchmark.

    Accepts either a ScenarioDefinition (frozen) or a Scenario builder
    (which will be validated and frozen automatically).

    Args:
        scenario: A ScenarioDefinition or a Scenario builder instance.

    Raises:
        ValueError: If standalone questions already exist (homogeneous enforcement),
            or if a scenario with the same name already exists.
    """
    # Homogeneous enforcement: scenarios never mix with standalone questions.
    if self._base._questions_cache:
        raise ValueError(
            "Cannot add scenarios to a benchmark that already contains standalone questions. "
            "Scenarios and standalone questions cannot coexist in the same benchmark."
        )

    # A Scenario builder is frozen into a ScenarioDefinition via validate().
    if not isinstance(scenario, ScenarioDefinition):
        scenario = scenario.validate()

    if scenario.name in self._scenarios:
        raise ValueError(f"Scenario '{scenario.name}' already exists")

    self._scenarios[scenario.name] = scenario

    # The checkpoint is the source of truth, so mirror the scenario there.
    from ..scenario.checkpoint import scenario_to_schema_org
    from ..schemas.checkpoint import SchemaOrgPropertyValue

    checkpoint = self._base._checkpoint
    if checkpoint.hasPart is None:
        checkpoint.hasPart = []
    checkpoint.hasPart.append(scenario_to_schema_org(scenario))

    # Mark the benchmark as scenario-based, exactly once.
    existing = checkpoint.additionalProperty or []
    if all(prop.name != "benchmark_type" for prop in existing):
        if checkpoint.additionalProperty is None:
            checkpoint.additionalProperty = []
        checkpoint.additionalProperty.append(
            SchemaOrgPropertyValue(name="benchmark_type", value="scenario")
        )
apply_global_template
apply_global_template(template_code: str) -> list[str]

Apply a template to all questions that don't have one.

Source code in src/karenina/benchmark/benchmark.py
def apply_global_template(self, template_code: str) -> list[str]:
    """Fill in the given template on every question still lacking one."""
    manager = self._template_manager
    return manager.apply_global_template(template_code)
check_readiness
check_readiness() -> dict[str, Any]

Comprehensive readiness check for verification.

Source code in src/karenina/benchmark/benchmark.py
def check_readiness(self) -> dict[str, Any]:
    """Run the full pre-verification readiness check and return the report."""
    exporter = self._export_manager
    return exporter.check_readiness()
clear_all_rubrics
clear_all_rubrics() -> int

Remove all rubrics (global and question-specific).

Source code in src/karenina/benchmark/benchmark.py
def clear_all_rubrics(self) -> int:
    """Delete every rubric (global and per-question); returns the removal count."""
    rubrics = self._rubric_manager
    return rubrics.clear_all_rubrics()
clear_global_rubric
clear_global_rubric() -> bool

Remove the global rubric.

Source code in src/karenina/benchmark/benchmark.py
def clear_global_rubric(self) -> bool:
    """Drop the benchmark-wide rubric; True if one was actually removed."""
    rubrics = self._rubric_manager
    return rubrics.clear_global_rubric()
clear_questions
clear_questions() -> int

Remove all questions from the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def clear_questions(self) -> int:
    """Delete every question in the benchmark; returns how many were removed."""
    manager = self._question_manager
    return manager.clear_questions()
clear_verification_results
clear_verification_results(
    question_ids: list[str] | None = None,
    run_name: str | None = None,
) -> int

Clear verification results.

Source code in src/karenina/benchmark/benchmark.py
def clear_verification_results(
    self,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
) -> int:
    """Clear verification results."""
    return self._results_manager.clear_verification_results(question_ids, run_name)
clone
clone() -> Benchmark

Create a deep copy of the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def clone(self) -> "Benchmark":
    """Create a deep copy of the benchmark.

    Returns:
        A new Benchmark backed by a cloned base. The workspace root is
        shared: it is a local filesystem path, not benchmark state.
    """
    cloned_base = self._export_manager.clone()
    # __new__ bypasses __init__, so every attribute __init__ would set must
    # be established here explicitly.
    instance = Benchmark.__new__(Benchmark)
    instance._base = cloned_base
    instance._workspace_root = self._workspace_root
    instance._init_managers()
    # Fix: _scenarios is assigned only in __init__, which this path skips.
    # ScenarioDefinition is frozen, so a shallow dict copy is sufficient.
    # (Harmless if _init_managers already restores scenarios — same content.)
    instance._scenarios = dict(self._scenarios)
    return instance
copy_template
copy_template(from_id: str, to_id: str) -> None

Copy template from one question to another.

Source code in src/karenina/benchmark/benchmark.py
def copy_template(self, from_id: str, to_id: str) -> None:
    """Duplicate the answer template of one question onto another."""
    manager = self._template_manager
    manager.copy_template(from_id, to_id)
count_by_field
count_by_field(
    field_path: str,
    questions: list[dict[str, Any]] | None = None,
) -> dict[Any, int]

Count questions grouped by a field value using dot notation.

Source code in src/karenina/benchmark/benchmark.py
def count_by_field(self, field_path: str, questions: list[dict[str, Any]] | None = None) -> dict[Any, int]:
    """Count questions grouped by a field value using dot notation."""
    return self._question_manager.count_by_field(field_path, questions)
create classmethod
create(
    name: str,
    description: str = "",
    version: str = "0.1.0",
    creator: str = "Karenina Benchmarking System",
    workspace_root: Path | None = None,
) -> Benchmark

Create a new benchmark (alias for constructor).

Source code in src/karenina/benchmark/benchmark.py
@classmethod
def create(
    cls,
    name: str,
    description: str = "",
    version: str = "0.1.0",
    creator: str = "Karenina Benchmarking System",
    workspace_root: Path | None = None,
) -> "Benchmark":
    """Create a new benchmark (alias for constructor)."""
    return cls(name, description, version, creator, workspace_root=workspace_root)
export_generated_templates
export_generated_templates(file_path: Path) -> None

Export all generated templates to a JSON file.

Source code in src/karenina/benchmark/benchmark.py
def export_generated_templates(self, file_path: Path) -> None:
    """Write every generated template out to a JSON file at ``file_path``."""
    exporter = _helpers.export_generated_templates
    exporter(self, file_path)
export_verification_results
export_verification_results(
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    format: str = "json",
    global_rubric: Rubric | None = None,
) -> str

Export verification results in specified format.

Source code in src/karenina/benchmark/benchmark.py
def export_verification_results(
    self,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    format: str = "json",
    global_rubric: "Rubric | None" = None,
) -> str:
    """Export verification results in specified format."""
    return self._results_manager.export_verification_results(question_ids, run_name, format, global_rubric)
export_verification_results_to_file
export_verification_results_to_file(
    file_path: Path,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    format: str | None = None,
    global_rubric: Rubric | None = None,
) -> None

Export verification results directly to a file.

Source code in src/karenina/benchmark/benchmark.py
def export_verification_results_to_file(
    self,
    file_path: Path,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    format: str | None = None,
    global_rubric: "Rubric | None" = None,
) -> None:
    """Export verification results directly to a file."""
    self._results_manager.export_results_to_file(file_path, question_ids, run_name, format, global_rubric)
filter_by_custom_metadata
filter_by_custom_metadata(
    match_all: bool = True, **criteria: Any
) -> list[dict[str, Any]]

Filter questions by custom metadata fields with AND/OR logic.

Source code in src/karenina/benchmark/benchmark.py
def filter_by_custom_metadata(self, match_all: bool = True, **criteria: Any) -> list[dict[str, Any]]:
    """Select questions whose custom metadata matches ``criteria`` (AND/OR)."""
    manager = self._question_manager
    return manager.filter_by_custom_metadata(match_all, **criteria)
filter_by_metadata
filter_by_metadata(
    field_path: str, value: Any, match_mode: str = "exact"
) -> list[dict[str, Any]]

Filter questions by a metadata field using dot notation.

Source code in src/karenina/benchmark/benchmark.py
def filter_by_metadata(self, field_path: str, value: Any, match_mode: str = "exact") -> list[dict[str, Any]]:
    """Select questions where the dotted-path metadata field matches ``value``."""
    manager = self._question_manager
    return manager.filter_by_metadata(field_path, value, match_mode)
filter_questions
filter_questions(
    finished: bool | None = None,
    has_template: bool | None = None,
    has_rubric: bool | None = None,
    author: str | None = None,
    custom_filter: Any = None,
) -> list[dict[str, Any]]

Filter questions based on criteria.

Source code in src/karenina/benchmark/benchmark.py
def filter_questions(
    self,
    finished: bool | None = None,
    has_template: bool | None = None,
    has_rubric: bool | None = None,
    author: str | None = None,
    custom_filter: Any = None,
) -> list[dict[str, Any]]:
    """Filter questions based on criteria."""
    return self._question_manager.filter_questions(finished, has_template, has_rubric, author, custom_filter)
generate_all_templates
generate_all_templates(
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    progress_callback: Callable[[float, str], None]
    | None = None,
    only_missing: bool = True,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, dict[str, Any]]

Generate templates for all questions in the benchmark using LLM.

Source code in src/karenina/benchmark/benchmark.py
def generate_all_templates(
    self,
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    progress_callback: Callable[[float, str], None] | None = None,
    only_missing: bool = True,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, dict[str, Any]]:
    """Generate templates for every question in the benchmark via an LLM."""
    # Forward the arguments positionally, in the helper's declared order.
    forwarded = (
        model,
        model_provider,
        temperature,
        interface,
        force_regenerate,
        progress_callback,
        only_missing,
        endpoint_base_url,
        endpoint_api_key,
    )
    return _helpers.generate_all_templates(self, *forwarded)
generate_template_for_question
generate_template_for_question(
    question_id: str,
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, Any]

Generate an answer template for a specific question using LLM.

Source code in src/karenina/benchmark/benchmark.py
def generate_template_for_question(
    self,
    question_id: str,
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, Any]:
    """Generate an answer template for one question via an LLM."""
    # Forward the arguments positionally, in the helper's declared order.
    forwarded = (
        question_id,
        model,
        model_provider,
        temperature,
        interface,
        force_regenerate,
        endpoint_base_url,
        endpoint_api_key,
    )
    return _helpers.generate_template_for_question(self, *forwarded)
generate_templates
generate_templates(
    question_ids: list[str],
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    progress_callback: Callable[[float, str], None]
    | None = None,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, dict[str, Any]]

Generate templates for multiple questions using LLM.

Source code in src/karenina/benchmark/benchmark.py
def generate_templates(
    self,
    question_ids: list[str],
    model: str = "gemini-2.0-flash",
    model_provider: str = "google_genai",
    temperature: float = 0,
    interface: str = "langchain",
    force_regenerate: bool = False,
    progress_callback: Callable[[float, str], None] | None = None,
    endpoint_base_url: str | None = None,
    endpoint_api_key: str | None = None,
) -> dict[str, dict[str, Any]]:
    """Generate templates for the given set of questions via an LLM."""
    # Forward the arguments positionally, in the helper's declared order.
    forwarded = (
        question_ids,
        model,
        model_provider,
        temperature,
        interface,
        force_regenerate,
        progress_callback,
        endpoint_base_url,
        endpoint_api_key,
    )
    return _helpers.generate_templates(self, *forwarded)
get_all_custom_properties
get_all_custom_properties() -> dict[str, Any]

Get all custom properties as a dictionary.

Source code in src/karenina/benchmark/benchmark.py
def get_all_custom_properties(self) -> dict[str, Any]:
    """Return the benchmark's custom properties as a name-to-value mapping."""
    metadata = self._metadata_manager
    return metadata.get_all_custom_properties()
get_all_questions
get_all_questions(
    ids_only: bool = False,
) -> list[str] | list[dict[str, Any]]

Get all questions in the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def get_all_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
    """Get all questions in the benchmark."""
    return self._question_manager.get_all_questions(ids_only)
get_all_questions_as_objects
get_all_questions_as_objects() -> list[Question]

Get all questions as Question objects.

Source code in src/karenina/benchmark/benchmark.py
def get_all_questions_as_objects(self) -> list["Question"]:
    """Return every question materialized as a Question object."""
    manager = self._question_manager
    return manager.get_all_questions_as_objects()
get_all_run_names
get_all_run_names() -> list[str]

Get all verification run names.

Source code in src/karenina/benchmark/benchmark.py
def get_all_run_names(self) -> list[str]:
    """List the names of every verification run recorded so far."""
    results = self._results_manager
    return results.get_all_run_names()
get_custom_property
get_custom_property(name: str) -> Any

Get a custom property from benchmark metadata.

Source code in src/karenina/benchmark/benchmark.py
def get_custom_property(self, name: str) -> Any:
    """Look up one custom property of the benchmark metadata by name."""
    metadata = self._metadata_manager
    return metadata.get_custom_property(name)
get_finished_questions
get_finished_questions(
    ids_only: bool = False,
) -> list[str] | list[dict[str, Any]]

Get questions that are marked as finished.

Source code in src/karenina/benchmark/benchmark.py
def get_finished_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
    """Get questions that are marked as finished."""
    return self._question_manager.get_finished_questions(ids_only)
get_finished_templates
get_finished_templates(
    question_ids: set[str] | None = None,
) -> list[FinishedTemplate]

Get all finished templates for verification.

Source code in src/karenina/benchmark/benchmark.py
def get_finished_templates(self, question_ids: set[str] | None = None) -> list[FinishedTemplate]:
    """Collect the finished templates (optionally restricted) for verification."""
    templates = self._template_manager
    return templates.get_finished_templates(question_ids=question_ids)
get_global_dynamic_rubric
get_global_dynamic_rubric() -> DynamicRubric | None

Get the global dynamic rubric from the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def get_global_dynamic_rubric(self) -> DynamicRubric | None:
    """Fetch the benchmark-wide dynamic rubric, if one is defined."""
    rubrics = self._rubric_manager
    return rubrics.get_global_dynamic_rubric()
get_global_rubric
get_global_rubric() -> Rubric | None

Get the global rubric from the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def get_global_rubric(self) -> Rubric | None:
    """Fetch the benchmark-wide rubric, if one is defined."""
    rubrics = self._rubric_manager
    return rubrics.get_global_rubric()
get_health_report
get_health_report() -> dict[str, Any]

Get comprehensive health/status report.

Source code in src/karenina/benchmark/benchmark.py
def get_health_report(self) -> dict[str, Any]:
    """Produce the full health/status report for the benchmark."""
    exporter = self._export_manager
    return exporter.get_health_report()
get_merged_dynamic_rubric_for_question
get_merged_dynamic_rubric_for_question(
    question_id: str,
) -> DynamicRubric | None

Get merged dynamic rubric for a question (global + question-specific).

Parameters:

Name Type Description Default
question_id str

The question ID.

required

Returns:

Type Description
DynamicRubric | None

Merged DynamicRubric or None if neither global nor question-level exists.

Source code in src/karenina/benchmark/benchmark.py
def get_merged_dynamic_rubric_for_question(self, question_id: str) -> DynamicRubric | None:
    """Return the global and question-level dynamic rubrics merged together.

    Args:
        question_id: The question ID.

    Returns:
        Merged DynamicRubric, or None when no dynamic rubric exists at
        either level.
    """
    rubrics = self._rubric_manager
    return rubrics.get_merged_dynamic_rubric_for_question(question_id)
get_missing_templates
get_missing_templates(
    ids_only: bool = False,
) -> list[str] | list[dict[str, Any]]

Get questions that don't have non-default templates.

Source code in src/karenina/benchmark/benchmark.py
def get_missing_templates(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
    """Get questions that don't have non-default templates."""
    return self._template_manager.get_missing_templates(ids_only)
get_progress
get_progress() -> float

Get completion progress as percentage (0-100).

Source code in src/karenina/benchmark/benchmark.py
def get_progress(self) -> float:
    """Report completion progress as a percentage in the range 0-100."""
    base = self._base
    return base.get_progress()
get_question
get_question(question_id: str) -> dict[str, Any]

Get a question by ID.

Source code in src/karenina/benchmark/benchmark.py
def get_question(self, question_id: str) -> dict[str, Any]:
    """Fetch the question record identified by ``question_id``."""
    manager = self._question_manager
    return manager.get_question(question_id)
get_question_as_object
get_question_as_object(question_id: str) -> Question

Get a question as a Question object.

Source code in src/karenina/benchmark/benchmark.py
def get_question_as_object(self, question_id: str) -> "Question":
    """Fetch one question materialized as a Question object."""
    manager = self._question_manager
    return manager.get_question_as_object(question_id)
get_question_author
get_question_author(
    question_id: str,
) -> dict[str, Any] | None

Get author information for a question.

Source code in src/karenina/benchmark/benchmark.py
def get_question_author(self, question_id: str) -> dict[str, Any] | None:
    """Get author information for a question."""
    return self._question_manager.get_question_author(question_id)
get_question_custom_property
get_question_custom_property(
    question_id: str, name: str
) -> Any

Get a custom property from question metadata.

Source code in src/karenina/benchmark/benchmark.py
def get_question_custom_property(self, question_id: str, name: str) -> Any:
    """Look up one custom metadata property of a question by name."""
    manager = self._question_manager
    return manager.get_question_custom_property(question_id, name)
get_question_ids
get_question_ids() -> list[str]

Get all question IDs in the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def get_question_ids(self) -> list[str]:
    """List the IDs of every question in the benchmark."""
    manager = self._question_manager
    return manager.get_question_ids()
get_question_metadata
get_question_metadata(question_id: str) -> dict[str, Any]

Get all metadata for a specific question.

Source code in src/karenina/benchmark/benchmark.py
def get_question_metadata(self, question_id: str) -> dict[str, Any]:
    """Return the full metadata mapping for one question."""
    manager = self._question_manager
    return manager.get_question_metadata(question_id)
get_question_sources
get_question_sources(
    question_id: str,
) -> list[dict[str, Any]] | None

Get source documents for a question.

Source code in src/karenina/benchmark/benchmark.py
def get_question_sources(self, question_id: str) -> list[dict[str, Any]] | None:
    """Get source documents for a question."""
    return self._question_manager.get_question_sources(question_id)
get_question_timestamps
get_question_timestamps(question_id: str) -> dict[str, str]

Get creation and modification timestamps for a question.

Source code in src/karenina/benchmark/benchmark.py
def get_question_timestamps(self, question_id: str) -> dict[str, str]:
    """Return the creation and last-modification timestamps of a question."""
    manager = self._question_manager
    return manager.get_question_timestamps(question_id)
get_questions_by_author
get_questions_by_author(
    author: str,
) -> list[dict[str, Any]]

Get questions created by a specific author.

Source code in src/karenina/benchmark/benchmark.py
def get_questions_by_author(self, author: str) -> list[dict[str, Any]]:
    """Return every question attributed to the given author."""
    manager = self._question_manager
    return manager.get_questions_by_author(author)
get_questions_with_rubric
get_questions_with_rubric() -> list[dict[str, Any]]

Get questions that have question-specific rubrics.

Source code in src/karenina/benchmark/benchmark.py
def get_questions_with_rubric(self) -> list[dict[str, Any]]:
    """Return the questions that carry their own question-level rubric."""
    manager = self._question_manager
    return manager.get_questions_with_rubric()
get_results_statistics_by_run
get_results_statistics_by_run() -> dict[
    str, dict[str, Any]
]

Get verification statistics for each run.

Source code in src/karenina/benchmark/benchmark.py
def get_results_statistics_by_run(self) -> dict[str, dict[str, Any]]:
    """Return verification statistics keyed by run name."""
    results = self._results_manager
    return results.get_results_statistics_by_run()
get_scenario
get_scenario(name: str) -> ScenarioDefinition

Get a scenario by name.

Parameters:

Name Type Description Default
name str

The scenario name.

required

Returns:

Type Description
ScenarioDefinition

The ScenarioDefinition.

Raises:

Type Description
KeyError

If no scenario with that name exists.

Source code in src/karenina/benchmark/benchmark.py
def get_scenario(self, name: str) -> ScenarioDefinition:
    """Look up a scenario by name.

    Args:
        name: The scenario name.

    Returns:
        The matching ScenarioDefinition.

    Raises:
        KeyError: If no scenario with that name exists.
    """
    # Re-raise with a clearer message than the raw dict KeyError.
    if name not in self._scenarios:
        raise KeyError(f"Scenario '{name}' not found")
    return self._scenarios[name]
get_scenarios
get_scenarios() -> list[ScenarioDefinition]

Get all scenario definitions.

Returns:

Type Description
list[ScenarioDefinition]

List of ScenarioDefinition instances.

Source code in src/karenina/benchmark/benchmark.py
def get_scenarios(self) -> list[ScenarioDefinition]:
    """Return every registered scenario definition.

    Returns:
        List of ScenarioDefinition instances, in insertion order.
    """
    return [*self._scenarios.values()]
get_statistics
get_statistics() -> dict[str, Any]

Get detailed statistics about the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def get_statistics(self) -> dict[str, Any]:
    """Return detailed statistics about the benchmark contents."""
    exporter = self._export_manager
    return exporter.get_statistics()
get_summary
get_summary() -> dict[str, Any]

Get comprehensive benchmark statistics.

Source code in src/karenina/benchmark/benchmark.py
def get_summary(self) -> dict[str, Any]:
    """Return the comprehensive summary statistics for the benchmark."""
    exporter = self._export_manager
    return exporter.get_summary()
get_template
get_template(question_id: str) -> str

Get template code for a question.

Source code in src/karenina/benchmark/benchmark.py
def get_template(self, question_id: str) -> str:
    """Return the answer-template source code stored for a question."""
    templates = self._template_manager
    return templates.get_template(question_id)
get_unfinished_questions
get_unfinished_questions(
    ids_only: bool = False,
) -> list[str] | list[dict[str, Any]]

Get questions that are not marked as finished.

Source code in src/karenina/benchmark/benchmark.py
def get_unfinished_questions(self, ids_only: bool = False) -> list[str] | list[dict[str, Any]]:
    """Get questions that are not marked as finished."""
    return self._question_manager.get_unfinished_questions(ids_only)
get_verification_history
get_verification_history(
    question_id: str | None = None,
) -> dict[str, dict[str, VerificationResult]]

Get verification history organized by run name.

Source code in src/karenina/benchmark/benchmark.py
def get_verification_history(self, question_id: str | None = None) -> dict[str, dict[str, VerificationResult]]:
    """Return verification history keyed by run name (optionally one question)."""
    results = self._results_manager
    return results.get_verification_history(question_id)
get_verification_results
get_verification_results(
    question_ids: list[str] | None = None,
    run_name: str | None = None,
) -> dict[str, VerificationResult]

Get verification results for specific questions and/or runs.

Source code in src/karenina/benchmark/benchmark.py
def get_verification_results(
    self,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
) -> dict[str, VerificationResult]:
    """Fetch verification results, optionally filtered by question and run."""
    results = self._results_manager
    return results.get_verification_results(question_ids, run_name)
get_verification_summary
get_verification_summary(
    run_name: str | None = None,
) -> dict[str, Any]

Get summary statistics for verification results.

Source code in src/karenina/benchmark/benchmark.py
def get_verification_summary(self, run_name: str | None = None) -> dict[str, Any]:
    """Get summary statistics for verification results."""
    return self._results_manager.get_verification_summary(run_name)
has_template
has_template(question_id: str) -> bool

Check if a question has a non-default template.

Source code in src/karenina/benchmark/benchmark.py
def has_template(self, question_id: str) -> bool:
    """Return True when the question carries a non-default answer template."""
    templates = self._template_manager
    return templates.has_template(question_id)
import_generated_templates
import_generated_templates(
    file_path: Path, force_overwrite: bool = False
) -> dict[str, bool]

Import templates from a JSON file generated by export_generated_templates.

Source code in src/karenina/benchmark/benchmark.py
def import_generated_templates(self, file_path: Path, force_overwrite: bool = False) -> dict[str, bool]:
    """Import templates from a JSON file produced by export_generated_templates."""
    # Shared helper implements the actual import logic.
    outcome = _helpers.import_generated_templates(self, file_path, force_overwrite)
    return outcome
load classmethod
load(
    path: Path, workspace_root: Path | None = None
) -> Benchmark

Load a benchmark from a JSON-LD file.

Parameters:

Name Type Description Default
path Path

Path to the JSON-LD benchmark file.

required
workspace_root Path | None

Optional root directory for task workspaces.

None
Source code in src/karenina/benchmark/benchmark.py
@classmethod
def load(cls, path: Path, workspace_root: Path | None = None) -> "Benchmark":
    """Deserialize a benchmark from a JSON-LD checkpoint file.

    Args:
        path: Path to the JSON-LD benchmark file.
        workspace_root: Optional root directory for task workspaces.
    """
    # Bypass __init__: the instance is rebuilt from the loaded base state.
    bench = cls.__new__(cls)
    bench._base = BenchmarkBase.load(path)
    bench._workspace_root = workspace_root
    bench._init_managers()
    return bench
load_from_db classmethod
load_from_db(
    benchmark_name: str, storage: str
) -> Benchmark

Load a benchmark from a database.

Source code in src/karenina/benchmark/benchmark.py
@classmethod
def load_from_db(cls, benchmark_name: str, storage: str) -> "Benchmark":
    """Reconstruct a benchmark previously saved to a database."""
    from ..storage import load_benchmark

    benchmark = load_benchmark(benchmark_name, storage, load_config=False)
    return benchmark  # type: ignore[return-value]
load_verification_results_from_file
load_verification_results_from_file(
    file_path: Path, run_name: str | None = None
) -> dict[str, VerificationResult]

Load verification results from a previously exported file.

Source code in src/karenina/benchmark/benchmark.py
def load_verification_results_from_file(
    self,
    file_path: Path,
    run_name: str | None = None,
) -> dict[str, VerificationResult]:
    """Read verification results back from a previously exported file."""
    results = self._results_manager
    return results.load_results_from_file(file_path, run_name)
mark_finished
mark_finished(question_id: str) -> None

Mark a question as finished.

Source code in src/karenina/benchmark/benchmark.py
def mark_finished(self, question_id: str) -> None:
    """Flag a single question as finished."""
    questions = self._question_manager
    questions.mark_finished(question_id)
mark_finished_batch
mark_finished_batch(question_ids: list[str]) -> None

Mark multiple questions as finished.

Source code in src/karenina/benchmark/benchmark.py
def mark_finished_batch(self, question_ids: list[str]) -> None:
    """Flag several questions as finished in one call."""
    questions = self._question_manager
    questions.mark_finished_batch(question_ids)
mark_unfinished
mark_unfinished(question_id: str) -> None

Mark a question as unfinished.

Source code in src/karenina/benchmark/benchmark.py
def mark_unfinished(self, question_id: str) -> None:
    """Clear the finished flag on a single question."""
    questions = self._question_manager
    questions.mark_unfinished(question_id)
mark_unfinished_batch
mark_unfinished_batch(question_ids: list[str]) -> None

Mark multiple questions as unfinished.

Source code in src/karenina/benchmark/benchmark.py
def mark_unfinished_batch(self, question_ids: list[str]) -> None:
    """Clear the finished flag on several questions in one call."""
    questions = self._question_manager
    questions.mark_unfinished_batch(question_ids)
optimization_history
optimization_history(
    tracker_path: Path
    | str = "~/.karenina/optimization_history.db",
    limit: int = 20,
) -> list[OptimizationRun]

Get optimization history for this benchmark.

Source code in src/karenina/benchmark/benchmark.py
def optimization_history(
    self,
    tracker_path: Path | str = "~/.karenina/optimization_history.db",
    limit: int = 20,
) -> list["OptimizationRun"]:
    """Get optimization history for this benchmark."""
    try:
        from karenina.integrations.gepa import OptimizationTracker
    except ImportError:
        return []

    tracker = OptimizationTracker(tracker_path)
    return tracker.list_runs(benchmark_name=self.name, limit=limit)
optimize
optimize(
    targets: list[str],
    config: VerificationConfig | None = None,
    train_ratio: float = 0.8,
    val_ratio: float = 0.2,
    test_ratio: float | None = None,
    seed: int | None = None,
    reflection_model: str = "openai/gpt-4o",
    max_metric_calls: int = 150,
    objective_config: ObjectiveConfig | None = None,
    frontier_type: FrontierType = "objective",
    seed_prompts: dict[str, str] | None = None,
    tracker_path: Path | str | None = None,
    export_preset_path: Path | str | None = None,
    progress_callback: Callable[[float, str], None]
    | None = None,
    verbose: bool = False,
) -> KareninaOutput

Optimize text components using GEPA with karenina verification as the metric.

Requires the 'gepa' optional dependency: pip install karenina[gepa]

Parameters:

Name Type Description Default
targets list[str]

List of components to optimize. Valid values: "answering_system_prompt", "parsing_instructions", "mcp_tool_descriptions"

required
config VerificationConfig | None

Base VerificationConfig to use. If None, uses default minimal config.

None
train_ratio float

Fraction of questions for training (default 0.8)

0.8
val_ratio float

Fraction of questions for validation (default 0.2)

0.2
test_ratio float | None

Optional fraction for testing. If None, no test set created.

None
seed int | None

Random seed for reproducibility

None
reflection_model str

Model for GEPA's reflection LLM (default: openai/gpt-4o)

'openai/gpt-4o'
max_metric_calls int

Maximum GEPA optimization iterations (default: 150)

150
objective_config ObjectiveConfig | None

Configuration for multi-objective optimization dimensions.

None
frontier_type FrontierType

GEPA Pareto frontier tracking strategy.

'objective'
seed_prompts dict[str, str] | None

Optional initial prompts. If None, uses empty strings.

None
tracker_path Path | str | None

Optional path to SQLite file for tracking optimization history

None
export_preset_path Path | str | None

Optional path to export optimized config as preset

None
progress_callback Callable[[float, str], None] | None

Optional callback for progress updates (percentage, message)

None
verbose bool

If True, display detailed progress during optimization

False

Returns:

Type Description
KareninaOutput

KareninaOutput with optimized prompts and metrics

Example

>>> result = benchmark.optimize(
...     targets=["answering_system_prompt"],
...     reflection_model="openai/gpt-4o",
...     max_metric_calls=100,
... )
>>> print(f"Improvement: {result.improvement:.1%}")

Source code in src/karenina/benchmark/benchmark.py
def optimize(
    self,
    targets: list[str],
    config: VerificationConfig | None = None,
    train_ratio: float = 0.8,
    val_ratio: float = 0.2,
    test_ratio: float | None = None,
    seed: int | None = None,
    reflection_model: str = "openai/gpt-4o",
    max_metric_calls: int = 150,
    objective_config: "ObjectiveConfig | None" = None,
    frontier_type: "FrontierType" = "objective",
    seed_prompts: dict[str, str] | None = None,
    tracker_path: Path | str | None = None,
    export_preset_path: Path | str | None = None,
    progress_callback: Callable[[float, str], None] | None = None,
    verbose: bool = False,
) -> "KareninaOutput":
    """
    Optimize text components using GEPA with karenina verification as the metric.

    Requires the 'gepa' optional dependency: pip install karenina[gepa]

    Args:
        targets: List of components to optimize. Valid values:
                 "answering_system_prompt", "parsing_instructions", "mcp_tool_descriptions"
        config: Base VerificationConfig to use. If None, uses default minimal config.
        train_ratio: Fraction of questions for training (default 0.8)
        val_ratio: Fraction of questions for validation (default 0.2)
        test_ratio: Optional fraction for testing. If None, no test set created.
        seed: Random seed for reproducibility
        reflection_model: Model for GEPA's reflection LLM (default: openai/gpt-4o)
        max_metric_calls: Maximum GEPA optimization iterations (default: 150)
        objective_config: Configuration for multi-objective optimization dimensions.
        frontier_type: GEPA Pareto frontier tracking strategy.
        seed_prompts: Optional initial prompts. If None, uses empty strings.
        tracker_path: Optional path to SQLite file for tracking optimization history
        export_preset_path: Optional path to export optimized config as preset
        progress_callback: Optional callback for progress updates (percentage, message)
        verbose: If True, display detailed progress during optimization

    Returns:
        KareninaOutput with optimized prompts and metrics

    Example:
        >>> result = benchmark.optimize(
        ...     targets=["answering_system_prompt"],
        ...     reflection_model="openai/gpt-4o",
        ...     max_metric_calls=100,
        ... )
        >>> print(f"Improvement: {result.improvement:.1%}")
    """
    # Pure pass-through: all optimization logic lives in _helpers.run_optimize.
    # Arguments are forwarded positionally in signature order, so the two
    # signatures must be kept in sync when either changes.
    return _helpers.run_optimize(
        self,
        targets,
        config,
        train_ratio,
        val_ratio,
        test_ratio,
        seed,
        reflection_model,
        max_metric_calls,
        objective_config,
        frontier_type,
        seed_prompts,
        tracker_path,
        export_preset_path,
        progress_callback,
        verbose,
    )
remove_custom_property
remove_custom_property(name: str) -> bool

Remove a custom property from benchmark metadata.

Source code in src/karenina/benchmark/benchmark.py
def remove_custom_property(self, name: str) -> bool:
    """Drop the named custom property from benchmark-level metadata."""
    metadata = self._metadata_manager
    return metadata.remove_custom_property(name)
remove_question
remove_question(question_id: str) -> bool

Remove a specific question from the benchmark.

Source code in src/karenina/benchmark/benchmark.py
def remove_question(self, question_id: str) -> bool:
    """Delete a single question from the benchmark."""
    questions = self._question_manager
    return questions.remove_question(question_id)
remove_question_custom_property
remove_question_custom_property(
    question_id: str, name: str
) -> bool

Remove a custom property from question metadata.

Source code in src/karenina/benchmark/benchmark.py
def remove_question_custom_property(self, question_id: str, name: str) -> bool:
    """Drop a custom property from a question's metadata."""
    questions = self._question_manager
    return questions.remove_question_custom_property(question_id, name)
remove_question_rubric
remove_question_rubric(question_id: str) -> bool

Remove question-specific rubric.

Source code in src/karenina/benchmark/benchmark.py
def remove_question_rubric(self, question_id: str) -> bool:
    """Delete the rubric attached specifically to this question."""
    rubrics = self._rubric_manager
    return rubrics.remove_question_rubric(question_id)
remove_scenario
remove_scenario(name: str) -> None

Remove a scenario by name.

Parameters:

Name Type Description Default
name str

The scenario name.

required

Raises:

Type Description
KeyError

If no scenario with that name exists.

Source code in src/karenina/benchmark/benchmark.py
def remove_scenario(self, name: str) -> None:
    """Delete the scenario registered under *name*.

    Args:
        name: The scenario name.

    Raises:
        KeyError: If no scenario with that name exists.
    """
    if name not in self._scenarios:
        raise KeyError(f"Scenario '{name}' not found")
    del self._scenarios[name]

    # Mirror the removal in the persisted checkpoint.
    checkpoint = self._base._checkpoint
    if checkpoint.hasPart:
        remaining = [part for part in checkpoint.hasPart if part.name != name]
        checkpoint.hasPart = remaining or None
        if not remaining and checkpoint.additionalProperty:
            # Last scenario removed: drop the benchmark_type marker as well.
            checkpoint.additionalProperty = [
                prop for prop in checkpoint.additionalProperty if prop.name != "benchmark_type"
            ]
run_verification
run_verification(
    config: VerificationConfig,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    async_enabled: bool | None = None,
    progress_callback: Callable[[float, str], None]
    | None = None,
) -> VerificationResultSet

Run verification on the benchmark using existing execution system.

For scenario benchmarks, dispatches to _run_scenario_verification which iterates over the scenario x model cross-product. For standalone question benchmarks, delegates to VerificationManager.

Source code in src/karenina/benchmark/benchmark.py
def run_verification(
    self,
    config: VerificationConfig,
    question_ids: list[str] | None = None,
    run_name: str | None = None,
    async_enabled: bool | None = None,
    progress_callback: Callable[[float, str], None] | None = None,
) -> VerificationResultSet:
    """Run verification over the benchmark via the existing execution system.

    Scenario benchmarks are handled by ``_run_scenario_verification``, which
    walks the scenario x model cross-product; standalone question benchmarks
    are delegated to the verification manager.
    """
    if self.is_scenario_benchmark:
        return self._run_scenario_verification(
            config=config,
            run_name=run_name,
            async_enabled=async_enabled,
            progress_callback=progress_callback,
        )

    manager = self._verification_manager
    return manager.run_verification(
        config,
        question_ids,
        run_name,
        async_enabled,
        progress_callback,
        workspace_root=self._workspace_root,
    )
save
save(
    path: Path, save_deep_judgment_config: bool = False
) -> None

Save the benchmark to a JSON-LD file.

Parameters:

Name Type Description Default
path Path

Path where to save the benchmark.

required
save_deep_judgment_config bool

If True, include deep judgment configuration in LLM rubric traits. If False (default), deep judgment settings are stripped before saving.

False
Source code in src/karenina/benchmark/benchmark.py
def save(self, path: Path, save_deep_judgment_config: bool = False) -> None:
    """Serialize the benchmark to a JSON-LD file.

    Args:
        path: Destination file path.
        save_deep_judgment_config: When True, keep deep judgment
            configuration on LLM rubric traits; when False (default),
            those settings are stripped before writing.
    """
    base = self._base
    base.save(path, save_deep_judgment_config=save_deep_judgment_config)
save_to_db
save_to_db(
    storage: str, checkpoint_path: Path | None = None
) -> Benchmark

Save this benchmark to a database.

Source code in src/karenina/benchmark/benchmark.py
def save_to_db(self, storage: str, checkpoint_path: Path | None = None) -> "Benchmark":
    """Persist this benchmark to a database."""
    from typing import cast

    from ..storage import save_benchmark

    saved = save_benchmark(self, storage, checkpoint_path)
    return cast("Benchmark", saved)
search_questions
search_questions(
    query: str | list[str],
    match_all: bool = True,
    fields: list[str] | None = None,
    case_sensitive: bool = False,
    regex: bool = False,
) -> list[dict[str, Any]]

Search for questions containing the query text (unified search method).

Source code in src/karenina/benchmark/benchmark.py
def search_questions(
    self,
    query: str | list[str],
    match_all: bool = True,
    fields: list[str] | None = None,
    case_sensitive: bool = False,
    regex: bool = False,
) -> list[dict[str, Any]]:
    """Search for questions containing the query text (unified search method)."""
    return self._question_manager.search_questions(query, match_all, fields, case_sensitive, regex)
set_custom_property
set_custom_property(name: str, value: Any) -> None

Set a custom property in benchmark metadata.

Source code in src/karenina/benchmark/benchmark.py
def set_custom_property(self, name: str, value: Any) -> None:
    """Store a custom property on the benchmark-level metadata."""
    metadata = self._metadata_manager
    metadata.set_custom_property(name, value)
set_global_dynamic_rubric
set_global_dynamic_rubric(
    dynamic_rubric: DynamicRubric | None,
) -> None

Set or clear the global dynamic rubric.

Persists the rubric to the checkpoint so it survives save/load cycles.

Parameters:

Name Type Description Default
dynamic_rubric DynamicRubric | None

The DynamicRubric to set, or None to clear.

required
Source code in src/karenina/benchmark/benchmark.py
def set_global_dynamic_rubric(self, dynamic_rubric: DynamicRubric | None) -> None:
    """Set or clear the global dynamic rubric.

    The rubric is written through to the checkpoint so it survives
    save/load cycles.

    Args:
        dynamic_rubric: The DynamicRubric to set, or None to clear.
    """
    self._base._global_dynamic_rubric = dynamic_rubric
    if dynamic_rubric is None:
        # Clearing: strip any dynamic-rubric ratings persisted on the checkpoint.
        checkpoint = self._base._checkpoint
        if checkpoint.rating:
            checkpoint.rating = [
                r for r in checkpoint.rating if r.additionalType != "karenina:GlobalDynamicRubricTrait"
            ]
    else:
        self._rubric_manager.set_global_dynamic_rubric_in_checkpoint(dynamic_rubric)
set_global_rubric
set_global_rubric(rubric: Rubric) -> None

Set the complete global rubric (replaces existing).

Source code in src/karenina/benchmark/benchmark.py
def set_global_rubric(self, rubric: Rubric) -> None:
    """Replace any existing global rubric with the traits from *rubric*."""
    self.clear_global_rubric()
    # Re-add each trait category, preserving the original category order.
    trait_groups = (
        rubric.llm_traits,
        rubric.regex_traits,
        rubric.callable_traits,
        rubric.metric_traits,
        rubric.agentic_traits,
    )
    for group in trait_groups:
        for trait in group:
            self.add_global_rubric_trait(trait)
set_metadata
set_metadata(**metadata: Any) -> None

Set benchmark metadata.

Source code in src/karenina/benchmark/benchmark.py
def set_metadata(self, **metadata: Any) -> None:
    """Forward metadata fields to the underlying benchmark base."""
    base = self._base
    base.set_metadata(**metadata)
set_multiple_custom_properties
set_multiple_custom_properties(
    properties: dict[str, Any],
) -> None

Set multiple custom properties at once.

Source code in src/karenina/benchmark/benchmark.py
def set_multiple_custom_properties(self, properties: dict[str, Any]) -> None:
    """Store several custom properties on the benchmark metadata in one call."""
    metadata = self._metadata_manager
    metadata.set_multiple_custom_properties(properties)
set_question_author
set_question_author(
    question_id: str, author: dict[str, Any] | None
) -> None

Set author information for a question.

Source code in src/karenina/benchmark/benchmark.py
def set_question_author(self, question_id: str, author: dict[str, Any] | None) -> None:
    """Set author information for a question."""
    self._question_manager.set_question_author(question_id, author)
set_question_custom_property
set_question_custom_property(
    question_id: str, name: str, value: Any
) -> None

Set a custom property on question metadata.

Source code in src/karenina/benchmark/benchmark.py
def set_question_custom_property(self, question_id: str, name: str, value: Any) -> None:
    """Attach a custom property to a question's metadata."""
    questions = self._question_manager
    questions.set_question_custom_property(question_id, name, value)
set_question_rubric
set_question_rubric(
    question_id: str, rubric: Rubric
) -> None

Set the complete question-specific rubric (replaces existing).

Source code in src/karenina/benchmark/benchmark.py
def set_question_rubric(self, question_id: str, rubric: Rubric) -> None:
    """Replace the question-specific rubric with the traits from *rubric*."""
    self.remove_question_rubric(question_id)
    # Re-add each trait category, preserving the original category order.
    trait_groups = (
        rubric.llm_traits,
        rubric.regex_traits,
        rubric.callable_traits,
        rubric.metric_traits,
        rubric.agentic_traits,
    )
    for group in trait_groups:
        for trait in group:
            self.add_question_rubric_trait(question_id, trait)
set_question_sources
set_question_sources(
    question_id: str, sources: list[dict[str, Any]] | None
) -> None

Set source documents for a question.

Source code in src/karenina/benchmark/benchmark.py
def set_question_sources(self, question_id: str, sources: list[dict[str, Any]] | None) -> None:
    """Set source documents for a question."""
    self._question_manager.set_question_sources(question_id, sources)
set_workspace_root
set_workspace_root(path: Path) -> None

Set the root directory for task workspaces.

Parameters:

Name Type Description Default
path Path

Directory containing task workspace subdirectories. Question workspace paths are resolved relative to this root.

required
Source code in src/karenina/benchmark/benchmark.py
def set_workspace_root(self, path: Path) -> None:
    """Remember the root directory under which task workspaces live.

    Args:
        path: Directory containing task workspace subdirectories; question
            workspace paths are resolved relative to this root.
    """
    self._workspace_root = path
store_verification_results
store_verification_results(
    results: VerificationResultSet
    | dict[str, VerificationResult],
    run_name: str | None = None,
) -> None

Store verification results in the benchmark metadata.

Source code in src/karenina/benchmark/benchmark.py
def store_verification_results(
    self,
    results: VerificationResultSet | dict[str, VerificationResult],
    run_name: str | None = None,
) -> None:
    """Persist verification results into the benchmark metadata."""
    # Shared helper accepts both a result set and a plain dict of results.
    _helpers.store_verification_results(self, results, run_name)
to_csv
to_csv() -> str

Export questions as CSV format.

Source code in src/karenina/benchmark/benchmark.py
def to_csv(self) -> str:
    """Render the benchmark's questions as CSV text."""
    exporter = self._export_manager
    return exporter.to_csv()
to_dict
to_dict() -> dict[str, Any]

Export benchmark as a plain dictionary.

Source code in src/karenina/benchmark/benchmark.py
def to_dict(self) -> dict[str, Any]:
    """Render the benchmark as a plain dictionary."""
    exporter = self._export_manager
    return exporter.to_dict()
to_markdown
to_markdown() -> str

Export benchmark as markdown document.

Source code in src/karenina/benchmark/benchmark.py
def to_markdown(self) -> str:
    """Render the benchmark as a markdown document."""
    exporter = self._export_manager
    return exporter.to_markdown()
toggle_finished
toggle_finished(question_id: str) -> bool

Toggle finished status of a question.

Source code in src/karenina/benchmark/benchmark.py
def toggle_finished(self, question_id: str) -> bool:
    """Flip the finished status of a question and return the new value."""
    questions = self._question_manager
    return questions.toggle_finished(question_id)
update_question_metadata
update_question_metadata(
    question_id: str, **metadata: Any
) -> None

Update question metadata fields.

Source code in src/karenina/benchmark/benchmark.py
def update_question_metadata(self, question_id: str, **metadata: Any) -> None:
    """Update metadata fields on a single question."""
    questions = self._question_manager
    questions.update_question_metadata(question_id, **metadata)
update_template
update_template(
    question_id: str, template_code: str | type
) -> None

Update existing template.

Parameters:

Name Type Description Default
question_id str

The question ID

required
template_code str | type

Python code defining the Answer class, or a BaseAnswer subclass

required
Source code in src/karenina/benchmark/benchmark.py
def update_template(self, question_id: str, template_code: str | type) -> None:
    """Update existing template.

    Args:
        question_id: The question ID
        template_code: Python code defining the Answer class, or a BaseAnswer subclass
    """
    self._template_manager.update_template(question_id, template_code)
validate
validate() -> tuple[bool, str]

Validate the benchmark structure and all templates.

Source code in src/karenina/benchmark/benchmark.py
def validate(self) -> tuple[bool, str]:
    """Validate the benchmark structure and every answer template."""
    from .verification.utils.validation import validate_answer_template

    # Structural validation first; bail out on the first failure.
    ok, message = self._base.validate()
    if not ok:
        return False, message

    # Then validate each question's template, if one is present.
    for question_id, question in self._questions_cache.items():
        code = question.get("answer_template")
        if code is None:
            continue
        ok, maybe_message, _ = validate_answer_template(code)
        if not ok:
            detail = maybe_message or "Unknown validation error"
            return False, f"Invalid template for {question_id}: {detail}"

    return True, "Benchmark is valid"
validate_rubrics
validate_rubrics() -> tuple[bool, list[str]]

Validate all rubrics are properly configured.

Source code in src/karenina/benchmark/benchmark.py
def validate_rubrics(self) -> tuple[bool, list[str]]:
    """Check that every rubric is properly configured."""
    rubrics = self._rubric_manager
    return rubrics.validate_rubrics()
validate_templates
validate_templates() -> tuple[bool, list[dict[str, str]]]

Validate all templates are valid Python code.

Source code in src/karenina/benchmark/benchmark.py
def validate_templates(self) -> tuple[bool, list[dict[str, str]]]:
    """Check that every template is valid Python code."""
    templates = self._template_manager
    return templates.validate_templates()

FinishedTemplate

Bases: BaseModel

Metadata for a finished answer template.

Source code in src/karenina/schemas/verification/api_models.py
class FinishedTemplate(BaseModel):
    """Metadata for a finished answer template.

    Bundles a question with its completed answer template and related
    per-question configuration (rubrics, keywords, few-shot examples).
    Unknown fields are rejected via ``extra="forbid"``.
    """

    model_config = ConfigDict(extra="forbid")

    question_id: str
    question_text: str
    question_preview: str  # Truncated version for UI
    raw_answer: str | None = None  # Ground truth answer from checkpoint
    template_code: str  # Python source of the finished Answer template
    last_modified: str  # Timestamp string; format not enforced here
    finished: bool = True
    question_rubric: dict[str, Any] | None = None  # Question-specific rubric as dict
    question_dynamic_rubric: dict[str, Any] | None = None  # Question-specific dynamic rubric as dict
    keywords: list[str] | None = None  # Keywords associated with the question
    few_shot_examples: list[dict[str, str]] | None = None  # Few-shot examples for this question
    workspace_path: str | None = None  # Relative workspace path from Question

ModelConfig

Bases: BaseModel

Configuration for a single model.

Source code in src/karenina/schemas/config/models.py
class ModelConfig(BaseModel):
    """Configuration for a single model.

    Most fields are optional because requirements depend on ``interface``:
    the ``manual`` interface fills in defaults for ``id``/``model_name`` and
    requires ``manual_traces``, while other interfaces require ``id`` and
    ``model_name`` explicitly (see the validators below). Unknown fields are
    rejected via ``extra="forbid"``.
    """

    model_config = ConfigDict(extra="forbid")

    id: str | None = None  # Optional - defaults to "manual" for manual interface
    model_provider: str | None = None  # Optional - only required for langchain interface
    model_name: str | None = None  # Optional - defaults to "manual" for manual interface
    temperature: float = 0.1
    max_tokens: int = 8192  # Maximum tokens for model response
    interface: str = "langchain"  # Must name a registered adapter (validated below)
    system_prompt: str | None = None  # Optional - defaults applied based on context (answering/parsing)
    max_retries: int = 2  # Optional max retries for template generation
    mcp_urls_dict: dict[str, str] | None = None  # Optional MCP server URLs
    mcp_tool_filter: list[str] | None = None  # Optional list of MCP tools to include
    mcp_tool_description_overrides: dict[str, str] | None = (
        None  # Optional tool description overrides for GEPA optimization
    )
    # OpenAI Endpoint configuration (for openai_endpoint interface)
    endpoint_base_url: str | None = None  # Custom endpoint base URL
    endpoint_api_key: SecretStr | None = None  # User-provided API key
    # Anthropic-specific configuration (for claude_tool and claude_agent_sdk interfaces)
    anthropic_base_url: str | None = None  # Custom Anthropic API endpoint (for proxies, self-hosted)
    anthropic_api_key: SecretStr | None = None  # Override ANTHROPIC_API_KEY env var
    # Extra keyword arguments to pass to the underlying model interface
    # Useful for passing vendor-specific API keys, custom parameters, etc.
    extra_kwargs: dict[str, Any] | None = None
    # Manual interface configuration
    manual_traces: Any = Field(default=None, exclude=True)  # Excluded from serialization; type: ManualTraces | None
    # Agent middleware configuration (only used when mcp_urls_dict is provided)
    # Controls retry behavior, execution limits, and summarization for MCP-enabled agents
    agent_middleware: AgentMiddlewareConfig | None = None
    # Token threshold for triggering summarization middleware.
    # When specified, summarization triggers at exactly this token count.
    # For langchain interface without this value, fraction-based triggering is used (auto-detected from model).
    # For openai_endpoint interface without this value, auto-detected from /v1/models API if available.
    # For openrouter interface without this value, defaults to 100000 * trigger_fraction.
    max_context_tokens: int | None = None
    # Timeout in seconds for agent execution. Overrides the default timeout (180s)
    # used in answer generation. Set higher for complex questions with many tool calls.
    agent_timeout: int | None = None

    @model_validator(mode="after")
    def validate_manual_interface(self) -> "ModelConfig":
        """Validate manual interface configuration and set defaults.

        Raises:
            ValueError: If manual interface is missing/has invalid
                ``manual_traces`` or is combined with MCP tools, or if a
                non-manual interface lacks ``id`` or ``model_name``.
        """
        if self.interface == INTERFACE_MANUAL:
            # Reject bool values: True/False are not ManualTraces instances
            if isinstance(self.manual_traces, bool):
                raise ValueError(
                    "manual_traces must be a ManualTraces instance, not a bool. "
                    "Create a ManualTraces instance and pass it to ModelConfig."
                )
            # Manual interface requires manual_traces
            if self.manual_traces is None:
                raise ValueError(
                    "manual_traces is required when interface='manual'. "
                    "Create a ManualTraces instance and pass it to ModelConfig."
                )

            # Set defaults for manual interface
            if self.id is None:
                self.id = "manual"
            if self.model_name is None:
                self.model_name = "manual"

            # MCP not supported with manual interface
            if self.mcp_urls_dict is not None:
                raise ValueError(
                    "MCP tools are not supported with manual interface. "
                    "Manual traces are precomputed and cannot use dynamic tools."
                )
        else:
            # Non-manual interfaces require id and model_name
            if self.id is None:
                raise ValueError("id is required for non-manual interfaces")
            if self.model_name is None:
                raise ValueError("model_name is required for non-manual interfaces")

        return self

    @model_validator(mode="after")
    def validate_interface_registered(self) -> "ModelConfig":
        """Validate that the interface is registered in AdapterRegistry.

        Skips validation while the registry is initializing to avoid
        re-entrant initialization when registration modules create ModelConfig
        instances during _load_builtins().

        Raises:
            ValueError: If ``interface`` names no registered adapter.
        """
        from karenina.adapters.registry import AdapterRegistry

        # During initialization, registration modules may create ModelConfig
        # instances (e.g., in tests or default configs). Skip validation to
        # avoid re-entrant calls into _ensure_initialized() via the RLock.
        if AdapterRegistry._initializing:
            return self

        if AdapterRegistry.get_spec(self.interface) is None:
            registered = AdapterRegistry.get_interfaces()
            raise ValueError(f"Unknown interface '{self.interface}'. Registered interfaces: {sorted(registered)}")
        return self
Functions
validate_interface_registered
validate_interface_registered() -> ModelConfig

Validate that the interface is registered in AdapterRegistry.

Skips validation while the registry is initializing to avoid re-entrant initialization when registration modules create ModelConfig instances during _load_builtins().

Source code in src/karenina/schemas/config/models.py
@model_validator(mode="after")
def validate_interface_registered(self) -> "ModelConfig":
    """Validate that the interface is registered in AdapterRegistry.

    While the registry is initializing, validation is skipped so that
    registration modules creating ModelConfig instances during
    _load_builtins() do not trigger re-entrant initialization.
    """
    from karenina.adapters.registry import AdapterRegistry

    if AdapterRegistry._initializing:
        # Registry bootstrap in progress; skip to avoid RLock re-entrancy.
        return self

    if AdapterRegistry.get_spec(self.interface) is not None:
        return self

    registered = AdapterRegistry.get_interfaces()
    raise ValueError(f"Unknown interface '{self.interface}'. Registered interfaces: {sorted(registered)}")
validate_manual_interface
validate_manual_interface() -> ModelConfig

Validate manual interface configuration and set defaults.

Source code in src/karenina/schemas/config/models.py
@model_validator(mode="after")
def validate_manual_interface(self) -> "ModelConfig":
    """Validate manual interface configuration and set defaults."""
    if self.interface != INTERFACE_MANUAL:
        # Every non-manual interface must identify itself explicitly.
        if self.id is None:
            raise ValueError("id is required for non-manual interfaces")
        if self.model_name is None:
            raise ValueError("model_name is required for non-manual interfaces")
        return self

    # bool is checked before None: True/False would otherwise slip through
    # as "present" values even though they are not ManualTraces instances.
    if isinstance(self.manual_traces, bool):
        raise ValueError(
            "manual_traces must be a ManualTraces instance, not a bool. "
            "Create a ManualTraces instance and pass it to ModelConfig."
        )
    if self.manual_traces is None:
        raise ValueError(
            "manual_traces is required when interface='manual'. "
            "Create a ManualTraces instance and pass it to ModelConfig."
        )

    # Default identity for manual runs (only when the caller left them unset).
    if self.id is None:
        self.id = "manual"
    if self.model_name is None:
        self.model_name = "manual"

    # Manual traces are precomputed, so dynamic MCP tooling is meaningless.
    if self.mcp_urls_dict is not None:
        raise ValueError(
            "MCP tools are not supported with manual interface. "
            "Manual traces are precomputed and cannot use dynamic tools."
        )
    return self

VerificationConfig

Bases: BaseModel

Configuration for verification run with multiple models.

Source code in src/karenina/schemas/verification/config.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
class VerificationConfig(BaseModel):
    """Configuration for verification run with multiple models.

    Fields are grouped by concern (rubric evaluation, deep judgment,
    agentic parsing, workspace handling, ...). Environment-variable
    overrides and default system prompts are applied in ``__init__``.
    """

    # Unknown keys are rejected outright rather than silently ignored.
    model_config = ConfigDict(extra="forbid")

    answering_models: list[ModelConfig] = Field(default_factory=list)
    parsing_models: list[ModelConfig]
    replicate_count: int = Field(default=1, ge=1)  # Number of times to run each test combination

    # Parsing-only mode (for TaskEval and similar use cases)
    parsing_only: bool = False  # When True, only parsing models are required

    # Rubric evaluation settings
    rubric_trait_names: list[str] | None = None  # Optional filter for specific traits
    rubric_evaluation_strategy: Literal["batch", "sequential"] | None = "batch"
    # - "batch": Evaluate all LLM traits in a single call (efficient, requires JSON output)
    # - "sequential": Evaluate traits one-by-one (reliable, more expensive)

    # Evaluation mode: determines which stages run in the verification pipeline
    evaluation_mode: Literal["template_only", "template_and_rubric", "rubric_only"] = "template_only"
    # - "template_only": Run template verification only (default behavior)
    # - "template_and_rubric": Run both template verification AND rubric evaluation
    # - "rubric_only": Skip template verification, only evaluate rubrics on raw LLM response

    @computed_field  # type: ignore[prop-decorator]
    @property
    def rubric_enabled(self) -> bool:
        """Whether rubric evaluation is enabled. Derived from evaluation_mode."""
        return self.evaluation_mode in ("template_and_rubric", "rubric_only")

    # Trace input control: determines what portion of MCP agent trace is passed to evaluation
    use_full_trace_for_template: bool = (
        False  # If True, pass full agent trace to template parsing; if False, extract only final AI message
    )
    use_full_trace_for_rubric: bool = (
        True  # If True, pass full agent trace to rubric evaluation; if False, extract only final AI message
    )
    # Note: The full trace is ALWAYS captured and stored in raw_llm_response regardless of these settings.
    # These flags only control what input is provided to the parsing/evaluation models.
    # If False and the trace doesn't end with an AI message, verification stage will fail with error.

    # Abstention detection settings
    abstention_enabled: bool = False  # Enable abstention/refusal detection

    # Sufficiency detection settings
    sufficiency_enabled: bool = False  # Enable trace sufficiency detection

    # Extraction hint settings (controls whether hints are appended to the parsing prompt)
    include_extraction_hints: bool = True  # Include extraction hints in the parsing prompt

    # Embedding check settings (semantic similarity fallback)
    embedding_check_enabled: bool = False  # Enable semantic similarity fallback
    embedding_check_model: str = DEFAULT_EMBEDDING_MODEL  # SentenceTransformer model for embeddings
    embedding_check_threshold: float = Field(
        default=DEFAULT_EMBEDDING_THRESHOLD, ge=0.0, le=1.0
    )  # Similarity threshold (0.0-1.0)

    # Async execution settings
    async_enabled: bool = DEFAULT_ASYNC_ENABLED  # Enable parallel execution
    async_max_workers: int = Field(default=DEFAULT_ASYNC_MAX_WORKERS, ge=1)  # Number of parallel workers

    # Deep-judgment settings (multi-stage parsing with excerpts and reasoning)
    deep_judgment_enabled: bool = False  # Enable deep-judgment analysis (default: disabled)
    deep_judgment_max_excerpts_per_attribute: int = DEFAULT_DEEP_JUDGMENT_MAX_EXCERPTS  # Max excerpts per attribute
    deep_judgment_fuzzy_match_threshold: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD  # Similarity threshold
    deep_judgment_excerpt_retry_attempts: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS  # Retry attempts

    # Search-enhanced deep-judgment settings (validate excerpts against external evidence)
    deep_judgment_search_enabled: bool = False  # Enable search validation for excerpts
    deep_judgment_search_tool: str | Callable[..., Any] = "tavily"  # Search tool name or callable instance
    # Supported built-in tools: "tavily"
    # Can also pass any callable: (str | list[str]) -> (str | list[str])
    # Examples: langchain tools, MCP tools, custom functions

    # Deep-judgment rubric settings (global defaults for per-trait configuration)
    deep_judgment_rubric_max_excerpts_default: int = DEFAULT_RUBRIC_MAX_EXCERPTS  # Max excerpts per trait
    deep_judgment_rubric_fuzzy_match_threshold_default: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD  # Fuzzy match
    deep_judgment_rubric_excerpt_retry_attempts_default: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS  # Retry attempts
    deep_judgment_rubric_search_tool: str | Callable[..., Any] = (
        "tavily"  # Search tool for rubric hallucination detection
    )

    # Deep-judgment rubric configuration modes (NEW - runtime control of deep judgment)
    deep_judgment_rubric_mode: Literal["disabled", "enable_all", "use_checkpoint", "custom"] = "disabled"
    # - "disabled": Deep judgment is OFF (default, explicit)
    # - "enable_all": Apply deep judgment to all LLM traits (respects excerpt toggle)
    # - "use_checkpoint": Use deep judgment settings saved in checkpoint (if available)
    # - "custom": Use per-trait configuration from deep_judgment_rubric_config

    deep_judgment_rubric_global_excerpts: bool = True  # For enable_all mode: enable/disable excerpts globally
    deep_judgment_rubric_config: dict[str, Any] | None = None  # For custom mode: nested trait config
    # Expected structure for custom mode:
    # {
    #   "global": {
    #     "TraitName": {"enabled": True, "excerpt_enabled": True, ...}
    #   },
    #   "question_specific": {
    #     "question-id": {
    #       "TraitName": {"enabled": True, ...}
    #     }
    #   }
    # }

    # Few-shot prompting settings
    few_shot_config: FewShotConfig | None = None  # New flexible configuration

    # Per-task-type prompt instructions (optional user-injected instructions for each pipeline stage)
    prompt_config: PromptConfig | None = None

    # Agentic parsing
    agentic_parsing: bool = Field(
        default=False,
        description=(
            "Enable agentic parsing (Stage 7b). The judge uses tools to "
            "independently verify artifacts before extracting structured data."
        ),
    )
    agentic_judge_context: Literal["workspace_only", "trace_and_workspace", "trace_only"] = Field(
        default="workspace_only",
        description=(
            "What context the investigation agent receives. "
            "'workspace_only': question + workspace path (maximum independence). "
            "'trace_and_workspace': answering agent trace + workspace path. "
            "'trace_only': equivalent to classical Stage 7a parsing."
        ),
    )
    agentic_parsing_max_turns: int = Field(
        default=15,
        ge=1,
        description="Max turns for the investigation agent.",
    )
    agentic_parsing_timeout: float = Field(
        default=120.0,
        ge=0.0,
        description="Timeout in seconds for the investigation agent.",
    )

    # Agentic rubric evaluation
    agentic_rubric_strategy: Literal["individual", "shared"] = Field(
        "individual",
        description="How to evaluate agentic rubric traits. "
        "'individual': one agent session per trait (robust, isolated). "
        "'shared': one agent session for all traits (efficient, shared context).",
    )
    agentic_rubric_parallel: bool = Field(
        False,
        description="Enable parallel evaluation of agentic rubric traits. "
        "Only applies to 'individual' strategy. Each trait gets a concurrent agent session.",
    )

    # Workspace (workspace_root lives on Benchmark, not here)
    workspace_copy: bool = Field(
        default=True,
        description=(
            "When True, pre-existing question workspaces are copied to a "
            "sibling working directory before execution, protecting the "
            "original for re-runs. When False, the pipeline works directly "
            "in the original directory (destructive)."
        ),
    )
    workspace_cleanup: bool = Field(
        default=True,
        description=(
            "Whether to delete working copies after the run. Only applies to "
            "copied or auto-created workspaces, never to original source "
            "directories."
        ),
    )

    # Database storage settings
    # Typed as Any to avoid importing karenina.storage at module import time;
    # _validate_db_config enforces the DBConfig type at runtime.
    db_config: Any | None = None  # DBConfig instance for automatic result persistence

    # Scenario execution settings
    scenario_turn_limit: int = Field(default=20, ge=1)  # Max turns before forced termination in scenario execution

    @field_validator("db_config", mode="before")
    @classmethod
    def _validate_db_config(cls, v: Any) -> Any:
        """Accept only ``None`` or a ``DBConfig`` instance for ``db_config``.

        DBConfig is imported at call time to avoid a circular dependency
        with karenina.storage.

        Raises:
            TypeError: If value is not None and not a DBConfig instance.
        """
        if v is not None:
            from karenina.storage.db_config import DBConfig

            if not isinstance(v, DBConfig):
                raise TypeError(f"db_config must be a DBConfig instance or None, got {type(v).__name__}")
        return v

    def __init__(self, **data: Any) -> None:
        """
        Initialize with environment variable support and default system prompts.

        Configuration precedence (highest to lowest):
        1. Explicit arguments (including preset values)
        2. Environment variables (only if set)
        3. Field defaults
        """

        def _env_override(field: str, env_var: str, convert: Callable[[str], Any]) -> None:
            # Apply an env-var fallback only when the caller did not pass the
            # field explicitly AND the variable is actually set. A conversion
            # error is suppressed so Pydantic falls back to the field default.
            if field in data:
                return
            raw = os.getenv(env_var)
            if raw is None:
                return
            with contextlib.suppress(ValueError):
                data[field] = convert(raw)

        def _as_bool(raw: str) -> bool:
            # Accepted truthy spellings; anything else is False.
            return raw.lower() in ("true", "1", "yes")

        # Embedding check settings
        _env_override("embedding_check_enabled", "EMBEDDING_CHECK", _as_bool)
        _env_override("embedding_check_model", "EMBEDDING_CHECK_MODEL", str)
        _env_override("embedding_check_threshold", "EMBEDDING_CHECK_THRESHOLD", float)

        # Async execution settings
        _env_override("async_enabled", "KARENINA_ASYNC_ENABLED", _as_bool)
        _env_override("async_max_workers", "KARENINA_ASYNC_MAX_WORKERS", int)

        def _with_default_prompt(models: list[Any], prompt: str) -> list[Any]:
            # Apply a default system prompt to models that lack one.
            # ModelConfig instances are copied (model_copy) so shared objects
            # are never mutated; dicts get a merged copy for the same reason.
            result: list[Any] = []
            for m in models:
                if isinstance(m, ModelConfig) and not m.system_prompt:
                    result.append(m.model_copy(update={"system_prompt": prompt}))
                elif isinstance(m, dict) and not m.get("system_prompt"):
                    result.append({**m, "system_prompt": prompt})
                else:
                    result.append(m)
            return result

        if "answering_models" in data:
            data["answering_models"] = _with_default_prompt(data["answering_models"], DEFAULT_ANSWERING_SYSTEM_PROMPT)
        if "parsing_models" in data:
            data["parsing_models"] = _with_default_prompt(data["parsing_models"], DEFAULT_PARSING_SYSTEM_PROMPT)

        # Strip rubric_enabled from input: now derived from evaluation_mode
        data.pop("rubric_enabled", None)

        # Strip deep_judgment_rubric_search_enabled: not a declared field,
        # but injected by from_overrides() and some CLI callers.
        data.pop("deep_judgment_rubric_search_enabled", None)

        super().__init__(**data)

        # Validate configuration after initialization (fail fast).
        self._validate_config()

    def _validate_config(self) -> None:
        """
        Validate configuration, especially for rubric-enabled scenarios.

        Validates that:
        - At least one parsing model is configured
        - At least one answering model is configured (unless parsing_only=True)
        - Required fields are present for each model
        - Model provider is provided for interfaces that require it
        - Rubric-specific requirements are met when enabled

        Raises:
            ValueError: If any validation rule fails
        """
        # Hoisted out of the per-model loop (it was re-imported on every
        # iteration); also reused by the agentic-parsing checks below.
        from karenina.adapters.registry import AdapterRegistry

        # Check that we have at least one parsing model (always required)
        if not self.parsing_models:
            raise ValueError("At least one parsing model must be configured")

        # Check answering models only if not in parsing-only mode
        if not self.parsing_only and not self.answering_models:
            raise ValueError("At least one answering model must be configured (unless parsing_only=True)")

        # Validate model configurations
        # Note: Basic model validation (model_name, model_provider) is also done by
        # the adapter factory at runtime, but we validate here too for early failure.
        for model in self.answering_models + self.parsing_models:
            if not model.model_name:
                raise ValueError(f"Model name is required in model configuration (model: {model.id})")
            # Model provider requirement is defined per-adapter via AdapterSpec.requires_provider
            spec = AdapterRegistry.get_spec(model.interface)
            if spec is not None and spec.requires_provider and not model.model_provider:
                raise ValueError(f"Model provider is required for interface '{model.interface}'. (model: {model.id})")
            # System prompt is required for verification (not validated by factory)
            if not model.system_prompt:
                raise ValueError(f"System prompt is required for model {model.id}")

        # Additional validation for rubric-enabled scenarios
        if self.rubric_enabled and not self.parsing_models:
            raise ValueError("Parsing models are required when rubric evaluation is enabled")

        # Additional validation for few-shot prompting scenarios
        if self.few_shot_config is not None and self.few_shot_config.enabled:
            if self.few_shot_config.global_mode == "k-shot" and self.few_shot_config.global_k < 1:
                raise ValueError("Global few-shot k value must be at least 1 when using k-shot mode")

            # Validate question-specific k values
            for question_id, question_config in self.few_shot_config.question_configs.items():
                if question_config.mode == "k-shot" and question_config.k is not None and question_config.k < 1:
                    raise ValueError(
                        f"Question {question_id} few-shot k value must be at least 1 when using k-shot mode"
                    )

        # Additional validation for search-enhanced deep-judgment
        if self.deep_judgment_search_enabled:
            # Validate search tool
            if isinstance(self.deep_judgment_search_tool, str):
                # Check if it's a supported built-in tool
                supported_tools = ["tavily"]
                if self.deep_judgment_search_tool.lower() not in supported_tools:
                    raise ValueError(
                        f"Unknown search tool: '{self.deep_judgment_search_tool}'. Supported tools: {supported_tools}"
                    )
            elif not callable(self.deep_judgment_search_tool):
                raise ValueError(
                    "Search tool must be either a supported tool name string "
                    "or a callable with signature (str | list[str]) -> (str | list[str])"
                )

        # Agentic parsing validation
        if self.agentic_parsing:
            # Check parsing model interface supports AgentPort
            for pm in self.parsing_models:
                spec = AdapterRegistry.get_spec(pm.interface)
                if spec is None or spec.agent_tier != "deep_agent":
                    tier = spec.agent_tier if spec else "unknown"
                    raise ValueError(
                        f"agentic_parsing=True requires an interface with "
                        f"agent_tier='deep_agent', but '{pm.interface}' has "
                        f"agent_tier='{tier}'. Use 'claude_agent_sdk' or "
                        f"'langchain_deep_agents' instead."
                    )

            # Agentic parsing is not supported in rubric_only mode
            if self.evaluation_mode == "rubric_only":
                raise ValueError(
                    "agentic_parsing=True is not supported with "
                    "evaluation_mode='rubric_only'. Use 'template_only' or "
                    "'template_and_rubric'."
                )

            # Warn about trace_only being equivalent to Stage 7a
            if self.agentic_judge_context == "trace_only":
                logger.warning(
                    "agentic_parsing=True with agentic_judge_context='trace_only' "
                    "is equivalent to classical parsing (Stage 7a)."
                )

    def __repr__(self) -> str:
        """
        Return detailed string representation for debugging/inspection.

        Shows key configuration settings including models, execution parameters,
        and enabled features in a human-readable multi-line format.
        """

        def _model_line(model: ModelConfig) -> str:
            # Shared formatting for answering and parsing model entries
            # (previously duplicated inline for both sections).
            provider = model.model_provider or "none"
            return f"    - {model.model_name} ({provider}) [temp={model.temperature}, interface={model.interface}]"

        lines = ["VerificationConfig("]

        # === MODELS ===
        lines.append("  === MODELS ===")

        # Answering models
        if self.answering_models:
            lines.append(f"  Answering ({len(self.answering_models)}):")
            lines.extend(_model_line(m) for m in self.answering_models)
        else:
            lines.append("  Answering: none")

        # Parsing models
        lines.append(f"  Parsing ({len(self.parsing_models)}):")
        lines.extend(_model_line(m) for m in self.parsing_models)

        # === EXECUTION ===
        lines.append("")
        lines.append("  === EXECUTION ===")
        lines.append(f"  Replicates: {self.replicate_count}")
        lines.append(f"  Async: {self.async_enabled}")
        if self.async_enabled:
            lines.append(f"    └─ workers: {self.async_max_workers}")
        if self.parsing_only:
            lines.append("  Parsing Only: True")
        lines.append(f"  Evaluation Mode: {self.evaluation_mode}")
        lines.append(f"  Rubric Evaluation Strategy: {self.rubric_evaluation_strategy}")

        # === FEATURES ===
        lines.append("")
        lines.append("  === FEATURES ===")
        features_shown = False

        # Rubric - just enabled/disabled status with optional trait selection
        if self.rubric_enabled:
            features_shown = True
            trait_info = ""
            if self.rubric_trait_names:
                trait_info = f" ({len(self.rubric_trait_names)} traits selected)"
            lines.append(f"  Rubric: enabled{trait_info}")
        else:
            lines.append("  Rubric: disabled")

        # Deep Judgment - Template
        if self.deep_judgment_enabled:
            features_shown = True
            lines.append(
                f"  Deep Judgment (Template): "
                f"max_excerpts={self.deep_judgment_max_excerpts_per_attribute}, "
                f"fuzzy_threshold={self.deep_judgment_fuzzy_match_threshold}"
            )
            if self.deep_judgment_search_enabled:
                search_tool = self.deep_judgment_search_tool
                if callable(search_tool):
                    # Custom callables have no stable printable name.
                    search_tool = "<custom_callable>"
                lines.append(f"    └─ search: {search_tool}")

        # Deep Judgment - Rubric
        if self.deep_judgment_rubric_mode != "disabled":
            features_shown = True
            lines.append(
                f"  Deep Judgment (Rubric): mode={self.deep_judgment_rubric_mode}, "
                f"global_excerpts={self.deep_judgment_rubric_global_excerpts}"
            )
            # Warning about sequential evaluation
            lines.append("    ⚠️  Deep judgment traits are ALWAYS evaluated sequentially (one-by-one)")
            if self.deep_judgment_rubric_mode == "custom" and self.deep_judgment_rubric_config:
                global_traits = self.deep_judgment_rubric_config.get("global", {})
                question_configs = self.deep_judgment_rubric_config.get("question_specific", {})
                lines.append(f"    └─ {len(global_traits)} global traits, {len(question_configs)} question configs")

        # Abstention
        if self.abstention_enabled:
            features_shown = True
            lines.append("  Abstention: enabled")

        # Sufficiency
        if self.sufficiency_enabled:
            features_shown = True
            lines.append("  Sufficiency: enabled")

        # Embedding Check
        if self.embedding_check_enabled:
            features_shown = True
            lines.append(
                f"  Embedding Check: model={self.embedding_check_model}, threshold={self.embedding_check_threshold}"
            )

        # Few-Shot
        few_shot_config = self.get_few_shot_config()
        if few_shot_config and few_shot_config.enabled:
            features_shown = True
            lines.append(f"  Few-Shot: mode={few_shot_config.global_mode}")
            if few_shot_config.global_mode == "k-shot":
                lines.append(f"    └─ k={few_shot_config.global_k}")
            if few_shot_config.question_configs:
                lines.append(f"    └─ {len(few_shot_config.question_configs)} question configs")

        if not features_shown:
            lines.append("  (none enabled)")

        lines.append(")")

        return "\n".join(lines)

    def __str__(self) -> str:
        """Developer-friendly output, identical to ``repr``."""
        return repr(self)

    def get_few_shot_config(self) -> FewShotConfig | None:
        """Return the FewShotConfig for this run, or ``None`` when unset."""
        return self.few_shot_config

    def is_few_shot_enabled(self) -> bool:
        """Report whether few-shot prompting is active.

        Returns:
            True when a config is present and its ``enabled`` flag is set.
        """
        cfg = self.get_few_shot_config()
        return cfg is not None and cfg.enabled

    # ===== Preset Utility Class Methods =====
    # Thin wrappers kept for backward compatibility; the actual logic lives
    # in the config_presets module.

    @classmethod
    def sanitize_model_config(cls, model: dict[str, Any]) -> dict[str, Any]:
        """Delegate to ``config_presets.sanitize_model_config``."""
        return sanitize_model_config(model)

    @classmethod
    def sanitize_preset_name(cls, name: str) -> str:
        """Delegate to ``config_presets.sanitize_preset_name`` (safe filename)."""
        return sanitize_preset_name(name)

    @classmethod
    def validate_preset_metadata(cls, name: str, description: str | None = None) -> None:
        """Delegate to ``config_presets.validate_preset_metadata``."""
        return validate_preset_metadata(name, description)

    @classmethod
    def create_preset_structure(
        cls,
        preset_id: str,
        name: str,
        description: str | None,
        config_dict: dict[str, Any],
        created_at: str,
        updated_at: str,
    ) -> dict[str, Any]:
        """Delegate to ``config_presets.create_preset_structure``."""
        return create_preset_structure(preset_id, name, description, config_dict, created_at, updated_at)

    def save_preset(
        self,
        name: str,
        description: str | None = None,
        presets_dir: Path | None = None,
    ) -> dict[str, Any]:
        """Persist this config as a preset file via ``config_presets.save_preset``."""
        return save_preset(self, name, description, presets_dir)

    @classmethod
    def from_preset(cls, filepath: Path) -> "VerificationConfig":
        """Load a VerificationConfig from a preset file via ``config_presets.load_preset``."""
        return load_preset(filepath)

    @classmethod
    def from_overrides(
        cls,
        base: "VerificationConfig | None" = None,
        *,
        # Model configuration
        answering_model: str | None = None,
        answering_provider: str | None = None,
        answering_id: str | None = None,
        answering_interface: str | None = None,
        parsing_model: str | None = None,
        parsing_provider: str | None = None,
        parsing_id: str | None = None,
        parsing_interface: str | None = None,
        temperature: float | None = None,
        manual_traces: Any | None = None,
        # Execution settings
        replicate_count: int | None = None,
        # Feature flags
        abstention: bool | None = None,
        sufficiency: bool | None = None,
        embedding_check: bool | None = None,
        deep_judgment: bool | None = None,
        # Evaluation settings
        evaluation_mode: str | None = None,
        embedding_threshold: float | None = None,
        embedding_model: str | None = None,
        async_execution: bool | None = None,
        async_workers: int | None = None,
        # Trace filtering
        use_full_trace_for_template: bool | None = None,
        use_full_trace_for_rubric: bool | None = None,
        # Deep judgment rubric settings
        deep_judgment_rubric_mode: str | None = None,
        deep_judgment_rubric_excerpts: bool | None = None,
        deep_judgment_rubric_max_excerpts: int | None = None,
        deep_judgment_rubric_fuzzy_threshold: float | None = None,
        deep_judgment_rubric_retry_attempts: int | None = None,
        deep_judgment_rubric_search: bool | None = None,
        deep_judgment_rubric_search_tool: str | None = None,
        deep_judgment_rubric_config: dict[str, Any] | None = None,
    ) -> "VerificationConfig":
        """
        Create a VerificationConfig by applying overrides to an optional base config.

        Implements the hierarchy: overrides > base config > defaults.
        Parameters set to None are not applied (base or default value is preserved).

        This is the canonical way to construct a VerificationConfig with selective
        overrides, usable by CLI, server, and programmatic callers.

        Args:
            base: Optional base config (e.g., from a preset). If None, starts from defaults.
            answering_model: Override for the answering model name.
            answering_provider: Override for the answering model provider.
            answering_id: Override for the answering model identifier.
            answering_interface: Override for the answering adapter interface.
            parsing_model: Override for the parsing model name.
            parsing_provider: Override for the parsing model provider.
            parsing_id: Override for the parsing model identifier.
            parsing_interface: Override for the parsing adapter interface.
            temperature: Override for the LLM temperature.
            manual_traces: Override for manual traces data.
            replicate_count: Override for the number of replicates.
            abstention: Override for abstention detection flag.
            sufficiency: Override for sufficiency checking flag.
            embedding_check: Override for embedding check flag.
            deep_judgment: Override for deep judgment flag.
            evaluation_mode: Override for evaluation mode.
            embedding_threshold: Override for embedding similarity threshold.
            embedding_model: Override for embedding model name.
            async_execution: Override for async execution flag.
            async_workers: Override for number of async workers.
            use_full_trace_for_template: Override for full trace template flag.
            use_full_trace_for_rubric: Override for full trace rubric flag.
            deep_judgment_rubric_mode: Override for deep judgment rubric mode.
            deep_judgment_rubric_excerpts: Override for rubric excerpts flag.
            deep_judgment_rubric_max_excerpts: Override for max rubric excerpts.
            deep_judgment_rubric_fuzzy_threshold: Override for rubric fuzzy threshold.
            deep_judgment_rubric_retry_attempts: Override for rubric retry attempts.
            deep_judgment_rubric_search: Override for rubric search flag.
            deep_judgment_rubric_search_tool: Override for rubric search tool.
            deep_judgment_rubric_config: Override for rubric config dict.

        Returns:
            A new VerificationConfig with overrides applied.
        """
        # Start from the base config's serialized fields, or from an empty dict
        # so that field defaults apply.
        config_dict: dict[str, Any] = base.model_dump() if base else {}

        # --- Scalar overrides (None = don't override) ---
        # Maps config field name -> override argument; only non-None values apply.
        scalar_overrides: dict[str, Any] = {
            "replicate_count": replicate_count,
            # Feature flags
            "abstention_enabled": abstention,
            "sufficiency_enabled": sufficiency,
            "embedding_check_enabled": embedding_check,
            "deep_judgment_enabled": deep_judgment,
            # Evaluation settings
            "evaluation_mode": evaluation_mode,
            "embedding_check_threshold": embedding_threshold,
            "embedding_check_model": embedding_model,
            "async_enabled": async_execution,
            "async_max_workers": async_workers,
            # Trace filtering
            "use_full_trace_for_template": use_full_trace_for_template,
            "use_full_trace_for_rubric": use_full_trace_for_rubric,
            # Deep judgment rubric settings
            "deep_judgment_rubric_mode": deep_judgment_rubric_mode,
            "deep_judgment_rubric_global_excerpts": deep_judgment_rubric_excerpts,
            "deep_judgment_rubric_max_excerpts_default": deep_judgment_rubric_max_excerpts,
            "deep_judgment_rubric_fuzzy_match_threshold_default": deep_judgment_rubric_fuzzy_threshold,
            "deep_judgment_rubric_excerpt_retry_attempts_default": deep_judgment_rubric_retry_attempts,
            # NOTE(review): __init__ strips "deep_judgment_rubric_search_enabled"
            # because it is not a declared field, so this override currently has
            # no observable effect -- confirm the intended field name.
            "deep_judgment_rubric_search_enabled": deep_judgment_rubric_search,
            "deep_judgment_rubric_search_tool": deep_judgment_rubric_search_tool,
            "deep_judgment_rubric_config": deep_judgment_rubric_config,
        }
        config_dict.update({field: value for field, value in scalar_overrides.items() if value is not None})

        # Without a base and without an explicit override, pin replicate_count
        # to 1 rather than relying on the field default.
        if replicate_count is None and not base:
            config_dict["replicate_count"] = 1

        # --- Model configuration ---
        # Answering and parsing interfaces are independent; if only a single
        # 'interface' concept was provided, the caller has already split it.
        ans_interface = answering_interface
        par_interface = parsing_interface

        # Bug fix: previously only model/provider/interface triggered a model
        # rebuild, so a lone `answering_id`/`parsing_id` or `temperature`
        # override was silently dropped. They now count as model overrides,
        # matching the documented contract.
        answering_has_overrides = any(
            value is not None
            for value in (answering_model, answering_provider, answering_id, temperature, ans_interface)
        )
        parsing_has_overrides = any(
            value is not None
            for value in (parsing_model, parsing_provider, parsing_id, temperature, par_interface)
        )

        if answering_has_overrides:
            config_dict["answering_models"] = [
                cls._build_model_config_dict(
                    base_models=base.answering_models if base else None,
                    model_name=answering_model,
                    provider=answering_provider,
                    model_id=answering_id,
                    temperature=temperature,
                    interface=ans_interface,
                    manual_traces=manual_traces,
                    default_model="gpt-4.1-mini",
                    default_provider="openai",
                    default_interface="langchain",
                )
            ]
        elif manual_traces is not None:
            # Manual interface requested via manual_traces without explicit model overrides
            config_dict["answering_models"] = [ModelConfig(interface="manual", manual_traces=manual_traces)]

        if parsing_has_overrides:
            config_dict["parsing_models"] = [
                cls._build_model_config_dict(
                    base_models=base.parsing_models if base else None,
                    model_name=parsing_model,
                    provider=parsing_provider,
                    model_id=parsing_id,
                    temperature=temperature,
                    interface=par_interface,
                    manual_traces=None,  # Parsing model never uses manual interface
                    default_model="gpt-4.1-mini",
                    default_provider="openai",
                    default_interface="langchain",
                )
            ]

        return cls(**config_dict)

    @classmethod
    def _build_model_config_dict(
        cls,
        *,
        base_models: list[ModelConfig] | None,
        model_name: str | None,
        provider: str | None,
        model_id: str | None,
        temperature: float | None,
        interface: str | None,
        manual_traces: Any | None,
        default_model: str,
        default_provider: str,
        default_interface: str,
    ) -> ModelConfig:
        """
        Build a ModelConfig by layering overrides on top of an optional base model.

        When base_models is non-empty, the first entry seeds the config and only
        non-None overrides replace its fields. Without a base, the config is
        assembled from the overrides, falling back to the supplied defaults.

        Returns:
            A new ModelConfig instance.
        """
        # A manual interface with traces short-circuits every other setting.
        if interface == "manual" and manual_traces is not None:
            return ModelConfig(interface="manual", manual_traces=manual_traces)

        if base_models:
            # Seed from the first base model, then merge in non-None overrides.
            merged = base_models[0].model_dump()
            overrides = {
                "model_name": model_name,
                "model_provider": provider,
                "id": model_id,
                "temperature": temperature,
                "interface": interface,
            }
            merged.update({field: value for field, value in overrides.items() if value is not None})
            return ModelConfig(**merged)

        # No base model: fall back to defaults for anything not overridden.
        return ModelConfig(
            model_name=model_name or default_model,
            model_provider=provider or default_provider,
            interface=interface or default_interface,
            temperature=0.1 if temperature is None else temperature,
            id=model_id,
        )
Attributes
rubric_enabled property
rubric_enabled: bool

Whether rubric evaluation is enabled. Derived from evaluation_mode.

Functions
__init__
__init__(**data: Any) -> None

Configuration precedence (highest to lowest): 1. Explicit arguments (including preset values) 2. Environment variables (only if set) 3. Field defaults

Source code in src/karenina/schemas/verification/config.py
def __init__(self, **data: Any) -> None:
    """
    Initialize with environment variable support and default system prompts.

    Configuration precedence (highest to lowest):
    1. Explicit arguments (including preset values)
    2. Environment variables (only if set)
    3. Field defaults
    """
    # Read environment variables for embedding check settings (only if not explicitly provided AND env var is set)
    if "embedding_check_enabled" not in data:
        env_val = os.getenv("EMBEDDING_CHECK")
        if env_val is not None:
            data["embedding_check_enabled"] = env_val.lower() in ("true", "1", "yes")
        # else: let Pydantic use field default (False)

    if "embedding_check_model" not in data:
        env_val = os.getenv("EMBEDDING_CHECK_MODEL")
        if env_val is not None:
            data["embedding_check_model"] = env_val
        # else: let Pydantic use field default (DEFAULT_EMBEDDING_MODEL)

    if "embedding_check_threshold" not in data:
        env_val = os.getenv("EMBEDDING_CHECK_THRESHOLD")
        if env_val is not None:
            # Invalid env var value will let Pydantic use field default (0.85)
            with contextlib.suppress(ValueError):
                data["embedding_check_threshold"] = float(env_val)
        # else: let Pydantic use field default (DEFAULT_EMBEDDING_THRESHOLD)

    # Read environment variables for async execution settings (only if not explicitly provided AND env var is set)
    if "async_enabled" not in data:
        env_val = os.getenv("KARENINA_ASYNC_ENABLED")
        if env_val is not None:
            data["async_enabled"] = env_val.lower() in ("true", "1", "yes")
        # else: let Pydantic use field default (DEFAULT_ASYNC_ENABLED)

    if "async_max_workers" not in data:
        env_val = os.getenv("KARENINA_ASYNC_MAX_WORKERS")
        if env_val is not None:
            # Invalid env var value will let Pydantic use field default (2)
            with contextlib.suppress(ValueError):
                data["async_max_workers"] = int(env_val)
        # else: let Pydantic use field default (DEFAULT_ASYNC_MAX_WORKERS)

    # Apply default system prompts to models that don't have one.
    # Deep-copy ModelConfig instances to avoid mutating shared objects.
    # A falsy system_prompt (None or "") is treated as "doesn't have one".
    if "answering_models" in data:
        data["answering_models"] = [
            m.model_copy(update={"system_prompt": DEFAULT_ANSWERING_SYSTEM_PROMPT})
            if isinstance(m, ModelConfig) and not m.system_prompt
            else (
                {**m, "system_prompt": DEFAULT_ANSWERING_SYSTEM_PROMPT}
                if isinstance(m, dict) and not m.get("system_prompt")
                else m
            )
            for m in data["answering_models"]
        ]

    if "parsing_models" in data:
        data["parsing_models"] = [
            m.model_copy(update={"system_prompt": DEFAULT_PARSING_SYSTEM_PROMPT})
            if isinstance(m, ModelConfig) and not m.system_prompt
            else (
                {**m, "system_prompt": DEFAULT_PARSING_SYSTEM_PROMPT}
                if isinstance(m, dict) and not m.get("system_prompt")
                else m
            )
            for m in data["parsing_models"]
        ]

    # Strip rubric_enabled from input: now derived from evaluation_mode
    data.pop("rubric_enabled", None)

    # Strip deep_judgment_rubric_search_enabled: not a declared field,
    # but injected by from_overrides() and some CLI callers.
    data.pop("deep_judgment_rubric_search_enabled", None)

    # Hand the normalized data to the base class initializer.
    super().__init__(**data)

    # Validate configuration after initialization
    self._validate_config()
create_preset_structure classmethod
create_preset_structure(
    preset_id: str,
    name: str,
    description: str | None,
    config_dict: dict[str, Any],
    created_at: str,
    updated_at: str,
) -> dict[str, Any]

Create preset structure. Delegates to config_presets.create_preset_structure.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def create_preset_structure(
    cls,
    preset_id: str,
    name: str,
    description: str | None,
    config_dict: dict[str, Any],
    created_at: str,
    updated_at: str,
) -> dict[str, Any]:
    """Create preset structure. Delegates to config_presets.create_preset_structure."""
    return create_preset_structure(preset_id, name, description, config_dict, created_at, updated_at)
from_overrides classmethod
from_overrides(
    base: VerificationConfig | None = None,
    *,
    answering_model: str | None = None,
    answering_provider: str | None = None,
    answering_id: str | None = None,
    answering_interface: str | None = None,
    parsing_model: str | None = None,
    parsing_provider: str | None = None,
    parsing_id: str | None = None,
    parsing_interface: str | None = None,
    temperature: float | None = None,
    manual_traces: Any | None = None,
    replicate_count: int | None = None,
    abstention: bool | None = None,
    sufficiency: bool | None = None,
    embedding_check: bool | None = None,
    deep_judgment: bool | None = None,
    evaluation_mode: str | None = None,
    embedding_threshold: float | None = None,
    embedding_model: str | None = None,
    async_execution: bool | None = None,
    async_workers: int | None = None,
    use_full_trace_for_template: bool | None = None,
    use_full_trace_for_rubric: bool | None = None,
    deep_judgment_rubric_mode: str | None = None,
    deep_judgment_rubric_excerpts: bool | None = None,
    deep_judgment_rubric_max_excerpts: int | None = None,
    deep_judgment_rubric_fuzzy_threshold: float
    | None = None,
    deep_judgment_rubric_retry_attempts: int | None = None,
    deep_judgment_rubric_search: bool | None = None,
    deep_judgment_rubric_search_tool: str | None = None,
    deep_judgment_rubric_config: dict[str, Any]
    | None = None,
) -> VerificationConfig

Create a VerificationConfig by applying overrides to an optional base config.

Implements the hierarchy: overrides > base config > defaults. Parameters set to None are not applied (base or default value is preserved).

This is the canonical way to construct a VerificationConfig with selective overrides, usable by CLI, server, and programmatic callers.

Parameters:

Name Type Description Default
base VerificationConfig | None

Optional base config (e.g., from a preset). If None, starts from defaults.

None
answering_model str | None

Override for the answering model name.

None
answering_provider str | None

Override for the answering model provider.

None
answering_id str | None

Override for the answering model identifier.

None
answering_interface str | None

Override for the answering adapter interface.

None
parsing_model str | None

Override for the parsing model name.

None
parsing_provider str | None

Override for the parsing model provider.

None
parsing_id str | None

Override for the parsing model identifier.

None
parsing_interface str | None

Override for the parsing adapter interface.

None
temperature float | None

Override for the LLM temperature.

None
manual_traces Any | None

Override for manual traces data.

None
replicate_count int | None

Override for the number of replicates.

None
abstention bool | None

Override for abstention detection flag.

None
sufficiency bool | None

Override for sufficiency checking flag.

None
embedding_check bool | None

Override for embedding check flag.

None
deep_judgment bool | None

Override for deep judgment flag.

None
evaluation_mode str | None

Override for evaluation mode.

None
embedding_threshold float | None

Override for embedding similarity threshold.

None
embedding_model str | None

Override for embedding model name.

None
async_execution bool | None

Override for async execution flag.

None
async_workers int | None

Override for number of async workers.

None
use_full_trace_for_template bool | None

Override for full trace template flag.

None
use_full_trace_for_rubric bool | None

Override for full trace rubric flag.

None
deep_judgment_rubric_mode str | None

Override for deep judgment rubric mode.

None
deep_judgment_rubric_excerpts bool | None

Override for rubric excerpts flag.

None
deep_judgment_rubric_max_excerpts int | None

Override for max rubric excerpts.

None
deep_judgment_rubric_fuzzy_threshold float | None

Override for rubric fuzzy threshold.

None
deep_judgment_rubric_retry_attempts int | None

Override for rubric retry attempts.

None
deep_judgment_rubric_search bool | None

Override for rubric search flag.

None
deep_judgment_rubric_search_tool str | None

Override for rubric search tool.

None
deep_judgment_rubric_config dict[str, Any] | None

Override for rubric config dict.

None

Returns:

Type Description
VerificationConfig

A new VerificationConfig with overrides applied.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def from_overrides(
    cls,
    base: "VerificationConfig | None" = None,
    *,
    # Model configuration
    answering_model: str | None = None,
    answering_provider: str | None = None,
    answering_id: str | None = None,
    answering_interface: str | None = None,
    parsing_model: str | None = None,
    parsing_provider: str | None = None,
    parsing_id: str | None = None,
    parsing_interface: str | None = None,
    temperature: float | None = None,
    manual_traces: Any | None = None,
    # Execution settings
    replicate_count: int | None = None,
    # Feature flags
    abstention: bool | None = None,
    sufficiency: bool | None = None,
    embedding_check: bool | None = None,
    deep_judgment: bool | None = None,
    # Evaluation settings
    evaluation_mode: str | None = None,
    embedding_threshold: float | None = None,
    embedding_model: str | None = None,
    async_execution: bool | None = None,
    async_workers: int | None = None,
    # Trace filtering
    use_full_trace_for_template: bool | None = None,
    use_full_trace_for_rubric: bool | None = None,
    # Deep judgment rubric settings
    deep_judgment_rubric_mode: str | None = None,
    deep_judgment_rubric_excerpts: bool | None = None,
    deep_judgment_rubric_max_excerpts: int | None = None,
    deep_judgment_rubric_fuzzy_threshold: float | None = None,
    deep_judgment_rubric_retry_attempts: int | None = None,
    deep_judgment_rubric_search: bool | None = None,
    deep_judgment_rubric_search_tool: str | None = None,
    deep_judgment_rubric_config: dict[str, Any] | None = None,
) -> "VerificationConfig":
    """
    Create a VerificationConfig by applying overrides to an optional base config.

    Implements the hierarchy: overrides > base config > defaults.
    Parameters set to None are not applied (base or default value is preserved).

    This is the canonical way to construct a VerificationConfig with selective
    overrides, usable by CLI, server, and programmatic callers.

    Args:
        base: Optional base config (e.g., from a preset). If None, starts from defaults.
        answering_model: Override for the answering model name.
        answering_provider: Override for the answering model provider.
        answering_id: Override for the answering model identifier.
        answering_interface: Override for the answering adapter interface.
        parsing_model: Override for the parsing model name.
        parsing_provider: Override for the parsing model provider.
        parsing_id: Override for the parsing model identifier.
        parsing_interface: Override for the parsing adapter interface.
        temperature: Override for the LLM temperature.
        manual_traces: Override for manual traces data.
        replicate_count: Override for the number of replicates.
        abstention: Override for abstention detection flag.
        sufficiency: Override for sufficiency checking flag.
        embedding_check: Override for embedding check flag.
        deep_judgment: Override for deep judgment flag.
        evaluation_mode: Override for evaluation mode.
        embedding_threshold: Override for embedding similarity threshold.
        embedding_model: Override for embedding model name.
        async_execution: Override for async execution flag.
        async_workers: Override for number of async workers.
        use_full_trace_for_template: Override for full trace template flag.
        use_full_trace_for_rubric: Override for full trace rubric flag.
        deep_judgment_rubric_mode: Override for deep judgment rubric mode.
        deep_judgment_rubric_excerpts: Override for rubric excerpts flag.
        deep_judgment_rubric_max_excerpts: Override for max rubric excerpts.
        deep_judgment_rubric_fuzzy_threshold: Override for rubric fuzzy threshold.
        deep_judgment_rubric_retry_attempts: Override for rubric retry attempts.
        deep_judgment_rubric_search: Override for rubric search flag.
        deep_judgment_rubric_search_tool: Override for rubric search tool.
        deep_judgment_rubric_config: Override for rubric config dict.

    Returns:
        A new VerificationConfig with overrides applied.
    """
    # Start with base config dump or empty dict
    config_dict: dict[str, Any] = base.model_dump() if base else {}

    # --- Scalar overrides (None = don't override) ---

    # Replicate count
    if replicate_count is not None:
        config_dict["replicate_count"] = replicate_count
    elif not base:
        config_dict["replicate_count"] = 1

    # Feature flags
    if abstention is not None:
        config_dict["abstention_enabled"] = abstention
    if sufficiency is not None:
        config_dict["sufficiency_enabled"] = sufficiency
    if embedding_check is not None:
        config_dict["embedding_check_enabled"] = embedding_check
    if deep_judgment is not None:
        config_dict["deep_judgment_enabled"] = deep_judgment

    # Evaluation settings
    if evaluation_mode is not None:
        config_dict["evaluation_mode"] = evaluation_mode
    if embedding_threshold is not None:
        config_dict["embedding_check_threshold"] = embedding_threshold
    if embedding_model is not None:
        config_dict["embedding_check_model"] = embedding_model
    if async_execution is not None:
        config_dict["async_enabled"] = async_execution
    if async_workers is not None:
        config_dict["async_max_workers"] = async_workers

    # Trace filtering
    if use_full_trace_for_template is not None:
        config_dict["use_full_trace_for_template"] = use_full_trace_for_template
    if use_full_trace_for_rubric is not None:
        config_dict["use_full_trace_for_rubric"] = use_full_trace_for_rubric

    # Deep judgment rubric settings
    if deep_judgment_rubric_mode is not None:
        config_dict["deep_judgment_rubric_mode"] = deep_judgment_rubric_mode
    if deep_judgment_rubric_excerpts is not None:
        config_dict["deep_judgment_rubric_global_excerpts"] = deep_judgment_rubric_excerpts
    if deep_judgment_rubric_max_excerpts is not None:
        config_dict["deep_judgment_rubric_max_excerpts_default"] = deep_judgment_rubric_max_excerpts
    if deep_judgment_rubric_fuzzy_threshold is not None:
        config_dict["deep_judgment_rubric_fuzzy_match_threshold_default"] = deep_judgment_rubric_fuzzy_threshold
    if deep_judgment_rubric_retry_attempts is not None:
        config_dict["deep_judgment_rubric_excerpt_retry_attempts_default"] = deep_judgment_rubric_retry_attempts
    if deep_judgment_rubric_search is not None:
        config_dict["deep_judgment_rubric_search_enabled"] = deep_judgment_rubric_search
    if deep_judgment_rubric_search_tool is not None:
        config_dict["deep_judgment_rubric_search_tool"] = deep_judgment_rubric_search_tool
    if deep_judgment_rubric_config is not None:
        config_dict["deep_judgment_rubric_config"] = deep_judgment_rubric_config

    # --- Model configuration ---
    # Determine the unified interface (answering and parsing may differ)
    ans_interface = answering_interface
    par_interface = parsing_interface
    # If only a single 'interface' concept was provided via answering_interface,
    # it's already split by the caller. No implicit sharing here.

    answering_has_overrides = any(
        [
            answering_model is not None,
            answering_provider is not None,
            ans_interface is not None,
        ]
    )

    parsing_has_overrides = any(
        [
            parsing_model is not None,
            parsing_provider is not None,
            par_interface is not None,
        ]
    )

    if answering_has_overrides:
        config_dict["answering_models"] = [
            cls._build_model_config_dict(
                base_models=base.answering_models if base else None,
                model_name=answering_model,
                provider=answering_provider,
                model_id=answering_id,
                temperature=temperature,
                interface=ans_interface,
                manual_traces=manual_traces,
                default_model="gpt-4.1-mini",
                default_provider="openai",
                default_interface="langchain",
            )
        ]
    elif manual_traces is not None:
        # Manual interface requested via manual_traces without explicit model overrides
        config_dict["answering_models"] = [ModelConfig(interface="manual", manual_traces=manual_traces)]

    if parsing_has_overrides:
        config_dict["parsing_models"] = [
            cls._build_model_config_dict(
                base_models=base.parsing_models if base else None,
                model_name=parsing_model,
                provider=parsing_provider,
                model_id=parsing_id,
                temperature=temperature,
                interface=par_interface,
                manual_traces=None,  # Parsing model never uses manual interface
                default_model="gpt-4.1-mini",
                default_provider="openai",
                default_interface="langchain",
            )
        ]

    return cls(**config_dict)
from_preset classmethod
from_preset(filepath: Path) -> VerificationConfig

Load a VerificationConfig from a preset file. Delegates to config_presets.load_preset.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def from_preset(cls, filepath: Path) -> "VerificationConfig":
    """Load a VerificationConfig from a preset file. Delegates to config_presets.load_preset."""
    return load_preset(filepath)
get_few_shot_config
get_few_shot_config() -> FewShotConfig | None

Get the FewShotConfig for this verification run.

Returns:

Type Description
FewShotConfig | None

The FewShotConfig to use, or None if few-shot is disabled

Source code in src/karenina/schemas/verification/config.py
def get_few_shot_config(self) -> FewShotConfig | None:
    """
    Get the FewShotConfig for this verification run.

    Returns:
        The FewShotConfig to use, or None if few-shot is disabled
    """
    return self.few_shot_config
is_few_shot_enabled
is_few_shot_enabled() -> bool

Check if few-shot prompting is enabled.

Returns:

Type Description
bool

True if few-shot is enabled

Source code in src/karenina/schemas/verification/config.py
def is_few_shot_enabled(self) -> bool:
    """
    Check if few-shot prompting is enabled.

    Returns:
        True if few-shot is enabled
    """
    config = self.get_few_shot_config()
    return config is not None and config.enabled
sanitize_model_config classmethod
sanitize_model_config(
    model: dict[str, Any],
) -> dict[str, Any]

Sanitize model configuration. Delegates to config_presets.sanitize_model_config.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def sanitize_model_config(cls, model: dict[str, Any]) -> dict[str, Any]:
    """Return a sanitized copy of a model configuration dict.

    Thin delegation to the module-level config_presets helper of the
    same name.
    """
    sanitized = sanitize_model_config(model)
    return sanitized
sanitize_preset_name classmethod
sanitize_preset_name(name: str) -> str

Convert preset name to safe filename. Delegates to config_presets.sanitize_preset_name.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def sanitize_preset_name(cls, name: str) -> str:
    """Return *name* converted into a filesystem-safe preset filename.

    Thin delegation to config_presets.sanitize_preset_name.
    """
    safe_name = sanitize_preset_name(name)
    return safe_name
save_preset
save_preset(
    name: str,
    description: str | None = None,
    presets_dir: Path | None = None,
) -> dict[str, Any]

Save this config as a preset file. Delegates to config_presets.save_preset.

Source code in src/karenina/schemas/verification/config.py
def save_preset(
    self,
    name: str,
    description: str | None = None,
    presets_dir: Path | None = None,
) -> dict[str, Any]:
    """Persist this config as a named preset file.

    Thin delegation to config_presets.save_preset.

    Args:
        name: Human-readable preset name.
        description: Optional free-text description for the preset.
        presets_dir: Optional directory override for preset storage.

    Returns:
        Metadata dict describing the saved preset.
    """
    saved = save_preset(self, name, description, presets_dir)
    return saved
validate_preset_metadata classmethod
validate_preset_metadata(
    name: str, description: str | None = None
) -> None

Validate preset metadata. Delegates to config_presets.validate_preset_metadata.

Source code in src/karenina/schemas/verification/config.py
@classmethod
def validate_preset_metadata(cls, name: str, description: str | None = None) -> None:
    """Validate preset name/description before saving.

    Thin delegation to config_presets.validate_preset_metadata.
    """
    outcome = validate_preset_metadata(name, description)
    return outcome

VerificationJob

Bases: BaseModel

Represents a verification job.

Source code in src/karenina/schemas/verification/job.py
class VerificationJob(BaseModel):
    """Represents a verification job.

    Holds the job's configuration plus live progress bookkeeping:
    per-task start times, success/failure counters and a completion
    percentage. ``task_started()``/``task_finished()`` drive the
    counters; ``to_dict()`` renders an API-friendly snapshot.
    """

    # Reject unknown fields so stale or typo'd keys fail loudly.
    model_config = ConfigDict(extra="forbid")

    job_id: str  # Unique identifier for this job
    run_name: str  # User-defined or auto-generated run name
    status: Literal["pending", "running", "completed", "failed", "cancelled"]
    config: "VerificationConfig"  # Forward ref to the run's full configuration

    # Database storage
    storage_url: str | None = None  # Database URL for auto-save functionality
    benchmark_name: str | None = None  # Benchmark name for auto-save functionality

    # Progress tracking
    total_questions: int  # Expected number of tasks for this job
    processed_count: int = 0  # Tasks finished so far (success or failure)
    successful_count: int = 0
    failed_count: int = 0
    percentage: float = 0.0  # processed_count / total_questions * 100
    current_question: str = ""
    last_task_duration: float | None = None  # Execution time of last completed task

    # WebSocket streaming progress fields
    in_progress_questions: list[str] = Field(default_factory=list)

    # Task timing tracking (maps question_id to start time)
    task_start_times: dict[str, float] = Field(default_factory=dict)

    # Timing
    start_time: float | None = None
    end_time: float | None = None

    # Results
    result_set: "VerificationResultSet | None" = None  # Unified verification result container
    error_message: str | None = None

    @staticmethod
    def _make_task_key(question_id: str, replicate: int | None = None) -> str:
        """Create a unique task key from question_id and optional replicate.

        For single-replicate runs, returns just the question_id.
        For multi-replicate runs, appends _rep{N} to distinguish tasks.
        """
        if replicate is None:
            return question_id
        return f"{question_id}_rep{replicate}"

    def task_started(self, question_id: str, replicate: int | None = None) -> None:
        """Mark a task as started and record start time.

        Args:
            question_id: The question identifier
            replicate: Optional replicate number (for multi-replicate runs)
        """
        task_key = self._make_task_key(question_id, replicate)
        # Only register once; a repeated start just refreshes the timestamp below.
        if task_key not in self.in_progress_questions:
            self.in_progress_questions.append(task_key)

        # Record task start time
        self.task_start_times[task_key] = time.time()

    def task_finished(self, question_id: str, success: bool, replicate: int | None = None) -> None:
        """Mark a task as finished, calculate duration, and update counts.

        Args:
            question_id: The question identifier
            success: Whether the task completed successfully
            replicate: Optional replicate number (for multi-replicate runs)
        """
        task_key = self._make_task_key(question_id, replicate)

        # Calculate task duration from recorded start time
        # (0.0 when no start was recorded, e.g. task_started was never called).
        task_duration = 0.0
        if task_key in self.task_start_times:
            task_duration = time.time() - self.task_start_times[task_key]
            # Clean up start time
            del self.task_start_times[task_key]

        # Remove from in-progress list
        if task_key in self.in_progress_questions:
            self.in_progress_questions.remove(task_key)

        # Update counts
        self.processed_count += 1
        if success:
            self.successful_count += 1
        else:
            self.failed_count += 1

        # Update percentage
        self.percentage = (self.processed_count / self.total_questions) * 100 if self.total_questions > 0 else 0.0

        # Track last task duration
        self.last_task_duration = task_duration

    def to_dict(self) -> dict[str, Any]:
        """Convert job to dictionary for API response."""
        # Calculate duration if job has started
        # Falsy start_time means the job never started; duration stays None.
        duration = None
        if self.start_time:
            duration = self.end_time - self.start_time if self.end_time else time.time() - self.start_time

        return {
            "job_id": self.job_id,
            "run_name": self.run_name,
            "status": self.status,
            "total_questions": self.total_questions,
            "processed_count": self.processed_count,
            "successful_count": self.successful_count,
            "failed_count": self.failed_count,
            "percentage": self.percentage,
            "current_question": self.current_question,
            "duration_seconds": duration,
            "last_task_duration": self.last_task_duration,
            "error_message": self.error_message,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "in_progress_questions": self.in_progress_questions,
        }
Functions
task_finished
task_finished(
    question_id: str,
    success: bool,
    replicate: int | None = None,
) -> None

Mark a task as finished, calculate duration, and update counts.

Parameters:

Name Type Description Default
question_id str

The question identifier

required
success bool

Whether the task completed successfully

required
replicate int | None

Optional replicate number (for multi-replicate runs)

None
Source code in src/karenina/schemas/verification/job.py
def task_finished(self, question_id: str, success: bool, replicate: int | None = None) -> None:
    """Mark a task as finished, calculate duration, and update counts.

    Args:
        question_id: The question identifier
        success: Whether the task completed successfully
        replicate: Optional replicate number (for multi-replicate runs)
    """
    task_key = self._make_task_key(question_id, replicate)

    # Calculate task duration from recorded start time
    task_duration = 0.0
    if task_key in self.task_start_times:
        task_duration = time.time() - self.task_start_times[task_key]
        # Clean up start time
        del self.task_start_times[task_key]

    # Remove from in-progress list
    if task_key in self.in_progress_questions:
        self.in_progress_questions.remove(task_key)

    # Update counts
    self.processed_count += 1
    if success:
        self.successful_count += 1
    else:
        self.failed_count += 1

    # Update percentage
    self.percentage = (self.processed_count / self.total_questions) * 100 if self.total_questions > 0 else 0.0

    # Track last task duration
    self.last_task_duration = task_duration
task_started
task_started(
    question_id: str, replicate: int | None = None
) -> None

Mark a task as started and record start time.

Parameters:

Name Type Description Default
question_id str

The question identifier

required
replicate int | None

Optional replicate number (for multi-replicate runs)

None
Source code in src/karenina/schemas/verification/job.py
def task_started(self, question_id: str, replicate: int | None = None) -> None:
    """Mark a task as started and record start time.

    Args:
        question_id: The question identifier
        replicate: Optional replicate number (for multi-replicate runs)
    """
    task_key = self._make_task_key(question_id, replicate)
    if task_key not in self.in_progress_questions:
        self.in_progress_questions.append(task_key)

    # Record task start time
    self.task_start_times[task_key] = time.time()
to_dict
to_dict() -> dict[str, Any]

Convert job to dictionary for API response.

Source code in src/karenina/schemas/verification/job.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the job's progress snapshot for an API response."""
    # Elapsed seconds: running jobs measure against the current clock,
    # finished jobs against their recorded end time. A falsy start_time
    # means the job never started, so duration stays None.
    if self.start_time:
        end = self.end_time if self.end_time else time.time()
        duration = end - self.start_time
    else:
        duration = None

    return dict(
        job_id=self.job_id,
        run_name=self.run_name,
        status=self.status,
        total_questions=self.total_questions,
        processed_count=self.processed_count,
        successful_count=self.successful_count,
        failed_count=self.failed_count,
        percentage=self.percentage,
        current_question=self.current_question,
        duration_seconds=duration,
        last_task_duration=self.last_task_duration,
        error_message=self.error_message,
        start_time=self.start_time,
        end_time=self.end_time,
        in_progress_questions=self.in_progress_questions,
    )

VerificationResult

Bases: BaseModel

Result of verifying a single question.

Source code in src/karenina/schemas/verification/result.py
class VerificationResult(BaseModel):
    """Result of verifying a single question.

    Aggregates the per-stage outputs of one verification run: always-present
    metadata plus optional template, rubric and deep-judgment sections
    (each defaults to None unless populated by the corresponding stage).
    """

    metadata: VerificationResultMetadata  # Always present: ids, timing, error state
    template: VerificationResultTemplate | None = None  # Template-verification output, if any
    rubric: VerificationResultRubric | None = None  # Rubric-evaluation output, if any
    deep_judgment: VerificationResultDeepJudgment | None = None  # Deep-judgment output, if any
    deep_judgment_rubric: VerificationResultDeepJudgmentRubric | None = None  # Deep-judgment rubric output, if any

    # Shared trace filtering fields (for MCP agent responses)
    # These are at the root level because both template and rubric evaluation use the same input
    evaluation_input: str | None = None  # Input passed to evaluation (full trace or final AI message)
    used_full_trace: bool = True  # Whether full trace was used (True) or only final AI message (False)
    trace_extraction_error: str | None = None  # Error if final AI message extraction failed

Functions

export_verification_results_csv

export_verification_results_csv(
    job: VerificationJob,
    results: VerificationResultSet,
    global_rubric: HasTraitNames | None = None,
) -> str

Export verification results to CSV format with rubric consolidation.

Parameters:

Name Type Description Default
job
VerificationJob

The verification job

required
results
VerificationResultSet

VerificationResultSet containing all verification results

required
global_rubric
HasTraitNames | None

Optional global rubric object that implements HasTraitNames protocol for distinguishing global vs question-specific traits. If None, all rubric traits will be consolidated into question_specific_rubrics.

None

Returns:

Type Description
str

CSV string with results. Global rubric traits appear as dedicated columns

str

(rubric_TraitName), while question-specific traits are consolidated into

str

a single JSON column (question_specific_rubrics).

Note

The function gracefully handles errors in trait name extraction and JSON serialization, logging warnings and continuing with fallback values.

Source code in src/karenina/benchmark/verification/stages/helpers/results_exporter.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
def export_verification_results_csv(
    job: VerificationJob, results: VerificationResultSet, global_rubric: HasTraitNames | None = None
) -> str:
    """
    Export verification results to CSV format with rubric consolidation.

    Args:
        job: The verification job
        results: VerificationResultSet containing all verification results
        global_rubric: Optional global rubric object that implements HasTraitNames protocol
                      for distinguishing global vs question-specific traits. If None,
                      all rubric traits will be consolidated into question_specific_rubrics.

    Returns:
        CSV string with results. Global rubric traits appear as dedicated columns
        (rubric_TraitName), while question-specific traits are consolidated into
        a single JSON column (question_specific_rubrics).

    Note:
        The function gracefully handles errors in trait name extraction and JSON
        serialization, logging warnings and continuing with fallback values.
    """
    # Input validation
    if not results or len(results) == 0:
        logger.warning("No results provided for CSV export. Generating empty CSV.")
        # Return minimal CSV with headers only
        output = StringIO()
        csv_writer = csv.writer(output)
        csv_writer.writerow(
            [
                "question_id",
                "success",
                "error",
                "question_text",
                "raw_llm_response",
                "keywords",
                "export_timestamp",
                "karenina_version",
                "job_id",
            ]
        )
        return output.getvalue()

    # Log export summary
    logger.info("Starting CSV export for %d results", len(results))

    output = StringIO()

    # Collect all unique rubric trait names across all results with validation
    all_rubric_traits: set[str] = set()
    invalid_trait_count = 0
    for result in results:
        if result.rubric:
            # Collect from all trait score dicts (llm, regex, callable, metric)
            for trait_dict in [
                result.rubric.llm_trait_scores,
                result.rubric.regex_trait_scores,
                result.rubric.callable_trait_scores,
                result.rubric.metric_trait_scores,
            ]:
                if trait_dict:
                    for trait_name in trait_dict:
                        if _validate_trait_name(trait_name):
                            all_rubric_traits.add(trait_name)
                        else:
                            invalid_trait_count += 1
                            logger.warning(
                                "Skipping invalid trait name '%s' in question %s",
                                trait_name,
                                result.metadata.question_id,
                            )

    if invalid_trait_count > 0:
        logger.info("Skipped %d invalid trait names during CSV export", invalid_trait_count)

    # Determine global vs question-specific rubrics
    global_trait_names: set[str] = set()
    if global_rubric:
        try:
            if hasattr(global_rubric, "get_trait_names") and callable(global_rubric.get_trait_names):
                trait_names = global_rubric.get_trait_names()
                if isinstance(trait_names, list):
                    # Validate each trait name from global rubric
                    valid_global_traits = []
                    for trait_name in trait_names:
                        if _validate_trait_name(trait_name):
                            valid_global_traits.append(trait_name)
                        else:
                            logger.warning("Skipping invalid global trait name '%s' from global_rubric", trait_name)
                    global_trait_names = set(valid_global_traits)

                    if len(valid_global_traits) != len(trait_names):
                        logger.info(
                            "Global rubric had %d traits, %d were valid for CSV export",
                            len(trait_names),
                            len(valid_global_traits),
                        )
                else:
                    logger.warning(
                        "Global rubric get_trait_names() returned %s instead of list. "
                        "All rubric traits will be treated as question-specific.",
                        type(trait_names).__name__,
                    )
            else:
                logger.warning(
                    "Global rubric object does not have a callable get_trait_names method. "
                    "All rubric traits will be treated as question-specific."
                )
        except (AttributeError, TypeError, ValueError) as e:
            logger.warning(
                "Error accessing global rubric trait names (%s: %s). "
                "All rubric traits will be treated as question-specific.",
                type(e).__name__,
                e,
            )
            # Continue with empty set - graceful degradation

    # Separate traits into global and question-specific (with performance optimization)
    global_traits = sorted(all_rubric_traits.intersection(global_trait_names))
    question_specific_traits = sorted(all_rubric_traits - global_trait_names)

    # Pre-compute set for faster lookups during row processing
    question_specific_traits_set = set(question_specific_traits)

    # Log export configuration
    logger.debug(
        "CSV export configuration: %d global traits, %d question-specific traits, %d total results",
        len(global_traits),
        len(question_specific_traits),
        len(results),
    )

    # Define CSV headers with all result fields + dynamic rubric columns
    headers = [
        "question_id",
        "success",
        "error",
        "question_text",
        "raw_llm_response",
        "parsed_gt_response",
        "parsed_llm_response",
        "template_verification_performed",
        "verify_result",
        "verify_granular_result",
        "rubric_evaluation_performed",
        "keywords",
    ]

    # Add global rubric trait columns (prefixed with 'rubric_')
    headers.extend([f"rubric_{trait}" for trait in global_traits])

    # Add single column for question-specific rubrics
    if question_specific_traits:
        headers.append("question_specific_rubrics")

    # Add remaining standard columns
    headers.extend(
        [
            "answering_model",
            "parsing_model",
            "replicate",
            "execution_time",
            "timestamp",
            "answering_system_prompt",
            "parsing_system_prompt",
            "run_name",
            "export_timestamp",
            "karenina_version",
            "job_id",
            # Embedding check fields
            "embedding_check_performed",
            "embedding_similarity_score",
            "embedding_override_applied",
            "embedding_model_used",
            # MCP server fields
            "answering_mcp_servers",
            # Deep-judgment fields
            "deep_judgment_enabled",
            "deep_judgment_performed",
            "extracted_excerpts",
            "attribute_reasoning",
            "deep_judgment_stages_completed",
            "deep_judgment_model_calls",
            "deep_judgment_excerpt_retry_count",
            "attributes_without_excerpts",
            # Search-enhanced deep-judgment fields
            "deep_judgment_search_enabled",
            "hallucination_risk_assessment",
            # Deep-judgment rubric fields
            "deep_judgment_rubric_performed",
            "extracted_rubric_excerpts",
            "rubric_trait_reasoning",
            "deep_judgment_rubric_scores",
            "standard_rubric_scores",
            "trait_metadata",
            "traits_without_valid_excerpts",
            "rubric_hallucination_risk_assessment",
            "total_deep_judgment_model_calls",
            "total_traits_evaluated",
            "total_excerpt_retries",
            # Metric trait fields
            "metric_trait_confusion_lists",
            "metric_trait_metrics",
            # LLM usage tracking fields
            "usage_metadata",
            "agent_metrics",
        ]
    )

    # DictWriter pairs each row dict with the dynamic header list built above.
    writer: csv.DictWriter[str] = csv.DictWriter(output, fieldnames=headers)
    writer.writeheader()

    # Metadata for each row
    export_timestamp = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime())
    karenina_version = get_karenina_version()

    # Write data rows
    for result in results:
        # Access fields from nested structure
        metadata = result.metadata
        template = result.template
        rubric = result.rubric
        deep_judgment = result.deep_judgment
        deep_judgment_rubric = result.deep_judgment_rubric

        row = {
            # Metadata fields
            "question_id": metadata.question_id,
            "success": metadata.completed_without_errors,  # Header uses 'success', not 'completed_without_errors'
            "error": metadata.error or "",
            "question_text": metadata.question_text,
            "keywords": _safe_json_serialize(metadata.keywords, metadata.question_id, "keywords"),
            "answering_model": metadata.answering_model,
            "parsing_model": metadata.parsing_model,
            "replicate": metadata.replicate or "",
            "execution_time": metadata.execution_time,
            "timestamp": metadata.timestamp,
            "run_name": metadata.run_name or "",
            # Template fields
            "raw_llm_response": template.raw_llm_response if template else "",
            "parsed_gt_response": _safe_json_serialize(
                template.parsed_gt_response if template else None, metadata.question_id, "parsed_gt_response"
            ),
            "parsed_llm_response": _safe_json_serialize(
                template.parsed_llm_response if template else None, metadata.question_id, "parsed_llm_response"
            ),
            "template_verification_performed": template.template_verification_performed if template else False,
            "verify_result": _serialize_verification_result(
                template.verify_result if template else None, metadata.question_id, "verify_result"
            ),
            "verify_granular_result": _serialize_verification_result(
                template.verify_granular_result if template else None, metadata.question_id, "verify_granular_result"
            ),
            "answering_system_prompt": metadata.answering_system_prompt or "",
            "parsing_system_prompt": metadata.parsing_system_prompt or "",
            "embedding_check_performed": template.embedding_check_performed if template else False,
            "embedding_similarity_score": template.embedding_similarity_score or "" if template else "",
            "embedding_override_applied": template.embedding_override_applied if template else False,
            "embedding_model_used": template.embedding_model_used or "" if template else "",
            "answering_mcp_servers": _safe_json_serialize(
                template.answering_mcp_servers if template else None, metadata.question_id, "answering_mcp_servers"
            ),
            "usage_metadata": _safe_json_serialize(
                template.usage_metadata if template else None, metadata.question_id, "usage_metadata"
            )
            if template and template.usage_metadata
            else "",
            "agent_metrics": _safe_json_serialize(
                template.agent_metrics if template else None, metadata.question_id, "agent_metrics"
            )
            if template and template.agent_metrics
            else "",
            # Rubric fields
            "rubric_evaluation_performed": rubric.rubric_evaluation_performed if rubric else False,
            "metric_trait_confusion_lists": _safe_json_serialize(
                rubric.metric_trait_confusion_lists if rubric else None,
                metadata.question_id,
                "metric_trait_confusion_lists",
            ),
            "metric_trait_metrics": _safe_json_serialize(
                rubric.metric_trait_scores if rubric else None, metadata.question_id, "metric_trait_metrics"
            ),
            # Deep-judgment fields
            "deep_judgment_enabled": deep_judgment.deep_judgment_enabled if deep_judgment else False,
            "deep_judgment_performed": deep_judgment.deep_judgment_performed if deep_judgment else False,
            "extracted_excerpts": _safe_json_serialize(
                deep_judgment.extracted_excerpts if deep_judgment else None, metadata.question_id, "extracted_excerpts"
            ),
            "attribute_reasoning": _safe_json_serialize(
                deep_judgment.attribute_reasoning if deep_judgment else None,
                metadata.question_id,
                "attribute_reasoning",
            ),
            "deep_judgment_stages_completed": _safe_json_serialize(
                deep_judgment.deep_judgment_stages_completed if deep_judgment else None,
                metadata.question_id,
                "deep_judgment_stages_completed",
            ),
            "deep_judgment_model_calls": deep_judgment.deep_judgment_model_calls if deep_judgment else 0,
            "deep_judgment_excerpt_retry_count": deep_judgment.deep_judgment_excerpt_retry_count
            if deep_judgment
            else 0,
            "attributes_without_excerpts": _safe_json_serialize(
                deep_judgment.attributes_without_excerpts if deep_judgment else None,
                metadata.question_id,
                "attributes_without_excerpts",
            ),
            "deep_judgment_search_enabled": deep_judgment.deep_judgment_search_enabled if deep_judgment else False,
            "hallucination_risk_assessment": _safe_json_serialize(
                deep_judgment.hallucination_risk_assessment if deep_judgment else None,
                metadata.question_id,
                "hallucination_risk_assessment",
            ),
            # Deep-judgment rubric fields
            "deep_judgment_rubric_performed": deep_judgment_rubric.deep_judgment_rubric_performed
            if deep_judgment_rubric
            else False,
            "extracted_rubric_excerpts": _safe_json_serialize(
                deep_judgment_rubric.extracted_rubric_excerpts if deep_judgment_rubric else None,
                metadata.question_id,
                "extracted_rubric_excerpts",
            ),
            "rubric_trait_reasoning": _safe_json_serialize(
                deep_judgment_rubric.rubric_trait_reasoning if deep_judgment_rubric else None,
                metadata.question_id,
                "rubric_trait_reasoning",
            ),
            "deep_judgment_rubric_scores": _safe_json_serialize(
                deep_judgment_rubric.deep_judgment_rubric_scores if deep_judgment_rubric else None,
                metadata.question_id,
                "deep_judgment_rubric_scores",
            ),
            "standard_rubric_scores": _safe_json_serialize(
                deep_judgment_rubric.standard_rubric_scores if deep_judgment_rubric else None,
                metadata.question_id,
                "standard_rubric_scores",
            ),
            "trait_metadata": _safe_json_serialize(
                deep_judgment_rubric.trait_metadata if deep_judgment_rubric else None,
                metadata.question_id,
                "trait_metadata",
            ),
            "traits_without_valid_excerpts": _safe_json_serialize(
                deep_judgment_rubric.traits_without_valid_excerpts if deep_judgment_rubric else None,
                metadata.question_id,
                "traits_without_valid_excerpts",
            ),
            "rubric_hallucination_risk_assessment": _safe_json_serialize(
                deep_judgment_rubric.rubric_hallucination_risk_assessment if deep_judgment_rubric else None,
                metadata.question_id,
                "rubric_hallucination_risk_assessment",
            ),
            "total_deep_judgment_model_calls": deep_judgment_rubric.total_deep_judgment_model_calls
            if deep_judgment_rubric
            else 0,
            "total_traits_evaluated": deep_judgment_rubric.total_traits_evaluated if deep_judgment_rubric else 0,
            "total_excerpt_retries": deep_judgment_rubric.total_excerpt_retries if deep_judgment_rubric else 0,
            # Export metadata
            "export_timestamp": export_timestamp,
            "karenina_version": karenina_version,
            "job_id": job.job_id,
        }

        # Add global rubric trait values from all trait score dicts
        if rubric:
            # Merge all trait scores into a unified dict for CSV export
            merged_traits: dict[str, Any] = {}
            if rubric.llm_trait_scores:
                merged_traits.update(rubric.llm_trait_scores)
            if rubric.regex_trait_scores:
                merged_traits.update(rubric.regex_trait_scores)
            if rubric.callable_trait_scores:
                merged_traits.update(rubric.callable_trait_scores)
            if rubric.metric_trait_scores:
                merged_traits.update(rubric.metric_trait_scores)

            # One dedicated column per global trait; absent traits become "".
            for trait in global_traits:
                row[f"rubric_{trait}"] = str(merged_traits.get(trait, ""))
        else:
            # Set all global traits to empty when no rubric data
            for trait in global_traits:
                row[f"rubric_{trait}"] = ""

        # Add question-specific rubrics as JSON (optimized)
        # NOTE: merged_traits is only defined when this result has rubric data;
        # the `rubric and` guard below short-circuits before reading it otherwise.
        if question_specific_traits_set:
            if rubric and merged_traits:
                # Use dictionary comprehension for better performance
                question_specific_rubrics = {
                    trait: merged_traits[trait] for trait in question_specific_traits_set if trait in merged_traits
                }
            else:
                question_specific_rubrics = {}

            # Safe JSON serialization with error handling
            serialized = _safe_json_serialize(
                question_specific_rubrics, metadata.question_id, "question_specific_rubrics"
            )
            row["question_specific_rubrics"] = serialized if serialized else "{}"

        writer.writerow(row)

    # Log completion summary
    result_count = len(results)
    logger.info("CSV export completed successfully for %d results", result_count)

    return output.getvalue()

export_verification_results_json

export_verification_results_json(
    job: VerificationJob,
    results: VerificationResultSet,
    global_rubric: HasTraitNames | None = None,
) -> str

Export verification results to JSON format with metadata (v2.0 format).

The v2.0 format optimizations: - Stores rubric definition once in shared_data (not per-result) - Stores trace filtering fields (evaluation_input, used_full_trace, trace_extraction_error) at result root level (shared by template and rubric evaluation) - 50-70% size reduction compared to legacy format

Parameters:

Name Type Description Default
job
VerificationJob

The verification job

required
results
VerificationResultSet

VerificationResultSet containing all verification results

required
global_rubric
HasTraitNames | None

Optional global rubric to include in shared_data for rubric definition

None

Returns:

Type Description
str

JSON string with results and metadata in v2.0 format

Source code in src/karenina/benchmark/verification/stages/helpers/results_exporter.py
def export_verification_results_json(
    job: VerificationJob, results: VerificationResultSet, global_rubric: HasTraitNames | None = None
) -> str:
    """
    Serialize verification results plus metadata to a v2.0-style JSON document.

    Optimizations over the legacy format:
    - The rubric definition is stored once under ``shared_data`` instead of
      being repeated in every result.
    - Trace filtering fields (evaluation_input, used_full_trace,
      trace_extraction_error) live at the result root level, shared by
      template and rubric evaluation.
    - Roughly 50-70% smaller output than the legacy format.

    Args:
        job: The verification job whose config and summary are exported
        results: VerificationResultSet containing all verification results
        global_rubric: Optional global rubric whose definition is embedded in shared_data

    Returns:
        JSON string with results and metadata in v2.0 format
    """

    def _first_model_info(models: Any) -> dict[str, Any]:
        # Summarize the first model of a model list; all-None when the list is empty/absent.
        if not models:
            return {"provider": None, "name": None, "temperature": None, "interface": None}
        head = models[0]
        return {
            "provider": head.model_provider,
            "name": head.model_name,
            "temperature": head.temperature,
            "interface": head.interface,
        }

    # Rubric definition is emitted once in shared_data rather than per-result.
    # exclude_unset=True mirrors the frontend export (only explicitly set fields).
    rubric_definition = None
    if global_rubric is not None:
        if hasattr(global_rubric, "model_dump"):
            # Pydantic model: use its native JSON-mode dump.
            rubric_definition = global_rubric.model_dump(mode="json", exclude_unset=True)
        else:
            # Non-Pydantic HasTraitNames implementation: keep just the trait names.
            rubric_definition = {"trait_names": global_rubric.get_trait_names()}

    started = job.start_time
    ended = job.end_time

    export_data: dict[str, Any] = {
        "format_version": "2.1",
        "metadata": {
            "export_timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()),
            "karenina_version": get_karenina_version(),
            "job_id": job.job_id,
            "verification_config": {
                "answering_model": _first_model_info(job.config.answering_models),
                "parsing_model": _first_model_info(job.config.parsing_models),
            },
            "job_summary": {
                "total_questions": job.total_questions,
                "successful_count": job.successful_count,
                "failed_count": job.failed_count,
                "start_time": started,
                "end_time": ended,
                "total_duration": ended - started if ended and started else None,
            },
        },
        "shared_data": {
            "rubric_definition": rubric_definition,
        },
        # Pydantic's native JSON-mode serialization — no custom stringification,
        # so complex types (dicts, lists, booleans) survive intact.
        "results": [item.model_dump(mode="json") for item in results],
    }

    return json.dumps(export_data, indent=2, ensure_ascii=False)

run_question_verification

run_question_verification(
    question_id: str,
    question_text: str,
    template_code: str,
    answering_model: ModelConfig,
    parsing_model: ModelConfig,
    run_name: str | None = None,
    replicate: int | None = None,
    rubric: Rubric | None = None,
    dynamic_rubric: DynamicRubric | None = None,
    keywords: list[str] | None = None,
    raw_answer: str | None = None,
    few_shot_examples: list[dict[str, str]] | None = None,
    few_shot_enabled: bool = False,
    abstention_enabled: bool = False,
    sufficiency_enabled: bool = False,
    deep_judgment_enabled: bool = False,
    rubric_evaluation_strategy: str = "batch",
    deep_judgment_max_excerpts_per_attribute: int = DEFAULT_DEEP_JUDGMENT_MAX_EXCERPTS,
    deep_judgment_fuzzy_match_threshold: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD,
    deep_judgment_excerpt_retry_attempts: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS,
    deep_judgment_search_enabled: bool = False,
    deep_judgment_search_tool: str | Any = "tavily",
    deep_judgment_rubric_mode: str = "disabled",
    deep_judgment_rubric_global_excerpts: bool = True,
    deep_judgment_rubric_config: dict[str, Any]
    | None = None,
    deep_judgment_rubric_max_excerpts_default: int = DEFAULT_RUBRIC_MAX_EXCERPTS,
    deep_judgment_rubric_fuzzy_match_threshold_default: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD,
    deep_judgment_rubric_excerpt_retry_attempts_default: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS,
    deep_judgment_rubric_search_enabled: bool = False,
    deep_judgment_rubric_search_tool: str | Any = "tavily",
    evaluation_mode: str = "template_only",
    cached_answer_data: dict[str, Any] | None = None,
    prompt_config: PromptConfig | None = None,
    use_full_trace_for_template: bool = False,
    use_full_trace_for_rubric: bool = True,
    agentic_parsing: bool = False,
    agentic_judge_context: str = "workspace_only",
    agentic_parsing_max_turns: int = 15,
    agentic_parsing_timeout: float = 120.0,
    workspace_root: Path | None = None,
    workspace_copy: bool = True,
    workspace_cleanup: bool = True,
    question_workspace_path: str | None = None,
    agentic_rubric_strategy: str = "individual",
    agentic_rubric_parallel: bool = False,
) -> VerificationResult

Run verification for a single question with specific answering and parsing models.

This function uses a stage-based pipeline architecture for modularity and testability. Each verification step (validation, generation, parsing, verification, etc.) is implemented as a discrete stage that can be independently tested and configured.

Parameters:

Name Type Description Default
question_id
str

Unique identifier for the question. For manual interface, this MUST be a 32-character hexadecimal MD5 hash (generated during question extraction).

required
question_text
str

The question to ask the LLM

required
template_code
str

Python code defining the Answer class

required
answering_model
ModelConfig

Configuration for the answering model

required
parsing_model
ModelConfig

Configuration for the parsing model

required
run_name
str | None

Optional run name for tracking

None
replicate
int | None

Optional replicate number for repeated runs of the same question

None
rubric
Rubric | None

Optional rubric for qualitative evaluation

None
keywords
list[str] | None

Optional keywords associated with the question

None
few_shot_examples
list[dict[str, str]] | None

Optional list of question-answer pairs for few-shot prompting

None
few_shot_enabled
bool

Whether to use few-shot prompting (disabled by default)

False
abstention_enabled
bool

Whether to enable abstention detection

False
sufficiency_enabled
bool

Whether to enable trace sufficiency detection

False
deep_judgment_enabled
bool

Whether to enable deep-judgment parsing

False
rubric_evaluation_strategy
str

Strategy for evaluating LLM rubric traits:

- "batch": All traits evaluated in single LLM call (default, efficient)
- "sequential": Traits evaluated one-by-one (reliable, more expensive)

'batch'
deep_judgment_max_excerpts_per_attribute
int

Max excerpts per attribute (deep-judgment)

DEFAULT_DEEP_JUDGMENT_MAX_EXCERPTS
deep_judgment_fuzzy_match_threshold
float

Similarity threshold for excerpts (deep-judgment)

DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD
deep_judgment_excerpt_retry_attempts
int

Retry attempts for excerpt validation (deep-judgment)

DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS
deep_judgment_search_enabled
bool

Whether to enable search enhancement (deep-judgment)

False
deep_judgment_search_tool
str | Any

Search tool name or callable (deep-judgment)

'tavily'
evaluation_mode
str

Evaluation mode determining which stages run:

- "template_only": Template verification only (default)
- "template_and_rubric": Template verification + rubric evaluation
- "rubric_only": Skip template, only evaluate rubrics on raw response

'template_only'
cached_answer_data
dict[str, Any] | None

Optional cached answer data from previous generation. If provided, the GenerateAnswerStage will skip LLM invocation and use this cached data. Used to share answers across multiple judges.

None

Returns:

Type Description
VerificationResult

VerificationResult with all details and optional rubric scores

Raises:

Type Description
ValueError

If question_id is not a valid MD5 hash when using manual interface

RuntimeError

If stage orchestration fails critically

Source code in src/karenina/benchmark/verification/runner.py
def run_single_model_verification(
    question_id: str,
    question_text: str,
    template_code: str,
    answering_model: ModelConfig,
    parsing_model: ModelConfig,
    run_name: str | None = None,
    replicate: int | None = None,
    rubric: Rubric | None = None,
    dynamic_rubric: DynamicRubric | None = None,
    keywords: list[str] | None = None,
    raw_answer: str | None = None,
    few_shot_examples: list[dict[str, str]] | None = None,
    few_shot_enabled: bool = False,
    abstention_enabled: bool = False,
    sufficiency_enabled: bool = False,
    deep_judgment_enabled: bool = False,
    rubric_evaluation_strategy: str = "batch",
    deep_judgment_max_excerpts_per_attribute: int = DEFAULT_DEEP_JUDGMENT_MAX_EXCERPTS,
    deep_judgment_fuzzy_match_threshold: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD,
    deep_judgment_excerpt_retry_attempts: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS,
    deep_judgment_search_enabled: bool = False,
    deep_judgment_search_tool: str | Any = "tavily",
    # Deep-judgment rubric configuration (NEW)
    deep_judgment_rubric_mode: str = "disabled",
    deep_judgment_rubric_global_excerpts: bool = True,
    deep_judgment_rubric_config: dict[str, Any] | None = None,
    deep_judgment_rubric_max_excerpts_default: int = DEFAULT_RUBRIC_MAX_EXCERPTS,
    deep_judgment_rubric_fuzzy_match_threshold_default: float = DEFAULT_DEEP_JUDGMENT_FUZZY_THRESHOLD,
    deep_judgment_rubric_excerpt_retry_attempts_default: int = DEFAULT_DEEP_JUDGMENT_RETRY_ATTEMPTS,
    deep_judgment_rubric_search_enabled: bool = False,
    deep_judgment_rubric_search_tool: str | Any = "tavily",
    evaluation_mode: str = "template_only",
    cached_answer_data: dict[str, Any] | None = None,
    # Prompt configuration
    prompt_config: PromptConfig | None = None,
    # Trace filtering configuration (MCP Agent Evaluation)
    use_full_trace_for_template: bool = False,
    use_full_trace_for_rubric: bool = True,
    # Agentic parsing configuration
    agentic_parsing: bool = False,
    agentic_judge_context: str = "workspace_only",
    agentic_parsing_max_turns: int = 15,
    agentic_parsing_timeout: float = 120.0,
    workspace_root: Path | None = None,
    workspace_copy: bool = True,
    workspace_cleanup: bool = True,
    question_workspace_path: str | None = None,
    # Agentic rubric evaluation configuration
    agentic_rubric_strategy: str = "individual",
    agentic_rubric_parallel: bool = False,
) -> VerificationResult:
    """
    Run verification for a single question with specific answering and parsing models.

    This function uses a stage-based pipeline architecture for modularity and testability.
    Each verification step (validation, generation, parsing, verification, etc.) is
    implemented as a discrete stage that can be independently tested and configured.

    Most parameters are forwarded verbatim into a ``VerificationContext`` that the
    stage orchestrator consumes; this function itself only (1) derives a template_id,
    (2) builds model identities, (3) auto-upgrades the evaluation mode when rubric
    traits are present, and (4) runs the orchestrated pipeline.

    Args:
        question_id: Unique identifier for the question. For manual interface, this MUST be
                    a 32-character hexadecimal MD5 hash (generated during question extraction).
        question_text: The question to ask the LLM
        template_code: Python code defining the Answer class
        answering_model: Configuration for the answering model
        parsing_model: Configuration for the parsing model
        run_name: Optional run name for tracking
        replicate: Optional replicate number for repeated runs of the same question
        rubric: Optional rubric for qualitative evaluation
        dynamic_rubric: Optional dynamic rubric; like ``rubric``, a non-empty value
            auto-upgrades "template_only" mode to "template_and_rubric"
        keywords: Optional keywords associated with the question
        raw_answer: Optional raw answer text forwarded to the verification context
            (presumably the question's ground-truth answer — confirm with context docs)
        few_shot_examples: Optional list of question-answer pairs for few-shot prompting
        few_shot_enabled: Whether to use few-shot prompting (disabled by default)
        abstention_enabled: Whether to enable abstention detection
        sufficiency_enabled: Whether to enable trace sufficiency detection
        deep_judgment_enabled: Whether to enable deep-judgment parsing
        rubric_evaluation_strategy: Strategy for evaluating LLM rubric traits:
            - "batch": All traits evaluated in single LLM call (default, efficient)
            - "sequential": Traits evaluated one-by-one (reliable, more expensive)
        deep_judgment_max_excerpts_per_attribute: Max excerpts per attribute (deep-judgment)
        deep_judgment_fuzzy_match_threshold: Similarity threshold for excerpts (deep-judgment)
        deep_judgment_excerpt_retry_attempts: Retry attempts for excerpt validation (deep-judgment)
        deep_judgment_search_enabled: Whether to enable search enhancement (deep-judgment)
        deep_judgment_search_tool: Search tool name or callable (deep-judgment)
        deep_judgment_rubric_mode: Deep-judgment mode for rubric traits ("disabled" by default)
        deep_judgment_rubric_global_excerpts: Whether rubric excerpts are shared globally
            across traits — TODO confirm exact semantics in context docs
        deep_judgment_rubric_config: Optional per-trait deep-judgment rubric overrides
        deep_judgment_rubric_max_excerpts_default: Default max excerpts for rubric traits
        deep_judgment_rubric_fuzzy_match_threshold_default: Default excerpt similarity
            threshold for rubric traits
        deep_judgment_rubric_excerpt_retry_attempts_default: Default excerpt-validation
            retries for rubric traits
        deep_judgment_rubric_search_enabled: Whether search enhancement is enabled for
            rubric deep-judgment
        deep_judgment_rubric_search_tool: Search tool name or callable for rubric
            deep-judgment
        evaluation_mode: Evaluation mode determining which stages run:
            - "template_only": Template verification only (default)
            - "template_and_rubric": Template verification + rubric evaluation
            - "rubric_only": Skip template, only evaluate rubrics on raw response
        cached_answer_data: Optional cached answer data from previous generation.
            If provided, the GenerateAnswerStage will skip LLM invocation and use
            this cached data. Used to share answers across multiple judges.
        prompt_config: Optional prompt configuration forwarded to the context
        use_full_trace_for_template: Whether template verification sees the full
            trace instead of the filtered evaluation input (MCP agent evaluation)
        use_full_trace_for_rubric: Whether rubric evaluation sees the full trace
            (enabled by default)
        agentic_parsing: Whether to use agentic (tool-using) parsing
        agentic_judge_context: Context scope for the agentic judge
            ("workspace_only" by default)
        agentic_parsing_max_turns: Max agent turns for agentic parsing
        agentic_parsing_timeout: Timeout in seconds for agentic parsing
        workspace_root: Optional root path for agentic workspaces
        workspace_copy: Whether the workspace is copied before use — TODO confirm
        workspace_cleanup: Whether the workspace is cleaned up afterwards
        question_workspace_path: Optional per-question workspace path
        agentic_rubric_strategy: Agentic rubric evaluation strategy
            ("individual" by default)
        agentic_rubric_parallel: Whether agentic rubric traits run in parallel

    Returns:
        VerificationResult with all details and optional rubric scores

    Raises:
        ValueError: If question_id is not a valid MD5 hash when using manual interface
        RuntimeError: If stage orchestration fails critically
    """
    # Compute template_id from template_code (composite key component)
    template_id = generate_template_id(template_code)

    # Initialize verification context with all parameters
    context = VerificationContext(
        # Identity & Metadata
        question_id=question_id,
        template_id=template_id,
        question_text=question_text,
        template_code=template_code,
        # Configuration
        answering_model=answering_model,
        parsing_model=parsing_model,
        rubric=rubric,
        dynamic_rubric=dynamic_rubric,
        keywords=keywords,
        raw_answer=raw_answer,
        # Run Metadata
        run_name=run_name,
        replicate=replicate,
        # Feature Flags
        few_shot_enabled=few_shot_enabled,
        abstention_enabled=abstention_enabled,
        sufficiency_enabled=sufficiency_enabled,
        deep_judgment_enabled=deep_judgment_enabled,
        # Rubric Configuration
        rubric_evaluation_strategy=rubric_evaluation_strategy,
        # Deep-Judgment Configuration
        deep_judgment_max_excerpts_per_attribute=deep_judgment_max_excerpts_per_attribute,
        deep_judgment_fuzzy_match_threshold=deep_judgment_fuzzy_match_threshold,
        deep_judgment_excerpt_retry_attempts=deep_judgment_excerpt_retry_attempts,
        deep_judgment_search_enabled=deep_judgment_search_enabled,
        deep_judgment_search_tool=deep_judgment_search_tool,
        # Deep-Judgment Rubric Configuration (NEW)
        deep_judgment_rubric_mode=deep_judgment_rubric_mode,
        deep_judgment_rubric_global_excerpts=deep_judgment_rubric_global_excerpts,
        deep_judgment_rubric_config=deep_judgment_rubric_config,
        deep_judgment_rubric_max_excerpts_default=deep_judgment_rubric_max_excerpts_default,
        deep_judgment_rubric_fuzzy_match_threshold_default=deep_judgment_rubric_fuzzy_match_threshold_default,
        deep_judgment_rubric_excerpt_retry_attempts_default=deep_judgment_rubric_excerpt_retry_attempts_default,
        deep_judgment_rubric_search_enabled=deep_judgment_rubric_search_enabled,
        deep_judgment_rubric_search_tool=deep_judgment_rubric_search_tool,
        # Few-Shot Configuration
        few_shot_examples=few_shot_examples,
        # Prompt Configuration
        prompt_config=prompt_config,
        # Trace Filtering Configuration (MCP Agent Evaluation)
        use_full_trace_for_template=use_full_trace_for_template,
        use_full_trace_for_rubric=use_full_trace_for_rubric,
        # Answer Caching
        cached_answer_data=cached_answer_data,
        # Agentic Parsing
        agentic_parsing=agentic_parsing,
        agentic_judge_context=agentic_judge_context,
        agentic_parsing_max_turns=agentic_parsing_max_turns,
        agentic_parsing_timeout=agentic_parsing_timeout,
        question_workspace_path=question_workspace_path,
        workspace_root=workspace_root,
        workspace_copy=workspace_copy,
        workspace_cleanup=workspace_cleanup,
        # Agentic Rubric
        agentic_rubric_strategy=agentic_rubric_strategy,
        agentic_rubric_parallel=agentic_rubric_parallel,
    )

    # Build ModelIdentity objects for pipeline use (needed even if validation fails)
    from karenina.schemas.verification.model_identity import ModelIdentity

    answering_identity = ModelIdentity.from_model_config(answering_model, role="answering")
    parsing_identity = ModelIdentity.from_model_config(parsing_model, role="parsing")

    # Store ModelIdentity objects in context for downstream stages (e.g., finalize_result)
    context.set_artifact("answering_model_identity", answering_identity)
    context.set_artifact("parsing_model_identity", parsing_identity)

    # Store MCP server names as result field for VerificationResultTemplate
    answering_mcp_servers = list(answering_model.mcp_urls_dict.keys()) if answering_model.mcp_urls_dict else None
    context.set_result_field("answering_mcp_servers", answering_mcp_servers)

    # Determine evaluation mode automatically if not explicitly set.
    # If rubric or dynamic_rubric is provided and mode is template_only,
    # upgrade to template_and_rubric.
    _has_rubric_traits = rubric and (
        rubric.llm_traits
        or rubric.regex_traits
        or rubric.callable_traits
        or rubric.metric_traits
        or rubric.agentic_traits
    )
    _has_dynamic_rubric_traits = dynamic_rubric is not None and not dynamic_rubric.is_empty()
    if (_has_rubric_traits or _has_dynamic_rubric_traits) and evaluation_mode == "template_only":
        evaluation_mode = "template_and_rubric"

    # Build stage orchestrator from configuration
    orchestrator = StageOrchestrator.from_config(
        rubric=rubric,
        dynamic_rubric=dynamic_rubric,
        abstention_enabled=abstention_enabled,
        sufficiency_enabled=sufficiency_enabled,
        deep_judgment_enabled=deep_judgment_enabled,
        evaluation_mode=evaluation_mode,
        agentic_parsing=agentic_parsing,
    )

    # Execute verification pipeline
    result = orchestrator.execute(context)

    return result

validate_answer_template

validate_answer_template(
    template_code: str,
) -> tuple[bool, str | None, type | None]

Validate that template code defines a proper Answer class.

Discovers the answer class by scanning for the leaf BaseAnswer subclass, supporting custom class names (not just "Answer").

Parameters:

Name Type Description Default
template_code
str

Python source code defining a BaseAnswer subclass.

required

Returns:

Type Description
tuple[bool, str | None, type | None]

Tuple of (is_valid, error_message, Answer_class).

Source code in src/karenina/benchmark/verification/utils/template_validation.py
def validate_answer_template(template_code: str) -> tuple[bool, str | None, type | None]:
    """Validate that template code defines a proper Answer class.

    Discovers the answer class by scanning for the leaf BaseAnswer subclass,
    supporting custom class names (not just "Answer").

    Args:
        template_code: Python source code defining a BaseAnswer subclass.

    Returns:
        Tuple of (is_valid, error_message, Answer_class).
    """
    try:
        global_ns = _build_exec_namespace()
        local_ns: dict[str, Any] = {}

        # SECURITY: exec runs arbitrary template code — only trusted templates
        # should reach this point.
        exec(template_code, global_ns, local_ns)

        # Discover the answer class (supports custom names)
        try:
            Answer = find_answer_class(local_ns)
        except ValueError as e:
            return False, str(e), None

        # Store the template code for exec-created classes
        # (since inspect.getsource() won't work for them)
        Answer._source_code = template_code  # type: ignore[attr-defined]

        if not inspect.isclass(Answer):
            return False, "Answer is not a class", None

        if not issubclass(Answer, BaseAnswer):
            return False, "Answer class must inherit from BaseAnswer", None

        # Check if it has a verify method (not required for regex-only or VerifiedField templates)
        from .template_parsing_helpers import is_regex_only_template

        has_verified_fields = bool(Answer._get_verified_fields())
        if not is_regex_only_template(Answer) and not has_verified_fields:
            if not hasattr(Answer, "verify"):
                return False, "does not have a 'verify' method", None
            if not callable(getattr(Answer, "verify", None)):
                return False, "verify must be a callable method", None

        # The 'correct' field is optional, but if present via ground_truth/model_post_init, it must be a dict
        has_init = "model_post_init" in Answer.__dict__ or "ground_truth" in Answer.__dict__
        if has_init:
            try:
                from .template_parsing_helpers import create_test_instance_from_answer_class

                test_instance, ground_truth = create_test_instance_from_answer_class(Answer)
                if ground_truth is not None and not isinstance(ground_truth, dict):
                    return False, "ground_truth/model_post_init must assign 'self.correct' as a dictionary", None
            except Exception as e:
                return False, f"Error testing ground_truth/model_post_init: {e}", None

        return True, None, Answer

    except Exception as e:
        # SyntaxError is a subclass of Exception; the previous separate
        # SyntaxError handler returned the identical message and was dead
        # duplication, so a single handler covers both cases.
        return False, f"Error executing template code: {e}", None