Lift drill into evals/ at 013fcb8b7dbefd6d3fa4653493e5d2ec8e7f985b

rsync of obra/drill@013fcb8b7d into superpowers/evals/, excluding
.git/, .venv/, results/, .env/, __pycache__/, *.egg-info/,
.private-journal/.

The drill repo is unaffected by this commit; archival is a separate
manual step after this PR merges.

Source SHA recorded at evals/.drill-source-sha for divergence
detection.
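
For reviewers who want to run that check later, a minimal sketch (not part of this
commit; assumes the upstream remote is https://github.com/obra/drill and that the
helper name is hypothetical):

import subprocess
from pathlib import Path

def drill_has_diverged(evals_dir: Path = Path("evals")) -> bool:
    # Compare the recorded source SHA against upstream HEAD.
    recorded = (evals_dir / ".drill-source-sha").read_text().strip()
    upstream = subprocess.run(
        ["git", "ls-remote", "https://github.com/obra/drill", "HEAD"],
        capture_output=True, text=True, check=True,
    ).stdout.split()[0]
    return upstream != recorded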
Author: Jesse Vincent
Date: 2026-05-06 12:15:46 -07:00
parent 895bb732d5
commit 3c046f579e

124 changed files with 13806 additions and 0 deletions

evals/drill/__init__.py Normal file

@@ -0,0 +1,3 @@
"""Drill: Superpowers skill compliance benchmark."""
__version__: str = "0.1.0"

evals/drill/__main__.py Normal file

@@ -0,0 +1,5 @@
"""Allow running drill as `python3 -m drill`."""
from drill.cli import main
main()

evals/drill/actor.py Normal file

@@ -0,0 +1,81 @@
"""Actor LLM: simulates a user driving an agent session."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import anthropic
from jinja2 import Template
ACTOR_TOOL: dict[str, Any] = {
"name": "terminal_action",
"description": "Send an action to the terminal session.",
"input_schema": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["type", "done", "stuck", "key"],
"description": "The action to take.",
},
"text": {
"type": "string",
"description": "Text to type (only for 'type' action).",
},
"key": {
"type": "string",
"description": "Special key to send (only for 'key' action, e.g., 'ctrl-c').",
},
},
"required": ["action"],
},
}
@dataclass
class ActorAction:
action: str
text: str | None = None
key: str | None = None
@classmethod
def from_tool_result(cls, data: dict[str, Any]) -> ActorAction:
return cls(action=data["action"], text=data.get("text"), key=data.get("key"))
class Actor:
def __init__(self, model: str = "claude-sonnet-4-6", temperature: float = 0.7) -> None:
self.model = model
self.temperature = temperature
self.captures: list[str] = []
self._system_prompt: str = ""
self._client: anthropic.Anthropic = anthropic.Anthropic()
def build_system_prompt(self, posture: str, intents: list[str]) -> str:
template_path = Path(__file__).parent.parent / "prompts" / "actor.md"
template = Template(template_path.read_text())
self._system_prompt = template.render(posture=posture, intents=intents)
return self._system_prompt
def append_capture(self, terminal_output: str) -> None:
self.captures.append(terminal_output)
def build_messages(self) -> list[dict[str, str]]:
return [{"role": "user", "content": capture} for capture in self.captures]
def decide(self) -> ActorAction:
response = self._client.messages.create(
model=self.model,
max_tokens=1024,
temperature=self.temperature,
system=self._system_prompt,
tools=[ACTOR_TOOL], # ty: ignore[invalid-argument-type]
tool_choice={"type": "tool", "name": "terminal_action"},
messages=self.build_messages(), # ty: ignore[invalid-argument-type]
)
for block in response.content:
if block.type == "tool_use":
return ActorAction.from_tool_result(block.input)
raise RuntimeError("Actor did not return a tool_use block")
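
A minimal driving sketch (the real loop lives in engine.py; the capture text below
is invented, and decide() calls the Anthropic API, so ANTHROPIC_API_KEY must be set):

from drill.actor import Actor

actor = Actor()
actor.build_system_prompt(posture="naive", intents=["Ask for a small feature"])
actor.append_capture("Terminal output:\n> Ready for input")
action = actor.decide()
if action.action == "type":
    print("actor would type:", action.text)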

evals/drill/assertions.py Normal file

@@ -0,0 +1,89 @@
"""Post-session deterministic assertions for drill scenarios."""
from __future__ import annotations
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from drill.verifier import CriterionResult
@dataclass
class AssertionResult:
command: str
passed: bool
exit_code: int
stdout: str
stderr: str
def to_criterion_result(self) -> CriterionResult:
evidence = f"exit code {self.exit_code}"
if self.stdout:
evidence += f"\nstdout: {self.stdout}"
if self.stderr:
evidence += f"\nstderr: {self.stderr}"
return CriterionResult(
criterion=f"[assertion] {self.command}",
verdict="pass" if self.passed else "fail",
evidence=evidence,
rationale="Deterministic assertion " + ("passed" if self.passed else "failed"),
source="assertion",
)
def run_verify_assertions(
assertions: list[str],
results_dir: Path,
workdir: Path,
*,
timeout_seconds: int = 10,
) -> list[AssertionResult]:
bin_dir = Path(__file__).parent.parent / "bin"
env = {
**os.environ,
"DRILL_WORKDIR": str(workdir),
"PATH": f"{bin_dir}:{os.environ.get('PATH', '')}",
}
results: list[AssertionResult] = []
for cmd in assertions:
try:
proc = subprocess.run(
["bash", "-c", cmd],
cwd=results_dir,
capture_output=True,
text=True,
env=env,
timeout=timeout_seconds,
)
results.append(
AssertionResult(
command=cmd,
passed=proc.returncode == 0,
exit_code=proc.returncode,
stdout=proc.stdout.strip(),
stderr=proc.stderr.strip(),
)
)
except subprocess.TimeoutExpired:
results.append(
AssertionResult(
command=cmd,
passed=False,
exit_code=124,
stdout="",
stderr=f"Timed out after {timeout_seconds}s",
)
)
except Exception as e:
results.append(
AssertionResult(
command=cmd,
passed=False,
exit_code=-1,
stdout="",
stderr=str(e),
)
)
return results
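
Usage sketch (the assertion command and paths are illustrative; commands run with
cwd=results_dir, DRILL_WORKDIR exported, and evals/bin prepended to PATH):

from pathlib import Path
from drill.assertions import run_verify_assertions

results = run_verify_assertions(
    ['test -f "$DRILL_WORKDIR/README.md"'],
    results_dir=Path("results/demo"),  # must exist; commands run here
    workdir=Path("/tmp/drill-demo"),
)
for r in results:
    print(r.command, "->", "pass" if r.passed else "fail")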

evals/drill/backend.py Normal file

@@ -0,0 +1,111 @@
"""Backend config loader and command builder."""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
@dataclass
class Backend:
name: str
cli: str
args: list[str]
required_env: list[str]
hooks: dict[str, list[str]]
shutdown: str
idle: dict[str, Any]
startup_timeout: int
terminal: dict[str, int]
session_logs: dict[str, str]
turn_timeout: int | None = None
busy_pattern: str = ""
max_busy_seconds: int = 1800
def build_command(self, workdir: str) -> list[str]:
resolved = [_interpolate_env(arg) for arg in self.args]
return [self.cli, *resolved]
def validate_env(self) -> None:
missing = [v for v in self.required_env if not os.environ.get(v)]
if missing:
raise OSError(
f"Missing required environment variables for {self.name} backend: "
+ ", ".join(missing)
)
def is_ready_line(self, line: str) -> bool:
pattern = self.idle.get("ready_pattern", "")
return bool(re.search(pattern, line))
def is_busy_line(self, line: str) -> bool:
if not self.busy_pattern:
return False
return bool(re.search(self.busy_pattern, line))
@property
def quiescence_seconds(self) -> float:
return self.idle.get("quiescence_seconds", 5)
@property
def cols(self) -> int:
return self.terminal.get("cols", 200)
@property
def rows(self) -> int:
return self.terminal.get("rows", 50)
@property
def model(self) -> str | None:
"""Model name from args (looks for --model or -m flag)."""
for i, arg in enumerate(self.args):
if arg in ("--model", "-m") and i + 1 < len(self.args):
return self.args[i + 1]
return None
@property
def family(self) -> str:
"""Normalize backend name to a family for log-dir / normalizer dispatch."""
for fam in ("claude", "codex", "gemini"):
if self.name == fam or self.name.startswith(f"{fam}-"):
return fam
return "other"
def load_backend(name: str, backends_dir: Path) -> Backend:
path = backends_dir / f"{name}.yaml"
if not path.exists():
raise FileNotFoundError(f"Backend config not found: {path}")
with open(path) as f:
data = yaml.safe_load(f)
return Backend(
name=data["name"],
cli=data["cli"],
args=data.get("args", []),
required_env=data.get("required_env", []),
hooks=data.get("hooks", {"pre_run": [], "post_run": []}),
shutdown=data.get("shutdown", "/exit"),
idle=data.get("idle", {}),
startup_timeout=data.get("startup_timeout", 30),
terminal=data.get("terminal", {"cols": 200, "rows": 50}),
session_logs=data.get("session_logs", {}),
turn_timeout=data.get("turn_timeout"),
busy_pattern=data.get("busy_pattern", ""),
max_busy_seconds=data.get("max_busy_seconds", 1800),
)
def _interpolate_env(value: str) -> str:
def replacer(match: re.Match[str]) -> str:
var = match.group(1)
val = os.environ.get(var)
if val is None:
raise OSError(f"Environment variable {var} not set")
return val
return re.sub(r"\$\{(\w+)\}", replacer, value)
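
A small sketch of env interpolation and family dispatch (field values here are
illustrative, not the shipped claude.yaml):

import os
from drill.backend import Backend

os.environ["MODEL"] = "claude-sonnet-4-6"
backend = Backend(
    name="claude",
    cli="claude",
    args=["--model", "${MODEL}"],
    required_env=["ANTHROPIC_API_KEY"],
    hooks={"pre_run": [], "post_run": []},
    shutdown="/exit",
    idle={"ready_pattern": r"> $", "quiescence_seconds": 5},
    startup_timeout=30,
    terminal={"cols": 200, "rows": 50},
    session_logs={},
)
print(backend.build_command("/tmp/w"))  # ['claude', '--model', 'claude-sonnet-4-6']
print(backend.family)                   # 'claude'

Note that the model property reads the raw args, so it would return "${MODEL}" here;
only build_command interpolates.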

evals/drill/cli.py Normal file

@@ -0,0 +1,137 @@
"""Drill CLI: run, compare, list."""
from __future__ import annotations
import secrets
from pathlib import Path
import click
from dotenv import load_dotenv
PROJECT_ROOT: Path = Path(__file__).parent.parent
load_dotenv(PROJECT_ROOT / ".env")
@click.group()
def main() -> None:
"""Drill: Superpowers skill compliance benchmark."""
pass
@main.command()
@click.argument("scenario")
@click.option("--backend", "-b", default=None, help="Backend name (e.g., claude, codex)")
@click.option("--models", "-m", default=None, help="Comma-separated backend names for sweep")
@click.option("--n", "n_runs", type=int, default=1, help="Number of repetitions per backend")
@click.option(
"--backends-dir",
type=click.Path(exists=True, path_type=Path),
default=PROJECT_ROOT / "backends",
)
@click.option(
"--scenarios-dir",
type=click.Path(exists=True, path_type=Path),
default=PROJECT_ROOT / "scenarios",
)
@click.option(
"--fixtures-dir",
type=click.Path(exists=True, path_type=Path),
default=PROJECT_ROOT / "fixtures",
)
@click.option("--results-dir", type=click.Path(path_type=Path), default=PROJECT_ROOT / "results")
def run(
scenario: str,
backend: str | None,
models: str | None,
n_runs: int,
backends_dir: Path,
scenarios_dir: Path,
fixtures_dir: Path,
results_dir: Path,
) -> None:
"""Run a scenario against one or more backends."""
if n_runs < 1:
raise click.ClickException("--n must be at least 1")
if models:
backend_names = [b.strip() for b in models.split(",") if b.strip()]
elif backend:
backend_names = [backend]
else:
raise click.ClickException("Either --backend or --models is required")
scenario_path = scenarios_dir / f"{scenario}.yaml"
if not scenario_path.exists():
raise click.ClickException(f"Scenario not found: {scenario_path}")
sweep_id = secrets.token_hex(4)
from drill.sweep import Sweep
sweep = Sweep(
scenario_path=scenario_path,
backend_names=backend_names,
backends_dir=backends_dir,
fixtures_dir=fixtures_dir,
results_dir=results_dir,
n=n_runs,
sweep_id=sweep_id,
)
total = len(backend_names) * n_runs
click.echo(
f"Running {scenario} | backends: {', '.join(backend_names)} | "
f"n={n_runs} | total runs: {total} | sweep: {sweep_id}"
)
groups = sweep.run_all()
for group in groups:
passed = sum(1 for r in group.runs if r.status == "pass")
failed = sum(1 for r in group.runs if r.status == "fail")
errored = sum(1 for r in group.runs if r.status == "error")
click.echo(f"\n{group.backend}: {passed} passed, {failed} failed, {errored} errors")
if group.partial:
click.echo(" (interrupted — partial results)")
@main.command("list")
@click.option(
"--scenarios-dir",
type=click.Path(exists=True, path_type=Path),
default=PROJECT_ROOT / "scenarios",
)
def list_scenarios(scenarios_dir: Path) -> None:
"""List available scenarios."""
import yaml
for f in sorted(scenarios_dir.glob("*.yaml")):
with open(f) as fh:
data = yaml.safe_load(fh)
name = data.get("scenario", f.stem)
desc = data.get("description", "")
click.echo(f" {name:40s} {desc}")
@main.command()
@click.argument("scenario")
@click.option("--sweep", "sweep_id", default=None, help="Filter by sweep ID")
@click.option(
"--results-dir",
type=click.Path(exists=True, path_type=Path),
default=PROJECT_ROOT / "results",
)
def compare(scenario: str, sweep_id: str | None, results_dir: Path) -> None:
"""Compare results across backends for a scenario."""
from drill.compare import format_compare_output, load_scenario_results
scenario_dir = results_dir / scenario
if not scenario_dir.exists():
raise click.ClickException(f"No results found for: {scenario}")
results = load_scenario_results(scenario_dir, sweep_id=sweep_id)
if not results:
raise click.ClickException(f"No results found for: {scenario}")
click.echo(format_compare_output(scenario, results))
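
Hypothetical invocations (the scenario and sweep names are invented; the flags are
the ones defined above):

python3 -m drill run worktree-discipline --backend claude
python3 -m drill run worktree-discipline --models claude,codex --n 10
python3 -m drill list
python3 -m drill compare worktree-discipline --sweep a1b2c3d4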

evals/drill/compare.py Normal file

@@ -0,0 +1,255 @@
"""Compare: load and aggregate drill results across backends and runs."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from drill.stats import wilson_ci
from drill.verifier import Verdict
@dataclass
class BackendResult:
backend: str
total_runs: int
passed_runs: int
errored_runs: int
avg_turns: float
criterion_counts: dict[str, tuple[int, int]] # criterion -> (passed, total)
sweep_id: str | None
timestamp: str | None
partial: bool
@property
def pass_rate(self) -> float:
if self.total_runs == 0:
return 0.0
return self.passed_runs / self.total_runs
def load_scenario_results(
scenario_dir: Path,
*,
sweep_id: str | None = None,
) -> dict[str, BackendResult]:
results: dict[str, BackendResult] = {}
for backend_dir in sorted(scenario_dir.iterdir()):
if not backend_dir.is_dir():
continue
timestamp_dirs = sorted(backend_dir.iterdir())
if not timestamp_dirs:
continue
target_dir: Path | None = None
if sweep_id:
for d in timestamp_dirs:
rg_path = d / "run-group.json"
if rg_path.exists():
rg = json.loads(rg_path.read_text())
if rg.get("sweep_id") == sweep_id:
target_dir = d
break
else:
target_dir = timestamp_dirs[-1]
if target_dir is None:
continue
result = _load_backend_result(backend_dir.name, target_dir)
if result is not None:
results[backend_dir.name] = result
return results
def _load_backend_result(backend_name: str, timestamp_dir: Path) -> BackendResult | None:
rg_path = timestamp_dir / "run-group.json"
if rg_path.exists():
return _load_new_format(backend_name, timestamp_dir, rg_path)
elif (timestamp_dir / "verdict.json").exists():
return _load_old_format(backend_name, timestamp_dir)
return None
def _load_new_format(backend_name: str, timestamp_dir: Path, rg_path: Path) -> BackendResult:
rg: dict[str, Any] = json.loads(rg_path.read_text())
run_dirs = sorted(
d for d in timestamp_dir.iterdir() if d.is_dir() and d.name.startswith("run-")
)
verdicts: list[Verdict] = []
metas: list[dict[str, Any]] = []
for run_dir in run_dirs:
verdict_path = run_dir / "verdict.json"
meta_path = run_dir / "meta.json"
if verdict_path.exists():
verdicts.append(Verdict.model_validate_json(verdict_path.read_text()))
if meta_path.exists():
metas.append(json.loads(meta_path.read_text()))
passed_runs = sum(1 for v in verdicts if v.passed)
errored_runs = sum(1 for r in rg.get("runs", []) if r.get("status") == "error")
avg_turns = sum(m.get("actor_turns", 0) for m in metas) / len(metas) if metas else 0.0
criterion_counts: dict[str, tuple[int, int]] = {}
for v in verdicts:
for c in v.criteria:
prev_passed, prev_total = criterion_counts.get(c.criterion, (0, 0))
criterion_counts[c.criterion] = (
prev_passed + (1 if c.verdict == "pass" else 0),
prev_total + 1,
)
return BackendResult(
backend=backend_name,
total_runs=len(verdicts),
passed_runs=passed_runs,
errored_runs=errored_runs,
avg_turns=round(avg_turns, 1),
criterion_counts=criterion_counts,
sweep_id=rg.get("sweep_id"),
timestamp=rg.get("timestamp"),
partial=rg.get("partial", False),
)
def _load_old_format(backend_name: str, timestamp_dir: Path) -> BackendResult:
verdict = Verdict.model_validate_json((timestamp_dir / "verdict.json").read_text())
meta: dict[str, Any] = {}
meta_path = timestamp_dir / "meta.json"
if meta_path.exists():
meta = json.loads(meta_path.read_text())
criterion_counts: dict[str, tuple[int, int]] = {}
for c in verdict.criteria:
criterion_counts[c.criterion] = (1 if c.verdict == "pass" else 0, 1)
return BackendResult(
backend=backend_name,
total_runs=1,
passed_runs=1 if verdict.passed else 0,
errored_runs=0,
avg_turns=float(meta.get("actor_turns", 0)),
criterion_counts=criterion_counts,
sweep_id=None,
timestamp=None,
partial=False,
)
def format_compare_output(
scenario: str,
results: dict[str, BackendResult],
) -> str:
if not results:
return f"No results found for: {scenario}"
lines: list[str] = []
is_multi_run = any(r.total_runs > 1 for r in results.values())
if is_multi_run:
first = next(iter(results.values()))
lines.append(f"Scenario: {scenario}")
if first.sweep_id:
sweep_label = f"Sweep: {first.sweep_id}"
if first.timestamp:
date_str = first.timestamp.split("T")[0]
sweep_label += f" | {date_str}"
lines.append(sweep_label)
lines.append("")
header = f"{'':40s}"
sub_header = f"{'':40s}"
for name, r in results.items():
header += f" {name:>12s}"
sub_header += f" {'(n=' + str(r.total_runs) + ')':>12s}"
lines.append(header)
lines.append(sub_header)
lines.append("-" * len(header))
rate_line = f"{'Overall pass rate':40s}"
ci_line = f"{' 95% CI':40s}"
for r in results.values():
pct = f"{r.pass_rate * 100:.1f}%"
rate_line += f" {pct:>12s}"
lo, hi = wilson_ci(r.passed_runs, r.total_runs)
ci_str = f"[{lo * 100:.0f}, {hi * 100:.0f}]"
ci_line += f" {ci_str:>12s}"
lines.append(rate_line)
lines.append(ci_line)
lines.append("")
all_criteria: list[str] = []
seen: set[str] = set()
for r in results.values():
for crit in r.criterion_counts:
if crit not in seen:
all_criteria.append(crit)
seen.add(crit)
for crit in all_criteria:
crit_line = f"{crit[:40]:40s}"
for r in results.values():
passed, total = r.criterion_counts.get(crit, (0, 0))
crit_line += f" {str(passed) + '/' + str(total):>12s}"
lines.append(crit_line)
lines.append("")
avg_line = f"{'Avg turns':40s}"
err_line = f"{'Errors':40s}"
for r in results.values():
avg_line += f" {str(r.avg_turns):>12s}"
err_line += f" {str(r.errored_runs):>12s}"
lines.append(avg_line)
lines.append(err_line)
if any(r.total_runs < 10 for r in results.values()):
lines.append("")
lines.append("Note: CI is wide due to small sample size; consider --n 10+")
if any(r.partial for r in results.values()):
lines.append("")
lines.append("Warning: Sweep was interrupted — results are incomplete.")
else:
lines.append(f"Scenario: {scenario}")
lines.append("")
lines.append(f"{'Backend':20s} {'Result':8s} {'Score':7s} {'Turns':5s}")
lines.append("-" * 42)
for name, r in results.items():
result_str = "PASS" if r.passed_runs == r.total_runs else "FAIL"
total_criteria = sum(t for _, t in r.criterion_counts.values())
passed_criteria = sum(p for p, _ in r.criterion_counts.values())
score = f"{passed_criteria}/{total_criteria}"
turns_str = (
str(int(r.avg_turns)) if r.avg_turns == int(r.avg_turns) else str(r.avg_turns)
)
lines.append(f"{name:20s} {result_str:8s} {score:7s} {turns_str:5s}")
all_criteria = []
seen = set()
for r in results.values():
for crit in r.criterion_counts:
if crit not in seen:
all_criteria.append(crit)
seen.add(crit)
lines.append("")
header = f"{'':40s}"
for name in results:
header += f" {name:>12s}"
lines.append(header)
lines.append("-" * len(header))
for crit in all_criteria:
crit_line = f"{crit[:40]:40s}"
for r in results.values():
p, t = r.criterion_counts.get(crit, (0, 0))
icon = "PASS" if p == t and t > 0 else "FAIL"
crit_line += f" {icon:>12s}"
lines.append(crit_line)
return "\n".join(lines)

evals/drill/engine.py Normal file

@@ -0,0 +1,377 @@
"""Engine: orchestrates the full Drill run lifecycle."""
from __future__ import annotations
import json
import os
import re
import subprocess
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
import yaml
from drill.actor import Actor
from drill.assertions import AssertionResult, run_verify_assertions
from drill.backend import load_backend
from drill.normalizer import (
NORMALIZERS,
collect_new_logs,
filter_codex_logs_by_cwd,
snapshot_log_dir,
)
from drill.session import TmuxSession
from drill.setup import run_assertions, run_helpers
from drill.verifier import Verifier
@dataclass
class VerifyConfig:
criteria: list[str] = field(default_factory=list)
assertions: list[str] = field(default_factory=list)
observe: bool = False
@dataclass
class ScenarioConfig:
scenario: str
description: str
user_posture: str
setup: dict[str, Any]
turns: list[dict[str, Any]]
limits: dict[str, Any]
verify: VerifyConfig
@classmethod
def from_yaml(cls, path: Path) -> ScenarioConfig:
with open(path) as f:
data = yaml.safe_load(f)
verify_data = data.get("verify", {})
return cls(
scenario=data["scenario"],
description=data.get("description", ""),
user_posture=data.get("user_posture", "naive"),
setup=data.get("setup", {}),
turns=data.get("turns", []),
limits=data.get("limits", {"max_turns": 20, "turn_timeout": 120}),
verify=VerifyConfig(
criteria=verify_data.get("criteria", []),
assertions=verify_data.get("assertions", []),
observe=verify_data.get("observe", False),
),
)
@dataclass
class RunResult:
scenario: str
backend: str
timestamp: str
session_log: str
filesystem_json: str
tool_calls_jsonl: str
verdict_json: str
meta: dict[str, Any]
def save_artifacts(self, output_dir: Path) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "session.log").write_text(self.session_log)
(output_dir / "filesystem.json").write_text(self.filesystem_json)
(output_dir / "tool_calls.jsonl").write_text(self.tool_calls_jsonl)
def save_verdict(self, output_dir: Path) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "verdict.json").write_text(self.verdict_json)
(output_dir / "meta.json").write_text(json.dumps(self.meta, indent=2))
def save(self, output_dir: Path) -> None:
self.save_artifacts(output_dir)
self.save_verdict(output_dir)
def snapshot_filesystem(workdir: Path) -> str:
files: list[str] = []
for f in sorted(workdir.rglob("*")):
if ".git" in f.parts:
continue
if f.is_file():
files.append(str(f.relative_to(workdir)))
git_status = _git_cmd(workdir, ["git", "status", "--short"])
branch = _git_cmd(workdir, ["git", "branch", "--show-current"])
worktree_list = _git_cmd(workdir, ["git", "worktree", "list"])
return json.dumps(
{
"files": files,
"git_status": git_status,
"branch": branch,
"worktree_list": worktree_list,
},
indent=2,
)
class Engine:
def __init__(
self,
scenario_path: Path,
backend_name: str,
backends_dir: Path,
fixtures_dir: Path,
results_dir: Path,
) -> None:
self.scenario = ScenarioConfig.from_yaml(scenario_path)
self.backend = load_backend(backend_name, backends_dir)
self.fixtures_dir = fixtures_dir
self.results_dir = results_dir
def run(self, *, output_dir: Path | None = None, run_suffix: str = "") -> RunResult:
start_time = time.time()
timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
self.backend.validate_env()
workdir = Path(f"/tmp/drill-{self.scenario.scenario}-{timestamp}{run_suffix}")
self._setup(workdir)
actual_workdir = workdir
override = self.scenario.setup.get("workdir_override")
if override:
resolved = override.replace("${WORKDIR_NAME}", workdir.name)
actual_workdir = (workdir / resolved).resolve()
# Run assertions in the actual workdir (after override)
assertions = self.scenario.setup.get("assertions", [])
if assertions:
run_assertions(assertions, actual_workdir)
session_name = f"drill-{self.scenario.scenario}-{timestamp}{run_suffix}"
session = TmuxSession(name=session_name, cols=self.backend.cols, rows=self.backend.rows)
log_dir = self._resolve_log_dir(actual_workdir)
log_snapshot = snapshot_log_dir(log_dir) if log_dir else set()
session_log, actor_turns = self._run_session(session, actual_workdir)
filesystem_json = snapshot_filesystem(actual_workdir)
tool_calls = self._collect_tool_calls(log_dir, log_snapshot, actual_workdir)
tool_calls_jsonl = "\n".join(json.dumps(tc) for tc in tool_calls)
# Write artifacts to disk before assertions (assertions read from disk)
if output_dir is None:
output_dir = self.results_dir / self.scenario.scenario / self.backend.name / timestamp
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "session.log").write_text(session_log)
(output_dir / "filesystem.json").write_text(filesystem_json)
(output_dir / "tool_calls.jsonl").write_text(tool_calls_jsonl)
# Run deterministic assertions
assertion_results: list[AssertionResult] = []
if self.scenario.verify.assertions:
if not tool_calls_jsonl.strip():
assertion_results = [
AssertionResult(
command="<pre-check>",
passed=False,
exit_code=1,
stdout="",
stderr="tool_calls.jsonl is empty — session may have crashed",
)
]
else:
assertion_results = run_verify_assertions(
self.scenario.verify.assertions,
output_dir,
actual_workdir,
)
# Run LLM verifier
verifier = Verifier()
verdict = verifier.verify(
session_log=session_log,
filesystem_json=filesystem_json,
tool_calls_jsonl=tool_calls_jsonl,
criteria=self.scenario.verify.criteria,
)
# Merge assertion results into verdict
for ar in assertion_results:
verdict.criteria.append(ar.to_criterion_result())
duration = time.time() - start_time
meta: dict[str, Any] = {
"scenario": self.scenario.scenario,
"backend": self.backend.name,
"backend_model": self.backend.model,
"user_posture": self.scenario.user_posture,
"timestamp": timestamp,
"duration_seconds": round(duration, 1),
"actor_turns": actor_turns,
"actor_model": "claude-sonnet-4-6",
"verifier_model": "claude-sonnet-4-6",
}
result = RunResult(
scenario=self.scenario.scenario,
backend=self.backend.name,
timestamp=timestamp,
session_log=session_log,
filesystem_json=filesystem_json,
tool_calls_jsonl=tool_calls_jsonl,
verdict_json=verdict.model_dump_json(indent=2),
meta=meta,
)
# Write verdict + meta (artifacts already on disk)
(output_dir / "verdict.json").write_text(result.verdict_json)
(output_dir / "meta.json").write_text(json.dumps(result.meta, indent=2))
return result
def _setup(self, workdir: Path) -> None:
# Scenario helpers first (create_base_repo needs to run before anything else)
helpers = self.scenario.setup.get("helpers", [])
run_helpers(helpers, workdir, self.fixtures_dir)
# Backend pre_run hooks after (e.g., codex symlink needs workdir to exist)
hooks_needing_superpowers_root = {"symlink_superpowers", "link_gemini_extension"}
for hook_name in self.backend.hooks.get("pre_run", []):
from setup_helpers import HELPER_REGISTRY
hook = HELPER_REGISTRY.get(hook_name)
if hook and hook_name in hooks_needing_superpowers_root:
hook(workdir, os.environ["SUPERPOWERS_ROOT"]) # ty: ignore[invalid-argument-type, too-many-positional-arguments, missing-argument]
elif hook:
hook(workdir) # ty: ignore[invalid-argument-type, missing-argument]
def _run_session(self, session: TmuxSession, workdir: Path) -> tuple[str, int]:
session.create()
try:
cmd = self.backend.build_command(str(workdir))
session.launch(cmd, str(workdir))
self._wait_for_ready(session, timeout=self.backend.startup_timeout)
actor = Actor()
intents = [t["intent"] for t in self.scenario.turns]
actor.build_system_prompt(posture=self.scenario.user_posture, intents=intents)
max_turns = self.scenario.limits.get("max_turns", 20)
turn_timeout = self.backend.turn_timeout or self.scenario.limits.get(
"turn_timeout", 120
)
all_captures: list[str] = []
turn_count = 0
for turn in range(max_turns):
self._wait_for_ready(session, timeout=turn_timeout)
capture = session.capture()
all_captures.append(f"=== Turn {turn + 1} ===\n{capture}")
actor.append_capture(f"Terminal output:\n{capture}")
action = actor.decide()
turn_count += 1
if action.action == "done" or action.action == "stuck":
break
elif action.action == "type":
session.send_keys(action.text or "")
elif action.action == "key":
session.send_special_key(action.key or "")
final_capture = session.capture()
all_captures.append(f"=== Final ===\n{final_capture}")
if self.backend.shutdown.startswith("<<KEY:"):
key = self.backend.shutdown[6:-2]
session.send_special_key(key)
else:
session.send_keys(self.backend.shutdown)
time.sleep(3)
return "\n".join(all_captures), turn_count
finally:
session.kill()
def _wait_for_ready(self, session: TmuxSession, timeout: float) -> None:
"""Wait until the agent's terminal is ready for Actor input.
Returns when the terminal is quiescent AND matches the backend's
ready pattern. If the backend's busy pattern matches (spinner
visible, "Thinking...", timer counting), the deadline is extended
by small increments up to `max_busy_seconds` total. This prevents
the Actor from interrupting long-running subagent work (wave
execution, multi-file implementation, etc.).
Exits silently if the final deadline (timeout + busy extensions)
passes without reaching a ready state.
"""
quiescence = self.backend.quiescence_seconds
max_busy_extension = float(self.backend.max_busy_seconds)
start = time.time()
deadline = start + timeout
total_busy_extended = 0.0
last_output: str = ""
stable_since: float | None = None
while time.time() < deadline:
current = session.capture()
lines = current.strip().split("\n")
is_busy = any(self.backend.is_busy_line(line) for line in lines)
# If the agent is actively busy, extend the deadline so we
# don't time out mid-subagent-work. Extensions are capped at
# max_busy_seconds total across all extensions combined.
if is_busy:
remaining_budget = max_busy_extension - total_busy_extended
if remaining_budget > 0:
# Ensure we have at least 30 more seconds of headroom.
needed = 30.0 - (deadline - time.time())
if needed > 0:
grant = min(needed, remaining_budget)
deadline += grant
total_busy_extended += grant
# Strip animated elements so they don't reset the quiescence timer:
# - Time counters: "Thinking... (4m 1s)" or "(esc to cancel, 4m 1s)"
# - Braille spinner characters that rotate every frame
normalized = re.sub(r"\((?:esc to cancel, )?(?:\d+[hms]\s*)+\)", "(…)", current)
normalized = re.sub(r"[⠇⠏⠋⠙⠹⠸⠼⠴⠦⠧⠶⠾⠽⠻⠿]", "·", normalized)
if normalized != last_output:
last_output = normalized
stable_since = time.time()
elif stable_since and (time.time() - stable_since) >= quiescence:
if is_busy:
stable_since = None # Reset — agent is still working
elif any(self.backend.is_ready_line(line) for line in lines):
return
time.sleep(0.5)
def _resolve_log_dir(self, workdir: Path) -> Path | None:
"""Resolve the log directory for the given backend and workdir.
Claude Code stores logs at ~/.claude/projects/<encoded-path>/
where the path is the real workdir with / replaced by -.
Codex stores logs at ~/.codex/sessions/.
"""
if self.backend.family == "claude":
real_workdir = workdir.resolve()
encoded = str(real_workdir).replace("/", "-")
log_dir = Path.home() / ".claude" / "projects" / encoded
return log_dir
elif self.backend.family == "codex":
# Codex stores at ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl
return Path.home() / ".codex" / "sessions"
elif self.backend.family == "gemini":
# Gemini stores at ~/.gemini/tmp/<project-name>/chats/session-*.json
# Project name is the workdir basename, lowercased
project = workdir.resolve().name.lower()
return Path.home() / ".gemini" / "tmp" / project
pattern = self.backend.session_logs.get("pattern", "")
if not pattern:
return None
expanded = os.path.expanduser(pattern)
parts = expanded.split("*")[0].rstrip("/")
return Path(parts)
def _collect_tool_calls(
self, log_dir: Path | None, snapshot: set[str], workdir: Path
) -> list[dict[str, Any]]:
if log_dir is None:
return []
new_files = collect_new_logs(log_dir, snapshot)
if self.backend.family == "codex":
new_files = filter_codex_logs_by_cwd(new_files, str(workdir.resolve()))
normalizer = NORMALIZERS.get(self.backend.family)
if not normalizer:
return []
results: list[dict[str, Any]] = []
for log_file in new_files:
results.extend(normalizer(log_file.read_text()))
return results
def _git_cmd(workdir: Path, cmd: list[str]) -> str:
result = subprocess.run(cmd, cwd=workdir, capture_output=True, text=True)
return result.stdout.strip()
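
For reference, a hedged sketch of the scenario shape ScenarioConfig.from_yaml
expects (names, intents, and criteria are illustrative, not a shipped scenario):

scenario: worktree-discipline
description: Agent should do feature work in a worktree
user_posture: naive
setup:
  helpers: [create_base_repo, symlink_superpowers]
  assertions:
    - test -d .git
turns:
  - intent: Ask the agent to add a small feature
limits:
  max_turns: 20
  turn_timeout: 120
verify:
  criteria:
    - Agent created a git worktree before editing files
  assertions:
    - grep -q EnterWorktree tool_calls.jsonl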

evals/drill/normalizer.py Normal file

@@ -0,0 +1,228 @@
"""Normalizes backend-specific session logs to a common tool call schema."""
from __future__ import annotations
import json
from collections.abc import Callable
from pathlib import Path
from typing import Any
NATIVE_TOOLS: set[str] = {
"EnterWorktree",
"ExitWorktree",
"EnterPlanMode",
"ExitPlanMode",
"TaskCreate",
"TaskUpdate",
"TaskList",
"TaskGet",
"Skill",
"Agent",
"Read",
"Write",
"Edit",
"Glob",
"Grep",
}
LOG_EXTENSIONS: tuple[str, ...] = ("*.jsonl", "*.json")
def snapshot_log_dir(log_dir: Path) -> set[str]:
"""Snapshot all session log files in a log directory (recursive)."""
if not log_dir.exists():
return set()
files: set[str] = set()
for ext in LOG_EXTENSIONS:
files.update(str(f.relative_to(log_dir)) for f in log_dir.rglob(ext))
return files
def collect_new_logs(log_dir: Path, snapshot: set[str]) -> list[Path]:
"""Find session log files created after the snapshot (recursive)."""
if not log_dir.exists():
return []
current: dict[str, Path] = {}
for ext in LOG_EXTENSIONS:
current.update({str(f.relative_to(log_dir)): f for f in log_dir.rglob(ext)})
new_keys: set[str] = set(current.keys()) - snapshot
return [current[k] for k in sorted(new_keys)]
def filter_codex_logs_by_cwd(paths: list[Path], target_cwd: str) -> list[Path]:
"""Drop codex rollouts whose session_meta.cwd doesn't match target_cwd.
Codex stores all sessions under a shared ~/.codex/sessions/ tree, so when
multiple drill scenarios run in parallel each one's snapshot diff sees every
other run's rollouts. Each rollout's first line is a `session_meta` event
that records the cwd the codex CLI was launched in — use it to attribute
rollouts to the run that produced them.
"""
matched: list[Path] = []
for path in paths:
try:
with path.open() as f:
first_line = f.readline()
entry = json.loads(first_line)
except (OSError, json.JSONDecodeError):
continue
if entry.get("type") != "session_meta":
continue
cwd = entry.get("payload", {}).get("cwd", "")
if cwd == target_cwd:
matched.append(path)
return matched
def normalize_claude_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Claude Code session logs.
CC logs are JSONL where assistant messages have:
{"type": "assistant", "message": {"content": [{"type": "tool_use", "name": "...",
"input": {...}}]}}
"""
results: list[dict[str, Any]] = []
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
# Handle nested CC format: assistant messages contain tool_use in content array
if entry.get("type") == "assistant":
message = entry.get("message", {})
for block in message.get("content", []):
if block.get("type") == "tool_use":
tool_name = block.get("name", "")
source = "native" if tool_name in NATIVE_TOOLS else "shell"
results.append(
{"tool": tool_name, "args": block.get("input", {}), "source": source}
)
# Also handle flat format (for test compatibility)
elif entry.get("type") == "tool_use":
tool_name = entry.get("name", "")
source = "native" if tool_name in NATIVE_TOOLS else "shell"
results.append({"tool": tool_name, "args": entry.get("input", {}), "source": source})
return results
def normalize_codex_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Codex rollout logs.
Codex logs use: {"type": "response_item", "payload": {"type": "function_call", ...}}
Tool calls are "function_call" with name "exec_command" (shell) or other names.
"""
results: list[dict[str, Any]] = []
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("type") != "response_item":
continue
# Codex uses "payload" not "item"
payload = entry.get("payload", entry.get("item", {}))
payload_type = payload.get("type", "")
if payload_type == "function_call":
name = payload.get("name", "")
raw_args = payload.get("arguments", "{}")
# Arguments are JSON-encoded strings in codex
if isinstance(raw_args, str):
try:
args = json.loads(raw_args)
except json.JSONDecodeError:
args = {"raw": raw_args}
else:
args = raw_args
# exec_command is codex's shell tool
if name == "exec_command":
results.append(
{"tool": "Bash", "args": {"command": args.get("cmd", "")}, "source": "shell"}
)
elif name == "apply_patch":
results.append({"tool": "Edit", "args": args, "source": "native"})
else:
source = "native" if name in NATIVE_TOOLS else "shell"
results.append({"tool": name, "args": args, "source": source})
elif payload_type == "local_shell_call":
action = payload.get("action", {})
cmd = action.get("command", [])
cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
results.append({"tool": "Bash", "args": {"command": cmd_str}, "source": "shell"})
return results
# Reverse mapping: Gemini tool names → Claude Code canonical names
GEMINI_TOOL_MAP: dict[str, str] = {
"run_shell_command": "Bash",
"read_file": "Read",
"write_file": "Write",
"replace": "Edit",
"grep_search": "Grep",
"glob": "Glob",
"activate_skill": "Skill",
"google_web_search": "WebSearch",
"web_fetch": "WebFetch",
"write_todos": "TodoWrite",
"list_directory": "Glob",
"enter_plan_mode": "EnterPlanMode",
"exit_plan_mode": "ExitPlanMode",
}
def normalize_gemini_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Gemini CLI session logs.
Gemini logs may be a single JSON file with a messages array, or JSONL
session files in newer CLI versions. Each "gemini" message may have a
toolCalls array:
{"name": "run_shell_command", "args": {"command": "..."}, "status": "success"}
"""
results: list[dict[str, Any]] = []
messages: list[dict[str, Any]] = []
try:
data = json.loads(raw_content)
except json.JSONDecodeError:
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(entry, dict):
messages.append(entry)
else:
if isinstance(data, dict) and "messages" in data:
messages = [m for m in data.get("messages", []) if isinstance(m, dict)]
elif isinstance(data, dict):
messages = [data]
elif isinstance(data, list):
messages = [m for m in data if isinstance(m, dict)]
seen_tool_calls: set[str] = set()
for message in messages:
if message.get("type") != "gemini":
continue
for tc in message.get("toolCalls", []):
tool_call_id = tc.get("id")
if tool_call_id and tool_call_id in seen_tool_calls:
continue
if tool_call_id:
seen_tool_calls.add(tool_call_id)
gemini_name = tc.get("name", "")
canonical = GEMINI_TOOL_MAP.get(gemini_name, gemini_name)
args = tc.get("args", {})
source = "native" if canonical in NATIVE_TOOLS else "shell"
results.append({"tool": canonical, "args": args, "source": source})
return results
NORMALIZERS: dict[str, Callable[[str], list[dict[str, Any]]]] = {
"claude": normalize_claude_logs,
"codex": normalize_codex_logs,
"gemini": normalize_gemini_logs,
}
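
A quick round-trip sketch for the Claude format (log line invented; Bash is not in
NATIVE_TOOLS, so it normalizes with source "shell"):

from drill.normalizer import normalize_claude_logs

line = (
    '{"type": "assistant", "message": {"content": '
    '[{"type": "tool_use", "name": "Bash", "input": {"command": "ls"}}]}}'
)
print(normalize_claude_logs(line))
# [{'tool': 'Bash', 'args': {'command': 'ls'}, 'source': 'shell'}]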

evals/drill/session.py Normal file

@@ -0,0 +1,88 @@
"""tmux session management for driving agent CLI sessions."""
from __future__ import annotations
import subprocess
import time
class TmuxSession:
def __init__(self, name: str, cols: int = 200, rows: int = 50) -> None:
self.name = name
self.cols = cols
self.rows = rows
def create(self) -> None:
subprocess.run(
[
"tmux",
"new-session",
"-d",
"-s",
self.name,
"-x",
str(self.cols),
"-y",
str(self.rows),
],
check=True,
)
def launch(self, command: list[str], cwd: str) -> None:
cmd_str = " ".join(command)
self.send_keys(f"cd {cwd} && {cmd_str}")
def send_keys(self, text: str) -> None:
if text:
buffer_name = f"{self.name}-input"
subprocess.run(
["tmux", "set-buffer", "-b", buffer_name, text],
check=True,
)
subprocess.run(
["tmux", "paste-buffer", "-d", "-b", buffer_name, "-t", self.name],
check=True,
)
time.sleep(0.1)
subprocess.run(
["tmux", "send-keys", "-t", self.name, "Enter"],
check=True,
)
def send_special_key(self, key: str) -> None:
key_map = {
"ctrl-c": "C-c",
"ctrl-d": "C-d",
"ctrl-z": "C-z",
"enter": "Enter",
"escape": "Escape",
}
tmux_key = key_map.get(key, key)
subprocess.run(
["tmux", "send-keys", "-t", self.name, tmux_key],
check=True,
)
def capture(self) -> str:
result = subprocess.run(
["tmux", "capture-pane", "-t", self.name, "-p"],
capture_output=True,
text=True,
check=True,
)
return result.stdout
def is_process_alive(self) -> bool:
result = subprocess.run(
["tmux", "list-panes", "-t", self.name, "-F", "#{pane_dead}"],
capture_output=True,
text=True,
)
return result.stdout.strip() == "0"
def kill(self) -> None:
subprocess.run(
["tmux", "kill-session", "-t", self.name],
capture_output=True,
)
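
Minimal usage sketch (requires tmux on PATH; the session name is illustrative):

import time
from drill.session import TmuxSession

session = TmuxSession(name="drill-demo", cols=120, rows=40)
session.create()
try:
    session.launch(["bash"], "/tmp")
    time.sleep(1)
    session.send_keys("echo hello")
    time.sleep(1)
    print(session.capture())
finally:
    session.kill()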

evals/drill/setup.py Normal file

@@ -0,0 +1,43 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from setup_helpers import HELPER_REGISTRY
from setup_helpers.base import create_base_repo
def clone_template(template_dir: Path, workdir: Path) -> None:
"""Clone (or build) template_dir into workdir with full git history."""
create_base_repo(workdir, template_dir)
def run_helpers(helper_names: list[str], workdir: Path, fixtures_dir: Path) -> None:
for name in helper_names:
helper = HELPER_REGISTRY.get(name)
if helper is None:
raise ValueError(f"Unknown setup helper: {name}")
if name == "create_base_repo":
helper(workdir, fixtures_dir / "template-repo") # ty: ignore[invalid-argument-type, too-many-positional-arguments, missing-argument]
elif name == "symlink_superpowers":
import os
helper(workdir, os.environ["SUPERPOWERS_ROOT"]) # ty: ignore[invalid-argument-type, too-many-positional-arguments, missing-argument]
else:
helper(workdir) # ty: ignore[invalid-argument-type, missing-argument]
def run_assertions(assertions: list[str], workdir: Path) -> None:
for assertion in assertions:
result = subprocess.run(
assertion,
shell=True,
cwd=workdir,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise AssertionError(
f"Setup assertion failed: {assertion}\n"
f"stdout: {result.stdout}\nstderr: {result.stderr}"
)

evals/drill/stats.py Normal file

@@ -0,0 +1,17 @@
"""Statistical utilities for drill result analysis."""
from __future__ import annotations
import math
def wilson_ci(passed: int, total: int, z: float = 1.96) -> tuple[float, float]:
if total == 0:
return (0.0, 0.0)
if passed > total:
passed = total
p = passed / total
denom = 1 + z**2 / total
center = (p + z**2 / (2 * total)) / denom
margin = (z / denom) * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2))
return (max(0.0, center - margin), min(1.0, center + margin))
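
A quick sanity check, relevant to the small-sample warning in compare.py (values
rounded):

from drill.stats import wilson_ci

lo, hi = wilson_ci(7, 10)
print(f"[{lo:.3f}, {hi:.3f}]")  # approximately [0.397, 0.892]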

evals/drill/sweep.py Normal file

@@ -0,0 +1,159 @@
"""Sweep orchestrator: runs scenarios N times across multiple backends."""
from __future__ import annotations
import glob as glob_mod
import json
import shutil
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
import yaml
from drill.engine import Engine, RunResult
from drill.verifier import Verdict
@dataclass
class RunStatus:
index: int
status: str # "pass", "fail", "error"
duration: float
error: str | None = None
@dataclass
class RunGroup:
scenario: str
backend: str
n: int
timestamp: str
sweep_id: str
runs: list[RunStatus] = field(default_factory=list)
partial: bool = False
def write_run_group(group: RunGroup, output_dir: Path) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
data: dict[str, Any] = {
"scenario": group.scenario,
"backend": group.backend,
"n": group.n,
"timestamp": group.timestamp,
"sweep_id": group.sweep_id,
"partial": group.partial,
"runs": [
{k: v for k, v in asdict(r).items() if k != "error" or v is not None}
for r in group.runs
],
}
(output_dir / "run-group.json").write_text(json.dumps(data, indent=2))
class Sweep:
def __init__(
self,
scenario_path: Path,
backend_names: list[str],
backends_dir: Path,
fixtures_dir: Path,
results_dir: Path,
n: int,
sweep_id: str,
) -> None:
self.scenario_path = scenario_path
self.backend_names = backend_names
self.backends_dir = backends_dir
self.fixtures_dir = fixtures_dir
self.results_dir = results_dir
self.n = n
self.sweep_id = sweep_id
self._scenario_name_cache: str | None = None
def validate_backends(self) -> None:
for name in self.backend_names:
path = self.backends_dir / f"{name}.yaml"
if not path.exists():
raise FileNotFoundError(f"Backend config not found: {path}")
def run_all(self) -> list[RunGroup]:
self.validate_backends()
groups: list[RunGroup] = []
for backend_name in self.backend_names:
group = self._run_backend(backend_name)
groups.append(group)
return groups
def _run_backend(self, backend_name: str) -> RunGroup:
timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
group_dir = (
self.results_dir / self.scenario_name / backend_name / f"{timestamp}-{self.sweep_id}"
)
group_dir.mkdir(parents=True, exist_ok=True)
group = RunGroup(
scenario=self.scenario_name,
backend=backend_name,
n=self.n,
timestamp=timestamp,
sweep_id=self.sweep_id,
)
try:
for i in range(self.n):
run_status = self._run_single(backend_name, group_dir, i, timestamp)
group.runs.append(run_status)
except KeyboardInterrupt:
group.partial = True
finally:
write_run_group(group, group_dir)
return group
def _run_single(
self, backend_name: str, group_dir: Path, index: int, timestamp: str
) -> RunStatus:
run_suffix = f"-run-{index:02d}"
run_dir = group_dir / f"run-{index:02d}"
start = time.time()
try:
engine = Engine(
scenario_path=self.scenario_path,
backend_name=backend_name,
backends_dir=self.backends_dir,
fixtures_dir=self.fixtures_dir,
results_dir=self.results_dir,
)
result: RunResult = engine.run(output_dir=run_dir, run_suffix=run_suffix)
verdict = Verdict.model_validate_json(result.verdict_json)
duration = time.time() - start
status = "pass" if verdict.passed else "fail"
return RunStatus(index=index, status=status, duration=round(duration, 1))
except KeyboardInterrupt:
raise
except Exception as e:
duration = time.time() - start
return RunStatus(
index=index,
status="error",
duration=round(duration, 1),
error=str(e),
)
finally:
pattern = f"/tmp/drill-*-{timestamp}{run_suffix}"
for d in glob_mod.glob(pattern):
p = Path(d)
if p.is_dir():
shutil.rmtree(p, ignore_errors=True)
@property
def scenario_name(self) -> str:
if self._scenario_name_cache is None:
with open(self.scenario_path) as f:
data = yaml.safe_load(f)
self._scenario_name_cache = data["scenario"]
return self._scenario_name_cache
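
Putting the pieces together, a sweep writes a results tree shaped roughly like
this (names illustrative):

results/<scenario>/<backend>/<timestamp>-<sweep_id>/
    run-group.json        # written by write_run_group
    run-00/
        session.log
        filesystem.json
        tool_calls.jsonl
        verdict.json
        meta.json
    run-01/
        ...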

evals/drill/verifier.py Normal file

@@ -0,0 +1,93 @@
"""Verifier LLM: evaluates agent session against criteria."""
from __future__ import annotations
from pathlib import Path
import anthropic
from pydantic import BaseModel
class CriterionResult(BaseModel):
criterion: str
verdict: str
evidence: str
rationale: str
source: str = "judge"
class Verdict(BaseModel):
criteria: list[CriterionResult]
observations: list[str]
summary: str
@property
def score(self) -> str:
passed = sum(1 for c in self.criteria if c.verdict == "pass")
return f"{passed}/{len(self.criteria)}"
@property
def passed(self) -> bool:
return all(c.verdict == "pass" for c in self.criteria)
class Verifier:
MAX_RETRIES = 3
def __init__(self, model: str = "claude-sonnet-4-6", temperature: float = 0.0) -> None:
self.model = model
self.temperature = temperature
self._client: anthropic.Anthropic = anthropic.Anthropic()
def build_system_prompt(self) -> str:
template_path = Path(__file__).parent.parent / "prompts" / "verifier.md"
return template_path.read_text()
def verify(
self,
session_log: str,
filesystem_json: str,
tool_calls_jsonl: str,
criteria: list[str],
) -> Verdict:
system = self.build_system_prompt()
user_content = (
"## Terminal Session Log\n\n"
f"```\n{session_log}\n```\n\n"
"## Filesystem State\n\n"
f"```json\n{filesystem_json}\n```\n\n"
"## Tool Call Log\n\n"
f"```jsonl\n{tool_calls_jsonl}\n```\n\n"
"## Criteria to Evaluate\n\n" + "\n".join(f"- {c}" for c in criteria)
)
for attempt in range(self.MAX_RETRIES):
response = self._client.messages.create(
model=self.model,
max_tokens=4096,
temperature=self.temperature,
system=system,
messages=[{"role": "user", "content": user_content}],
)
text = response.content[0].text # ty: ignore[unresolved-attribute]
json_str = _extract_json(text)
try:
return Verdict.model_validate_json(json_str)
except Exception:
if attempt == self.MAX_RETRIES - 1:
raise
continue
raise RuntimeError("Verifier failed to return valid JSON")
def _extract_json(text: str) -> str:
if "```json" in text:
start = text.index("```json") + 7
end = text.index("```", start)
return text[start:end].strip()
if "```" in text:
start = text.index("```") + 3
end = text.index("```", start)
return text[start:end].strip()
start = text.index("{")
end = text.rindex("}") + 1
return text[start:end]
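
For reference, a hedged example of the JSON shape Verdict.model_validate_json
accepts (contents invented):

{
  "criteria": [
    {
      "criterion": "Agent created a git worktree before editing files",
      "verdict": "pass",
      "evidence": "EnterWorktree appears in tool_calls.jsonl before any Edit",
      "rationale": "Worktree was entered before edits",
      "source": "judge"
    }
  ],
  "observations": ["Agent asked a clarifying question before starting"],
  "summary": "1/1 criteria passed"
}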