normalize_model_atoms.py 4.61 KB
#!/usr/bin/env python3
"""
将模型输出的 atoms 规范化后写回 build/<version>/。

用途:
- 补 atom_id / canon_text / merge_fingerprint
- 统一 qa_status / confidence / source_type
- 过滤明显不合规的 atom

用法:
  python3 scripts/normalize_model_atoms.py input.jsonl case
  python3 scripts/normalize_model_atoms.py input.jsonl doc
"""

from __future__ import annotations

import hashlib
import json
import math
import re
import sys
from pathlib import Path


# Directory above the one containing this file; build output lives in its build/ child.
BASE_DIR = Path(__file__).parent.parent
BUILD_DIR = BASE_DIR.joinpath("build")

# kind -> (default atom_type, default source_type, output filename)
TYPE_MAP = dict(
    case=("case_rule", "testcase", "case_atoms.model.jsonl"),
    doc=("doc_rule", "document", "doc_atoms.model.jsonl"),
    rule=("rule", "figma", "rule_atoms.model.jsonl"),
)
# qa_status values that are kept as-is; any other value is reset to "draft".
ALLOWED_QA_STATUS = {"draft", "validated", "rejected"}


def clean_text(text: str) -> str:
    """Collapse each run of whitespace to one space and trim both ends.

    Falsy input (``None``, ``""``, ``0``) becomes ``""``; any other
    non-string value is converted with ``str()`` first.
    """
    raw = str(text) if text else ""
    # split() with no separator drops leading/trailing whitespace and treats
    # every whitespace run as a single delimiter.
    return " ".join(raw.split())


def fingerprint(text: str) -> str:
    """Return the first 12 hex characters of the SHA-1 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha1()
    hasher.update(text.encode("utf-8"))
    hex_digest = hasher.hexdigest()
    return hex_digest[:12]


def build_canon_text(row: dict) -> str:
    """Return the canonical text for an atom.

    A non-blank ``canon_text`` already present on *row* wins and is returned
    whitespace-normalized. Otherwise the text is composed from the ``C``,
    ``A`` and ``R`` fields as ``C=<c>|A=<a>|R=<r>``. If all three are blank
    the result is ``""``.
    """
    # Clean before testing: a whitespace-only canon_text is truthy, and
    # returning its cleaned form ("") would skip the C/A/R composition.
    existing = clean_text(row.get("canon_text"))
    if existing:
        return existing
    c = clean_text(row.get("C"))
    a = clean_text(row.get("A"))
    r = clean_text(row.get("R"))
    if a or r or c:
        return f"C={c}|A={a}|R={r}"
    return ""


def _clean_list(value: object) -> list[str]:
    """Return the non-empty, whitespace-normalized entries of a list-like field.

    ``None`` (JSON ``null``) yields an empty list, and a bare scalar is treated
    as a one-element list so a string is never iterated character by character.
    """
    if value is None:
        return []
    if isinstance(value, (str, int, float)):
        value = [value]
    cleaned = (clean_text(item) for item in value)
    return [text for text in cleaned if text]


def normalize_row(row: dict, kind: str) -> dict | None:
    """Normalize one model-produced atom, or return ``None`` to drop it.

    Args:
        row: Decoded JSON object for a single atom. It is not mutated; a
            shallow copy is normalized, so keys not handled here pass through.
        kind: A key of ``TYPE_MAP``; it supplies the default ``atom_type``
            and ``source_type``.

    Returns:
        The normalized copy, or ``None`` when ``app_version`` is blank or when
        an atom whose type is not ``"definition"`` lacks ``canon_text``,
        ``A`` or ``R``.

    Raises:
        ValueError: If ``kind`` is not a key of ``TYPE_MAP``.
    """
    if kind not in TYPE_MAP:
        raise ValueError(f"unsupported kind: {kind}")
    atom_type, source_type, _ = TYPE_MAP[kind]

    app_version = clean_text(row.get("app_version"))
    if not app_version:
        return None

    normalized = dict(row)
    normalized["app_version"] = app_version if app_version.startswith("v") else f"v{app_version}"
    # Clean first, then fall back (same pattern as qa_status below), so a
    # whitespace-only value cannot suppress the default and end up as "".
    normalized["atom_type"] = clean_text(row.get("atom_type")) or atom_type
    normalized["source_type"] = clean_text(row.get("source_type")) or source_type
    normalized["primary_module"] = clean_text(row.get("primary_module")).upper()
    normalized["modules"] = [name.upper() for name in _clean_list(row.get("modules"))]
    if normalized["primary_module"] and normalized["primary_module"] not in normalized["modules"]:
        # The primary module always leads the module list.
        normalized["modules"].insert(0, normalized["primary_module"])
    normalized["feature_scope"] = clean_text(row.get("feature_scope"))
    normalized["touchpoints"] = _clean_list(row.get("touchpoints"))
    normalized["C"] = clean_text(row.get("C"))
    normalized["A"] = clean_text(row.get("A"))
    normalized["R"] = clean_text(row.get("R"))
    # Uses the cleaned C/A/R just stored, plus any canon_text copied from row.
    normalized["canon_text"] = build_canon_text(normalized)
    normalized["merge_fingerprint"] = (
        clean_text(row.get("merge_fingerprint")) or fingerprint(normalized["canon_text"])
    )
    normalized["atom_id"] = (
        clean_text(row.get("atom_id"))
        or f"{normalized['app_version']}_{normalized['merge_fingerprint']}"
    )

    try:
        confidence = float(row.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    # NaN fails every comparison, so min(1.0, nan) returns 1.0 and the clamp
    # below would report full confidence; treat NaN as unparseable instead.
    if math.isnan(confidence):
        confidence = 0.0
    normalized["confidence"] = max(0.0, min(1.0, confidence))

    qa_status = clean_text(row.get("qa_status", "draft")) or "draft"
    normalized["qa_status"] = qa_status if qa_status in ALLOWED_QA_STATUS else "draft"

    # Only "definition" atoms may omit canon_text, A or R.
    if normalized["atom_type"] != "definition":
        if not (normalized["canon_text"] and normalized["A"] and normalized["R"]):
            return None
    return normalized


def main() -> None:
    """CLI entry point: normalize a JSONL file of atoms and write per-version output.

    Reads ``sys.argv[1]`` as the input JSONL path and ``sys.argv[2]`` as the
    kind (a key of ``TYPE_MAP``). Each non-blank line is parsed as JSON and
    passed through ``normalize_row``; the surviving rows are grouped by
    ``app_version`` and written to ``BUILD_DIR/<version>/<filename>``,
    overwriting any existing file. One summary line is printed per version.

    Exits with status 1 on a wrong argument count, an unknown kind, or a line
    that is not valid JSON.
    """
    usage = "usage: python3 scripts/normalize_model_atoms.py <input.jsonl> <case|doc|rule>"
    if len(sys.argv) != 3:
        print(usage)
        sys.exit(1)

    input_path = Path(sys.argv[1]).resolve()
    kind = sys.argv[2].strip()
    if kind not in TYPE_MAP:
        # Without this check an unknown kind surfaces as a bare KeyError traceback.
        print(usage)
        sys.exit(1)
    _, _, filename = TYPE_MAP[kind]

    # Group while reading so each version's rows are visited once when writing.
    rows_by_version: dict[str, list[dict]] = {}
    with input_path.open("r", encoding="utf-8") as handle:
        for line_no, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                # sys.exit(str) prints the message to stderr and exits with status 1.
                sys.exit(f"{input_path}:{line_no}: invalid JSON: {exc}")
            if not isinstance(row, dict):
                # normalize_row needs a JSON object; drop anything else.
                continue
            normalized = normalize_row(row, kind)
            if normalized:
                rows_by_version.setdefault(normalized["app_version"], []).append(normalized)

    for version in sorted(rows_by_version):
        version_rows = rows_by_version[version]
        out_dir = BUILD_DIR / version
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / filename
        with out_path.open("w", encoding="utf-8") as handle:
            handle.writelines(
                json.dumps(row, ensure_ascii=False) + "\n" for row in version_rows
            )
        print(f"{version} output={out_path.relative_to(BASE_DIR)} rows={len(version_rows)}")


# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()