Files
superpowers/evals/drill/normalizer.py
Jesse Vincent 3c046f579e Lift drill into evals/ at 013fcb8b7dbefd6d3fa4653493e5d2ec8e7f985b
rsync of obra/drill@013fcb8b7d into superpowers/evals/, excluding
.git/, .venv/, results/, .env/, __pycache__/, *.egg-info/,
.private-journal/.

The drill repo is unaffected by this commit; archival is a separate
manual step after this PR merges.

Source SHA recorded at evals/.drill-source-sha for divergence
detection.
2026-05-06 12:15:46 -07:00

229 lines
8.2 KiB
Python

"""Normalizes backend-specific session logs to a common tool call schema."""
from __future__ import annotations
import json
from collections.abc import Callable
from pathlib import Path
from typing import Any
# Tool names classified as "native" tool calls by the normalizers below;
# any tool name not in this set is attributed to source "shell".
NATIVE_TOOLS: set[str] = {
    "EnterWorktree",
    "ExitWorktree",
    "EnterPlanMode",
    "ExitPlanMode",
    "TaskCreate",
    "TaskUpdate",
    "TaskList",
    "TaskGet",
    "Skill",
    "Agent",
    "Read",
    "Write",
    "Edit",
    "Glob",
    "Grep",
}
# Glob patterns recognized as session log files by the snapshot helpers.
LOG_EXTENSIONS: tuple[str, ...] = ("*.jsonl", "*.json")


def snapshot_log_dir(log_dir: Path) -> set[str]:
    """Snapshot all session log files in a log directory (recursive).

    Returns the set of paths (relative to *log_dir*, as strings) of every
    file matching LOG_EXTENSIONS; an empty set if the directory is absent.
    """
    if not log_dir.exists():
        return set()
    return {
        str(hit.relative_to(log_dir))
        for pattern in LOG_EXTENSIONS
        for hit in log_dir.rglob(pattern)
    }


def collect_new_logs(log_dir: Path, snapshot: set[str]) -> list[Path]:
    """Find session log files created after the snapshot (recursive).

    Returns the files whose relative path is not in *snapshot*, ordered by
    relative path; an empty list if the directory is absent.
    """
    if not log_dir.exists():
        return []
    by_rel_path: dict[str, Path] = {
        str(hit.relative_to(log_dir)): hit
        for pattern in LOG_EXTENSIONS
        for hit in log_dir.rglob(pattern)
    }
    return [by_rel_path[rel] for rel in sorted(by_rel_path.keys() - snapshot)]
def filter_codex_logs_by_cwd(paths: list[Path], target_cwd: str) -> list[Path]:
    """Drop codex rollouts whose session_meta.cwd doesn't match target_cwd.

    Codex writes every session into one shared ~/.codex/sessions/ tree, so
    parallel drill scenarios each see the other runs' rollouts in their
    snapshot diffs. A rollout's first line is a `session_meta` event that
    records the cwd the codex CLI was launched in; that field is used here
    to attribute rollouts to the run that produced them.
    """
    kept: list[Path] = []
    for candidate in paths:
        try:
            with candidate.open() as handle:
                head = json.loads(handle.readline())
        except (OSError, json.JSONDecodeError):
            # Unreadable or non-JSON first line: cannot attribute, skip.
            continue
        if head.get("type") != "session_meta":
            continue
        if head.get("payload", {}).get("cwd", "") == target_cwd:
            kept.append(candidate)
    return kept
def normalize_claude_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Claude Code session logs.
CC logs are JSONL where assistant messages have:
{"type": "assistant", "message": {"content": [{"type": "tool_use", "name": "...",
"input": {...}}]}}
"""
results: list[dict[str, Any]] = []
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
# Handle nested CC format: assistant messages contain tool_use in content array
if entry.get("type") == "assistant":
message = entry.get("message", {})
for block in message.get("content", []):
if block.get("type") == "tool_use":
tool_name = block.get("name", "")
source = "native" if tool_name in NATIVE_TOOLS else "shell"
results.append(
{"tool": tool_name, "args": block.get("input", {}), "source": source}
)
# Also handle flat format (for test compatibility)
elif entry.get("type") == "tool_use":
tool_name = entry.get("name", "")
source = "native" if tool_name in NATIVE_TOOLS else "shell"
results.append({"tool": tool_name, "args": entry.get("input", {}), "source": source})
return results
def normalize_codex_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Codex rollout logs.
Codex logs use: {"type": "response_item", "payload": {"type": "function_call", ...}}
Tool calls are "function_call" with name "exec_command" (shell) or other names.
"""
results: list[dict[str, Any]] = []
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("type") != "response_item":
continue
# Codex uses "payload" not "item"
payload = entry.get("payload", entry.get("item", {}))
payload_type = payload.get("type", "")
if payload_type == "function_call":
name = payload.get("name", "")
raw_args = payload.get("arguments", "{}")
# Arguments are JSON-encoded strings in codex
if isinstance(raw_args, str):
try:
args = json.loads(raw_args)
except json.JSONDecodeError:
args = {"raw": raw_args}
else:
args = raw_args
# exec_command is codex's shell tool
if name == "exec_command":
results.append(
{"tool": "Bash", "args": {"command": args.get("cmd", "")}, "source": "shell"}
)
elif name == "apply_patch":
results.append({"tool": "Edit", "args": args, "source": "native"})
else:
source = "native" if name in NATIVE_TOOLS else "shell"
results.append({"tool": name, "args": args, "source": source})
elif payload_type == "local_shell_call":
action = payload.get("action", {})
cmd = action.get("command", [])
cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
results.append({"tool": "Bash", "args": {"command": cmd_str}, "source": "shell"})
return results
# Reverse mapping: Gemini tool names → Claude Code canonical names
GEMINI_TOOL_MAP: dict[str, str] = {
"run_shell_command": "Bash",
"read_file": "Read",
"write_file": "Write",
"replace": "Edit",
"grep_search": "Grep",
"glob": "Glob",
"activate_skill": "Skill",
"google_web_search": "WebSearch",
"web_fetch": "WebFetch",
"write_todos": "TodoWrite",
"list_directory": "Glob",
"enter_plan_mode": "EnterPlanMode",
"exit_plan_mode": "ExitPlanMode",
}
def normalize_gemini_logs(raw_content: str) -> list[dict[str, Any]]:
"""Normalize Gemini CLI session logs.
Gemini logs may be a single JSON file with a messages array, or JSONL
session files in newer CLI versions. Each "gemini" message may have a
toolCalls array:
{"name": "run_shell_command", "args": {"command": "..."}, "status": "success"}
"""
results: list[dict[str, Any]] = []
messages: list[dict[str, Any]] = []
try:
data = json.loads(raw_content)
except json.JSONDecodeError:
for line in raw_content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(entry, dict):
messages.append(entry)
else:
if isinstance(data, dict) and "messages" in data:
messages = [m for m in data.get("messages", []) if isinstance(m, dict)]
elif isinstance(data, dict):
messages = [data]
elif isinstance(data, list):
messages = [m for m in data if isinstance(m, dict)]
seen_tool_calls: set[str] = set()
for message in messages:
if message.get("type") != "gemini":
continue
for tc in message.get("toolCalls", []):
tool_call_id = tc.get("id")
if tool_call_id and tool_call_id in seen_tool_calls:
continue
if tool_call_id:
seen_tool_calls.add(tool_call_id)
gemini_name = tc.get("name", "")
canonical = GEMINI_TOOL_MAP.get(gemini_name, gemini_name)
args = tc.get("args", {})
source = "native" if canonical in NATIVE_TOOLS else "shell"
results.append({"tool": canonical, "args": args, "source": source})
return results
# Dispatch table: backend identifier → normalizer for that backend's raw
# session log content. Each callable takes the raw log text and returns a
# list of {"tool", "args", "source"} dicts.
NORMALIZERS: dict[str, Callable[[str], list[dict[str, Any]]]] = {
    "claude": normalize_claude_logs,
    "codex": normalize_codex_logs,
    "gemini": normalize_gemini_logs,
}