#!/usr/bin/env python3
"""
校验 atoms 质量,输出问题明细与汇总报告。

用法:
  python3 scripts/validate_atoms.py
  python3 scripts/validate_atoms.py build/v4.57.3
"""

from __future__ import annotations

import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path


# Repo root: this script lives in scripts/, so go up two levels.
BASE_DIR = Path(__file__).parent.parent
# Default scan target when no CLI argument is given.
DEFAULT_TARGET = BASE_DIR / "build"
# Where the issue listing and markdown summary are written.
OUTPUT_DIR = BASE_DIR / "dist" / "quality"

# Substrings that indicate a screenshot placeholder leaked into text fields.
SCREENSHOT_MARKERS = ("[截图]", "[图]", "截图")
# Result strings considered too generic to be useful as an expected outcome.
GENERIC_RESULTS = {
    "满足预期",
    "搜索出结果",
    "成功",
    "失败",
    "显示成功",
    "显示失败",
}


def iter_jsonl_files(target: Path) -> list[Path]:
    """Collect atom JSONL files to validate.

    If *target* is itself a file, it is returned as the only entry.
    Otherwise the directory tree is searched recursively for the known
    atom-file name suffixes, in suffix order, deduplicating paths that
    match more than one pattern.
    """
    if target.is_file():
        return [target]
    suffixes = (
        "case_atoms.jsonl",
        "case_atoms_model.jsonl",
        "doc_atoms.jsonl",
        "doc_atoms_model.jsonl",
        "rule_atoms.jsonl",
        "rule_atoms_model.jsonl",
    )
    collected: list[Path] = []
    seen: set[Path] = set()
    for suffix in suffixes:
        for candidate in sorted(target.rglob("*" + suffix)):
            if candidate not in seen:
                seen.add(candidate)
                collected.append(candidate)
    return collected


def clean_text(text: str) -> str:
    """Normalize *text*: collapse whitespace runs to single spaces and trim.

    Falsy inputs (None, empty string, 0) normalize to the empty string.
    """
    normalized = str(text or "")
    collapsed = re.sub(r"\s+", " ", normalized)
    return collapsed.strip()


def detect_issues(row: dict) -> list[str]:
    """Return the list of quality-issue codes found on a single atom row.

    Checks differ by ``atom_type``:
    - ``definition`` rows only need a well-formed ``canon_text``
      (``TERM=`` or ``C=`` prefix).
    - all other rows additionally need a non-empty action ``A`` and a
      specific, non-generic result ``R``.
    - evidence requirements apply to ``rule`` / ``case_rule`` / ``doc_rule``.

    Fix vs. previous version: the unused local ``c`` (cleaned ``row["C"]``)
    was dead code and has been removed; no check ever consumed it.
    """
    issues: list[str] = []
    a = clean_text(row.get("A", ""))
    r = clean_text(row.get("R", ""))
    source_type = row.get("atom_type", "")
    evidence = row.get("evidence")
    canon_text = clean_text(row.get("canon_text", ""))

    # Metadata required on every row regardless of type.
    if not row.get("app_version"):
        issues.append("missing_app_version")
    if not row.get("primary_module"):
        issues.append("missing_primary_module")

    if source_type == "definition":
        if not canon_text:
            issues.append("missing_canon_text")
        elif not (canon_text.startswith("TERM=") or canon_text.startswith("C=")):
            issues.append("invalid_definition_format")
    else:
        if not canon_text:
            issues.append("missing_canon_text")
        if canon_text and not canon_text.startswith("C="):
            issues.append("invalid_canon_format")
        if not a:
            issues.append("missing_action")
        if not r:
            issues.append("missing_result")
        if r in GENERIC_RESULTS:
            issues.append("generic_result")
        if any(marker in r for marker in SCREENSHOT_MARKERS):
            issues.append("screenshot_marker_in_result")
        if len(r) > 200:
            issues.append("result_too_long")
        # Symptoms of broken PDF chunking: tiny or fragment-leading actions.
        if source_type == "doc_rule" and len(a) < 4:
            issues.append("doc_action_too_short")
        if source_type == "doc_rule" and a and re.match(r"^[的了在和与及、,;。]+", a):
            issues.append("doc_action_fragmented")
        # Symptoms of raw test-case extraction: numbered steps / screenshots.
        if source_type == "case_rule" and re.search(r"^\d+[,、.]", a):
            issues.append("case_action_is_enumeration")
        if source_type == "case_rule" and any(marker in canon_text for marker in SCREENSHOT_MARKERS):
            issues.append("screenshot_marker_in_canon")

    # Evidence checks: "rule" atoms need at least one raw_text snippet;
    # case/doc atoms just need a non-empty evidence field.
    if source_type == "rule" and isinstance(evidence, list):
        if not any(item.get("raw_text") for item in evidence if isinstance(item, dict)):
            issues.append("rule_missing_raw_evidence")
    elif source_type in {"case_rule", "doc_rule"} and not evidence:
        issues.append("missing_evidence")

    return issues


def _display_path(path: Path) -> str:
    """Render *path* relative to the repo root for reporting.

    Falls back to the absolute string when the scan target resolves
    outside BASE_DIR (a user-supplied path), where ``relative_to`` would
    raise ValueError and previously crashed the whole scan.
    """
    try:
        return str(path.relative_to(BASE_DIR))
    except ValueError:
        return str(path)


def _build_summary(files, total_rows, issue_counter, file_counter) -> list[str]:
    """Assemble the markdown lines of the quality summary report."""
    lines = [
        "# Atoms 质量报告",
        "",
        f"- 扫描文件数:{len(files)}",
        f"- 扫描行数:{total_rows}",
        f"- 问题总数:{sum(issue_counter.values())}",
        "",
        "## 问题分布",
        "",
    ]
    for issue, count in issue_counter.most_common():
        lines.append(f"- {issue}: {count}")

    # Per-file distribution, worst files first (cap at 50 entries).
    lines.extend(["", "## 文件分布", ""])
    ranked_files = sorted(file_counter.items(), key=lambda item: (-sum(item[1].values()), item[0]))
    for rel_path, counter in ranked_files[:50]:
        top_issue = ", ".join(f"{issue}:{count}" for issue, count in counter.most_common(5))
        lines.append(f"- {rel_path}: {sum(counter.values())} ({top_issue})")

    lines.extend(["", "## 建议", ""])
    lines.append("- `invalid_json`、`missing_*`:先修输入数据,再讨论检索效果。")
    lines.append("- `screenshot_marker_*`、`case_action_is_enumeration`:说明纯脚本抽取未完成语义清洗,应改为模型蒸馏。")
    lines.append("- `doc_action_fragmented`、`doc_action_too_short`:说明 PDF 切块断裂,培训文档必须走模型重组。")
    lines.append("")
    return lines


def main() -> None:
    """Scan atom JSONL files, collect quality issues, and write reports.

    The optional CLI argument selects the scan target (defaults to
    ``build/``). Writes ``atom_issues.jsonl`` (one issue per line) and
    ``atom_quality_summary.md`` under ``dist/quality``, then prints a
    short machine-readable summary to stdout.
    """
    target = Path(sys.argv[1]).resolve() if len(sys.argv) > 1 else DEFAULT_TARGET
    files = iter_jsonl_files(target)
    if not files:
        print("no_atom_files_found")
        return

    issue_counter: Counter[str] = Counter()
    file_counter: dict[str, Counter[str]] = defaultdict(Counter)
    issue_rows: list[dict] = []
    total_rows = 0

    for path in files:
        rel_path = _display_path(path)
        with path.open("r", encoding="utf-8") as handle:
            for line_no, raw_line in enumerate(handle, start=1):
                line = raw_line.strip()
                if not line:
                    continue
                total_rows += 1
                try:
                    row = json.loads(line)
                except json.JSONDecodeError as exc:
                    issue_counter["invalid_json"] += 1
                    file_counter[rel_path]["invalid_json"] += 1
                    issue_rows.append(
                        {
                            "file": rel_path,
                            "line_no": line_no,
                            "issue": "invalid_json",
                            "detail": str(exc),
                        }
                    )
                    continue

                for issue in detect_issues(row):
                    issue_counter[issue] += 1
                    file_counter[rel_path][issue] += 1
                    issue_rows.append(
                        {
                            "file": rel_path,
                            "line_no": line_no,
                            "issue": issue,
                            "app_version": row.get("app_version", ""),
                            "atom_type": row.get("atom_type", ""),
                            "primary_module": row.get("primary_module", ""),
                            "feature_scope": clean_text(row.get("feature_scope", ""))[:120],
                            # Short preview: prefer result, then action, then canon text.
                            "preview": clean_text(row.get("R") or row.get("A") or row.get("canon_text", ""))[:200],
                        }
                    )

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    issues_path = OUTPUT_DIR / "atom_issues.jsonl"
    summary_path = OUTPUT_DIR / "atom_quality_summary.md"

    with issues_path.open("w", encoding="utf-8") as handle:
        for issue_row in issue_rows:
            handle.write(json.dumps(issue_row, ensure_ascii=False) + "\n")

    summary_lines = _build_summary(files, total_rows, issue_counter, file_counter)
    summary_path.write_text("\n".join(summary_lines) + "\n", encoding="utf-8")

    print(f"files={len(files)}")
    print(f"rows={total_rows}")
    print(f"issues={sum(issue_counter.values())}")
    print(f"output={OUTPUT_DIR.relative_to(BASE_DIR)}")


if __name__ == "__main__":
    main()