mirror of
https://github.com/obra/superpowers.git
synced 2026-05-10 02:59:04 +08:00
rsync of obra/drill@013fcb8b7d into superpowers/evals/, excluding .git/, .venv/, results/, .env/, __pycache__/, *.egg-info/, .private-journal/. The drill repo is unaffected by this commit; archival is a separate manual step after this PR merges. Source SHA recorded at evals/.drill-source-sha for divergence detection.
229 lines
8.2 KiB
Python
"""Normalizes backend-specific session logs to a common tool call schema."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from collections.abc import Callable
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Tool names classified as source="native" by the normalizers below; any tool
# call whose (canonical) name is not in this set is labeled source="shell".
NATIVE_TOOLS: set[str] = {
    "EnterWorktree",
    "ExitWorktree",
    "EnterPlanMode",
    "ExitPlanMode",
    "TaskCreate",
    "TaskUpdate",
    "TaskList",
    "TaskGet",
    "Skill",
    "Agent",
    "Read",
    "Write",
    "Edit",
    "Glob",
    "Grep",
}
|
|
|
|
# Glob patterns matched (recursively) by snapshot_log_dir / collect_new_logs
# when looking for session log files.
LOG_EXTENSIONS: tuple[str, ...] = ("*.jsonl", "*.json")
|
def snapshot_log_dir(log_dir: Path) -> set[str]:
    """Snapshot all session log files in a log directory (recursive).

    Returns the set of paths (relative to ``log_dir``, as strings) of every
    file matching one of LOG_EXTENSIONS. A missing directory yields an
    empty set rather than raising.
    """
    if not log_dir.exists():
        return set()
    return {
        str(found.relative_to(log_dir))
        for pattern in LOG_EXTENSIONS
        for found in log_dir.rglob(pattern)
    }
|
|
|
|
|
|
def collect_new_logs(log_dir: Path, snapshot: set[str]) -> list[Path]:
    """Find session log files created after the snapshot (recursive).

    ``snapshot`` is the set of relative path strings previously returned by
    snapshot_log_dir; any matching file not in it is considered new. Results
    are sorted by relative path for determinism.
    """
    if not log_dir.exists():
        return []
    by_rel_path: dict[str, Path] = {
        str(found.relative_to(log_dir)): found
        for pattern in LOG_EXTENSIONS
        for found in log_dir.rglob(pattern)
    }
    # dict.keys() supports set difference directly.
    return [by_rel_path[key] for key in sorted(by_rel_path.keys() - snapshot)]
|
|
|
|
|
|
def filter_codex_logs_by_cwd(paths: list[Path], target_cwd: str) -> list[Path]:
    """Drop codex rollouts whose session_meta.cwd doesn't match target_cwd.

    Codex stores all sessions under a shared ~/.codex/sessions/ tree, so when
    multiple drill scenarios run in parallel each one's snapshot diff sees every
    other run's rollouts. Each rollout's first line is a `session_meta` event
    that records the cwd the codex CLI was launched in — use it to attribute
    rollouts to the run that produced them.
    """
    kept: list[Path] = []
    for rollout in paths:
        # Unreadable or non-JSON first lines disqualify the rollout silently.
        try:
            with rollout.open() as handle:
                meta = json.loads(handle.readline())
        except (OSError, json.JSONDecodeError):
            continue
        if meta.get("type") != "session_meta":
            continue
        if meta.get("payload", {}).get("cwd", "") == target_cwd:
            kept.append(rollout)
    return kept
|
|
|
|
|
|
def normalize_claude_logs(raw_content: str) -> list[dict[str, Any]]:
    """Normalize Claude Code session logs.

    CC logs are JSONL where assistant messages have:
        {"type": "assistant", "message": {"content": [{"type": "tool_use", "name": "...",
         "input": {...}}]}}

    Returns a list of {"tool", "args", "source"} dicts in log order, where
    source is "native" for tools in NATIVE_TOOLS and "shell" otherwise.
    Blank lines, unparseable lines, and lines whose JSON is not an object
    are skipped.
    """
    results: list[dict[str, Any]] = []
    for line in raw_content.strip().split("\n"):
        if not line.strip():
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line can be valid JSON but not an object (e.g. a bare list or
        # string); skip it instead of crashing on .get — this matches the
        # isinstance guard normalize_gemini_logs already applies.
        if not isinstance(entry, dict):
            continue
        # Handle nested CC format: assistant messages contain tool_use in content array
        if entry.get("type") == "assistant":
            message = entry.get("message", {})
            content = message.get("content", []) if isinstance(message, dict) else []
            for block in content:
                # Content blocks may also be plain strings; only dicts can be tool_use.
                if isinstance(block, dict) and block.get("type") == "tool_use":
                    tool_name = block.get("name", "")
                    source = "native" if tool_name in NATIVE_TOOLS else "shell"
                    results.append(
                        {"tool": tool_name, "args": block.get("input", {}), "source": source}
                    )
        # Also handle flat format (for test compatibility)
        elif entry.get("type") == "tool_use":
            tool_name = entry.get("name", "")
            source = "native" if tool_name in NATIVE_TOOLS else "shell"
            results.append({"tool": tool_name, "args": entry.get("input", {}), "source": source})
    return results
|
|
|
|
|
|
def normalize_codex_logs(raw_content: str) -> list[dict[str, Any]]:
    """Normalize Codex rollout logs.

    Codex logs use: {"type": "response_item", "payload": {"type": "function_call", ...}}
    Tool calls are "function_call" with name "exec_command" (shell) or other names.

    Returns a list of {"tool", "args", "source"} dicts in log order. Blank
    lines, unparseable lines, and entries that are not JSON objects are
    skipped.
    """
    results: list[dict[str, Any]] = []
    for line in raw_content.strip().split("\n"):
        if not line.strip():
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Valid JSON lines may decode to non-objects; guard before .get so a
        # stray list/scalar line doesn't raise AttributeError (consistent with
        # normalize_gemini_logs).
        if not isinstance(entry, dict) or entry.get("type") != "response_item":
            continue
        # Codex uses "payload" not "item"
        payload = entry.get("payload", entry.get("item", {}))
        if not isinstance(payload, dict):
            continue
        payload_type = payload.get("type", "")
        if payload_type == "function_call":
            name = payload.get("name", "")
            raw_args = payload.get("arguments", "{}")
            # Arguments are JSON-encoded strings in codex
            if isinstance(raw_args, str):
                try:
                    args = json.loads(raw_args)
                except json.JSONDecodeError:
                    args = {"raw": raw_args}
            else:
                args = raw_args
            # The decoded arguments can legally be a JSON list/scalar; wrap
            # them so the .get() calls below are safe.
            if not isinstance(args, dict):
                args = {"raw": raw_args}
            # exec_command is codex's shell tool
            if name == "exec_command":
                results.append(
                    {"tool": "Bash", "args": {"command": args.get("cmd", "")}, "source": "shell"}
                )
            elif name == "apply_patch":
                results.append({"tool": "Edit", "args": args, "source": "native"})
            else:
                source = "native" if name in NATIVE_TOOLS else "shell"
                results.append({"tool": name, "args": args, "source": source})
        elif payload_type == "local_shell_call":
            action = payload.get("action", {})
            cmd = action.get("command", [])
            cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
            results.append({"tool": "Bash", "args": {"command": cmd_str}, "source": "shell"})
    return results
|
|
|
|
|
|
# Reverse mapping: Gemini tool names → Claude Code canonical names.
# Unmapped names pass through unchanged in normalize_gemini_logs.
GEMINI_TOOL_MAP: dict[str, str] = {
    "run_shell_command": "Bash",
    "read_file": "Read",
    "write_file": "Write",
    "replace": "Edit",
    "grep_search": "Grep",
    "glob": "Glob",
    "activate_skill": "Skill",
    "google_web_search": "WebSearch",
    "web_fetch": "WebFetch",
    "write_todos": "TodoWrite",
    # NOTE(review): list_directory has no exact CC counterpart; folding it
    # into Glob is lossy — confirm downstream checks don't distinguish them.
    "list_directory": "Glob",
    "enter_plan_mode": "EnterPlanMode",
    "exit_plan_mode": "ExitPlanMode",
}
|
|
|
|
|
|
def normalize_gemini_logs(raw_content: str) -> list[dict[str, Any]]:
    """Normalize Gemini CLI session logs.

    Gemini logs may be a single JSON file with a messages array, or JSONL
    session files in newer CLI versions. Each "gemini" message may have a
    toolCalls array:
        {"name": "run_shell_command", "args": {"command": "..."}, "status": "success"}
    """
    messages: list[dict[str, Any]] = []
    try:
        parsed = json.loads(raw_content)
    except json.JSONDecodeError:
        # Not a single JSON document — fall back to JSONL, one object per line.
        for raw_line in raw_content.strip().split("\n"):
            if not raw_line.strip():
                continue
            try:
                candidate = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if isinstance(candidate, dict):
                messages.append(candidate)
    else:
        if isinstance(parsed, dict):
            if "messages" in parsed:
                messages = [m for m in parsed.get("messages", []) if isinstance(m, dict)]
            else:
                messages = [parsed]
        elif isinstance(parsed, list):
            messages = [m for m in parsed if isinstance(m, dict)]

    results: list[dict[str, Any]] = []
    # Tool-call ids can repeat across messages (e.g. streamed updates);
    # only the first occurrence of each id is kept.
    seen_ids: set[str] = set()
    for msg in messages:
        if msg.get("type") != "gemini":
            continue
        for call in msg.get("toolCalls", []):
            call_id = call.get("id")
            if call_id:
                if call_id in seen_ids:
                    continue
                seen_ids.add(call_id)
            raw_name = call.get("name", "")
            canonical = GEMINI_TOOL_MAP.get(raw_name, raw_name)
            results.append(
                {
                    "tool": canonical,
                    "args": call.get("args", {}),
                    "source": "native" if canonical in NATIVE_TOOLS else "shell",
                }
            )
    return results
|
|
|
|
|
|
# Backend id → normalizer dispatch table; each callable takes raw log text
# and returns the common [{"tool", "args", "source"}, ...] schema.
NORMALIZERS: dict[str, Callable[[str], list[dict[str, Any]]]] = {
    "claude": normalize_claude_logs,
    "codex": normalize_codex_logs,
    "gemini": normalize_gemini_logs,
}
|