normalize_model_atoms.py
4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
"""
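Normalize model-produced atoms and write them back to build/<version>/.

Purpose:
- fill in atom_id / canon_text / merge_fingerprint
- unify qa_status / confidence / source_type
- filter out atoms that are clearly invalid

Usage:
    python3 scripts/normalize_model_atoms.py input.jsonl case
    python3 scripts/normalize_model_atoms.py input.jsonl doc
    python3 scripts/normalize_model_atoms.py input.jsonl rule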
"""
from __future__ import annotations

import hashlib
import json
import math
import re
import sys
from pathlib import Path
# Directory two levels above this file (the parent of the script's own directory).
BASE_DIR = Path(__file__).parent.parent
# Root of the output tree; main() writes each file to BUILD_DIR/<app_version>/.
BUILD_DIR = BASE_DIR / "build"
# qa_status values that normalize_row() keeps; any other value is replaced by "draft".
ALLOWED_QA_STATUS = {"draft", "validated", "rejected"}
# Maps the CLI kind argument to
# (default atom_type, default source_type, output file name).
TYPE_MAP = {
"case": ("case_rule", "testcase", "case_atoms.model.jsonl"),
"doc": ("doc_rule", "document", "doc_atoms.model.jsonl"),
"rule": ("rule", "figma", "rule_atoms.model.jsonl"),
}
def clean_text(text: str) -> str:
    """Return *text* as a single-line string with normalized whitespace.

    Any falsy input (``None``, ``""``, ``0``, ...) is treated as the empty
    string; everything else is converted with ``str()``. Each run of
    whitespace becomes one space and the ends are trimmed.
    """
    raw = str(text or "")
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()
def fingerprint(text: str) -> str:
    """Return the first 12 hex characters of the SHA-1 digest of *text*.

    The text is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha1()
    hasher.update(text.encode("utf-8"))
    hex_digest = hasher.hexdigest()
    return hex_digest[:12]
def build_canon_text(row: dict) -> str:
    """Return the canonical text for an atom row.

    An explicit ``canon_text`` wins when it is non-empty *after* whitespace
    cleanup. Otherwise the text is derived from the ``C``, ``A`` and ``R``
    fields as ``C=<c>|A=<a>|R=<r>``. If all three are empty as well, the
    result is the empty string.

    Args:
        row: Atom mapping; the keys read are ``canon_text``, ``C``, ``A``, ``R``.

    Returns:
        The cleaned canonical text, or ``""`` when nothing usable is present.
    """
    # Clean first, then test: a whitespace-only canon_text is truthy as a raw
    # value but carries no content, so it must not block the C/A/R fallback.
    explicit = clean_text(row.get("canon_text"))
    if explicit:
        return explicit

    c, a, r = (clean_text(row.get(key, "")) for key in ("C", "A", "R"))
    if not (c or a or r):
        return ""
    return f"C={c}|A={a}|R={r}"
def _clean_items(value: object) -> list[str]:
    """Return the non-empty, whitespace-cleaned strings contained in *value*."""
    if isinstance(value, str):
        # A bare string is one item, not a sequence of characters.
        value = [value]
    elif not isinstance(value, (list, tuple, set, frozenset, dict)):
        # JSON null or a scalar: there is nothing to iterate over.
        return []
    return [text for text in map(clean_text, value) if text]


def normalize_row(row: dict, kind: str) -> dict | None:
    """Normalize one model-produced atom, or return ``None`` to drop it.

    The result is a shallow copy of *row* with these fields rewritten:
    ``app_version`` (prefixed with ``v`` when missing), ``atom_type`` and
    ``source_type`` (defaults taken from ``TYPE_MAP[kind]``),
    ``primary_module`` and ``modules`` (upper-cased; the primary module is
    inserted at the front of ``modules`` when absent), ``feature_scope``,
    ``touchpoints``, ``C``/``A``/``R``, ``canon_text``, ``merge_fingerprint``,
    ``atom_id``, ``confidence`` (clamped to ``[0.0, 1.0]``) and ``qa_status``
    (restricted to ``ALLOWED_QA_STATUS``, otherwise ``"draft"``).

    Args:
        row: Raw atom mapping as parsed from one JSONL line.
        kind: A key of ``TYPE_MAP``.

    Returns:
        The normalized mapping, or ``None`` when ``app_version`` is empty or
        when an atom whose type is not ``"definition"`` lacks ``canon_text``,
        ``A`` or ``R``.

    Raises:
        ValueError: If *kind* is not a key of ``TYPE_MAP``.
    """
    if kind not in TYPE_MAP:
        raise ValueError(f"unsupported kind: {kind}")
    atom_type, source_type, _ = TYPE_MAP[kind]

    app_version = clean_text(row.get("app_version", ""))
    if not app_version:
        return None

    normalized = dict(row)
    normalized["app_version"] = app_version if app_version.startswith("v") else f"v{app_version}"
    normalized["atom_type"] = clean_text(row.get("atom_type") or atom_type)
    normalized["source_type"] = clean_text(row.get("source_type") or source_type)

    primary_module = clean_text(row.get("primary_module", "")).upper()
    modules = [name.upper() for name in _clean_items(row.get("modules"))]
    if primary_module and primary_module not in modules:
        modules.insert(0, primary_module)
    normalized["primary_module"] = primary_module
    normalized["modules"] = modules

    normalized["feature_scope"] = clean_text(row.get("feature_scope", ""))
    normalized["touchpoints"] = _clean_items(row.get("touchpoints"))

    for key in ("C", "A", "R"):
        normalized[key] = clean_text(row.get(key, ""))

    # canon_text is built from the already-cleaned copy so that a derived
    # "C=...|A=...|R=..." text uses the normalized field values.
    normalized["canon_text"] = build_canon_text(normalized)
    normalized["merge_fingerprint"] = clean_text(
        row.get("merge_fingerprint") or fingerprint(normalized["canon_text"])
    )
    normalized["atom_id"] = clean_text(
        row.get("atom_id") or f"{normalized['app_version']}_{normalized['merge_fingerprint']}"
    )

    try:
        confidence = float(row.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0
    # NaN compares false against everything, so max(0.0, min(1.0, nan)) would
    # evaluate to 1.0; treat it as "no confidence" instead.
    if math.isnan(confidence):
        confidence = 0.0
    normalized["confidence"] = max(0.0, min(1.0, confidence))

    qa_status = clean_text(row.get("qa_status", "draft")) or "draft"
    normalized["qa_status"] = qa_status if qa_status in ALLOWED_QA_STATUS else "draft"

    # Only "definition" atoms may omit canon_text / A / R.
    if normalized["atom_type"] != "definition":
        if not (normalized["canon_text"] and normalized["A"] and normalized["R"]):
            return None
    return normalized
def main() -> None:
    """Command-line entry point.

    Reads ``sys.argv[1]`` as a UTF-8 JSONL file and ``sys.argv[2]`` as the
    atom kind (a key of ``TYPE_MAP``), normalizes every row with
    ``normalize_row`` and writes the surviving rows, grouped by
    ``app_version``, to ``BUILD_DIR/<app_version>/<file name for kind>``.
    Existing output files are overwritten. One summary line per version is
    printed to stdout.

    Blank lines are ignored. Lines that are not valid JSON, or whose JSON
    value is not an object, are skipped with a message on stderr rather than
    aborting the run. Wrong argument count or an unsupported kind prints the
    usage line and exits with status 1.
    """
    args = sys.argv[1:]
    # Validate the kind up front so that a typo yields the usage text rather
    # than a bare KeyError from the TYPE_MAP lookup.
    if len(args) != 2 or args[1].strip() not in TYPE_MAP:
        print("usage: python3 scripts/normalize_model_atoms.py <input.jsonl> <case|doc|rule>")
        sys.exit(1)

    input_path = Path(args[0]).resolve()
    kind = args[1].strip()
    _, _, filename = TYPE_MAP[kind]

    # Group while reading so each version's rows are written in one pass.
    rows_by_version: dict[str, list[dict]] = {}
    with input_path.open("r", encoding="utf-8") as handle:
        for line_no, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                print(f"skip line {line_no}: invalid JSON ({exc})", file=sys.stderr)
                continue
            if not isinstance(row, dict):
                print(f"skip line {line_no}: expected a JSON object", file=sys.stderr)
                continue
            normalized = normalize_row(row, kind)
            if normalized:
                rows_by_version.setdefault(normalized["app_version"], []).append(normalized)

    # NOTE(review): app_version comes from the input rows and is used as a
    # directory name as-is; confirm upstream that it cannot contain path
    # separators or "..".
    for version in sorted(rows_by_version):
        version_rows = rows_by_version[version]
        out_dir = BUILD_DIR / version
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / filename
        with out_path.open("w", encoding="utf-8") as handle:
            handle.writelines(
                json.dumps(row, ensure_ascii=False) + "\n" for row in version_rows
            )
        print(f"{version} output={out_path.relative_to(BASE_DIR)} rows={len(version_rows)}")


if __name__ == "__main__":
    main()