validate_atoms.py
7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
"""
Validate the quality of atom records and emit a detailed issue list plus a
summary report.

Usage:
    python3 scripts/validate_atoms.py
    python3 scripts/validate_atoms.py build/v4.57.3
"""
from __future__ import annotations
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
# Repository root: this script lives one directory below it (e.g. scripts/).
BASE_DIR = Path(__file__).parent.parent
# Directory scanned for atom JSONL files when no CLI argument is given.
DEFAULT_TARGET = BASE_DIR / "build"
# Destination for the issue list (JSONL) and the summary report (Markdown).
OUTPUT_DIR = BASE_DIR / "dist" / "quality"
# Substrings whose presence means a screenshot placeholder leaked into text.
# (Runtime data — must stay in the source language.)
SCREENSHOT_MARKERS = ("[截图]", "[图]", "截图")
# Result strings considered too generic to be a useful expected outcome.
GENERIC_RESULTS = {
    "满足预期",
    "搜索出结果",
    "成功",
    "失败",
    "显示成功",
    "显示失败",
}
def iter_jsonl_files(target: Path) -> list[Path]:
    """Collect atom JSONL files under *target*.

    If *target* is itself a file it is returned as a one-element list.
    Otherwise the tree is searched pattern by pattern (sorted within each
    pattern) and results are de-duplicated while keeping first-seen order.
    """
    if target.is_file():
        return [target]
    suffixes = (
        "case_atoms",
        "case_atoms_model",
        "doc_atoms",
        "doc_atoms_model",
        "rule_atoms",
        "rule_atoms_model",
    )
    # A dict doubles as an insertion-ordered set for de-duplication.
    ordered: dict[Path, None] = {}
    for suffix in suffixes:
        for match in sorted(target.rglob(f"*{suffix}.jsonl")):
            ordered.setdefault(match, None)
    return list(ordered)
def clean_text(text: str) -> str:
    """Collapse every whitespace run to a single space and strip the ends.

    ``None`` (and other falsy values) are treated as the empty string;
    non-string input is coerced via ``str``.
    """
    collapsed = re.sub(r"\s+", " ", str(text or ""))
    return collapsed.strip()
def detect_issues(row: dict) -> list[str]:
    """Return the list of quality-issue codes for one atom record.

    Checks cover missing metadata, malformed canonical text, generic or
    overly long results, leaked screenshot markers, fragmented/enumerated
    actions, and missing evidence. An empty list means the row is clean.

    Args:
        row: one decoded JSONL record; relevant keys are ``A``, ``R``,
            ``atom_type``, ``evidence``, ``canon_text``, ``app_version``
            and ``primary_module``.
    """
    issues: list[str] = []
    # NOTE(review): the original also computed clean_text(row.get("C", ""))
    # into an unused local — removed.
    a = clean_text(row.get("A", ""))
    r = clean_text(row.get("R", ""))
    source_type = row.get("atom_type", "")
    evidence = row.get("evidence")
    canon_text = clean_text(row.get("canon_text", ""))

    # Required metadata for every atom type.
    if not row.get("app_version"):
        issues.append("missing_app_version")
    if not row.get("primary_module"):
        issues.append("missing_primary_module")

    if source_type == "definition":
        # Definitions carry only canonical text, as TERM=... or C=... .
        if not canon_text:
            issues.append("missing_canon_text")
        elif not (canon_text.startswith("TERM=") or canon_text.startswith("C=")):
            issues.append("invalid_definition_format")
    else:
        # Rule-like atoms: canonical text must be C=..., plus A/R checks.
        if not canon_text:
            issues.append("missing_canon_text")
        if canon_text and not canon_text.startswith("C="):
            issues.append("invalid_canon_format")
        if not a:
            issues.append("missing_action")
        if not r:
            issues.append("missing_result")
        if r in GENERIC_RESULTS:
            issues.append("generic_result")
        if any(marker in r for marker in SCREENSHOT_MARKERS):
            issues.append("screenshot_marker_in_result")
        if len(r) > 200:
            issues.append("result_too_long")
        # Doc-rule actions must be long enough and not start mid-sentence.
        if source_type == "doc_rule" and len(a) < 4:
            issues.append("doc_action_too_short")
        if source_type == "doc_rule" and a and re.match(r"^[的了在和与及、,;。]+", a):
            issues.append("doc_action_fragmented")
        # Case-rule actions must not be a raw numbered list item.
        if source_type == "case_rule" and re.search(r"^\d+[,、.]", a):
            issues.append("case_action_is_enumeration")
        if source_type == "case_rule" and any(marker in canon_text for marker in SCREENSHOT_MARKERS):
            issues.append("screenshot_marker_in_canon")

    # Evidence requirements differ per atom type; both arms self-guard on
    # atom_type, so definitions are unaffected.
    if source_type == "rule" and isinstance(evidence, list):
        if not any(item.get("raw_text") for item in evidence if isinstance(item, dict)):
            issues.append("rule_missing_raw_evidence")
    elif source_type in {"case_rule", "doc_rule"} and not evidence:
        issues.append("missing_evidence")
    return issues
def main() -> None:
    """Scan atom JSONL files, collect per-row issues, write both reports.

    Reads the target path from ``sys.argv[1]`` (falling back to
    ``DEFAULT_TARGET``), writes ``atom_issues.jsonl`` and
    ``atom_quality_summary.md`` under ``OUTPUT_DIR``, and prints counters.
    """
    target = Path(sys.argv[1]).resolve() if len(sys.argv) > 1 else DEFAULT_TARGET
    files = iter_jsonl_files(target)
    if not files:
        print("no_atom_files_found")
        return

    issue_counter: Counter[str] = Counter()
    file_counter: dict[str, Counter[str]] = defaultdict(Counter)
    issue_rows: list[dict] = []
    total_rows = 0

    for path in files:
        rel_path = str(path.relative_to(BASE_DIR))
        with path.open("r", encoding="utf-8") as fh:
            for line_no, raw_line in enumerate(fh, start=1):
                stripped = raw_line.strip()
                if not stripped:
                    continue
                total_rows += 1
                try:
                    row = json.loads(stripped)
                except json.JSONDecodeError as exc:
                    # Malformed line: record it and keep scanning.
                    issue_counter["invalid_json"] += 1
                    file_counter[rel_path]["invalid_json"] += 1
                    issue_rows.append(
                        {
                            "file": rel_path,
                            "line_no": line_no,
                            "issue": "invalid_json",
                            "detail": str(exc),
                        }
                    )
                    continue
                for issue in detect_issues(row):
                    issue_counter[issue] += 1
                    file_counter[rel_path][issue] += 1
                    issue_rows.append(
                        {
                            "file": rel_path,
                            "line_no": line_no,
                            "issue": issue,
                            "app_version": row.get("app_version", ""),
                            "atom_type": row.get("atom_type", ""),
                            "primary_module": row.get("primary_module", ""),
                            "feature_scope": clean_text(row.get("feature_scope", ""))[:120],
                            "preview": clean_text(row.get("R") or row.get("A") or row.get("canon_text", ""))[:200],
                        }
                    )

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    issues_path = OUTPUT_DIR / "atom_issues.jsonl"
    summary_path = OUTPUT_DIR / "atom_quality_summary.md"

    with issues_path.open("w", encoding="utf-8") as fh:
        fh.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in issue_rows)

    # Build the Markdown summary (report text is intentionally untranslated
    # runtime output).
    report = [
        "# Atoms 质量报告",
        "",
        f"- 扫描文件数:{len(files)}",
        f"- 扫描行数:{total_rows}",
        f"- 问题总数:{sum(issue_counter.values())}",
        "",
        "## 问题分布",
        "",
    ]
    report.extend(f"- {issue}: {count}" for issue, count in issue_counter.most_common())
    report.extend(["", "## 文件分布", ""])
    # Files ranked by total issue count (descending), ties broken by path.
    ranked = sorted(file_counter.items(), key=lambda kv: (-sum(kv[1].values()), kv[0]))
    for rel_path, counter in ranked[:50]:
        top_issue = ", ".join(f"{issue}:{count}" for issue, count in counter.most_common(5))
        report.append(f"- {rel_path}: {sum(counter.values())} ({top_issue})")
    report.extend(
        [
            "",
            "## 建议",
            "",
            "- `invalid_json`、`missing_*`:先修输入数据,再讨论检索效果。",
            "- `screenshot_marker_*`、`case_action_is_enumeration`:说明纯脚本抽取未完成语义清洗,应改为模型蒸馏。",
            "- `doc_action_fragmented`、`doc_action_too_short`:说明 PDF 切块断裂,培训文档必须走模型重组。",
            "",
        ]
    )
    summary_path.write_text("\n".join(report) + "\n", encoding="utf-8")

    print(f"files={len(files)}")
    print(f"rows={total_rows}")
    print(f"issues={sum(issue_counter.values())}")
    print(f"output={OUTPUT_DIR.relative_to(BASE_DIR)}")


if __name__ == "__main__":
    main()