parse_testcase_to_jsonl.py
13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#!/usr/bin/env python3
"""
测试用例 XML → case_atoms.jsonl(产品知识库格式)
支持批量模式:扫描 testCase/ 目录下所有 XML,自动识别版本号并输出到对应 build/ 目录
用法:
python3 scripts/parse_testcase_to_jsonl.py # 批量处理所有 XML
python3 scripts/parse_testcase_to_jsonl.py 4.42.0 # 只处理指定版本
python3 scripts/parse_testcase_to_jsonl.py --force # 强制覆盖已存在的 case_atoms.jsonl
"""
import xml.etree.ElementTree as ET
import json
import hashlib
import re
import sys
import os
from pathlib import Path
from collections import Counter
from html.parser import HTMLParser
# Project layout: this script lives in scripts/, so the repo root is two levels up.
BASE_DIR = Path(__file__).parent.parent
TESTCASE_DIR = BASE_DIR / "testCase"  # input: TestLink-style XML exports
BUILD_DIR = BASE_DIR / "build"        # output: build/<version>/case_atoms.jsonl
# ── Module mapping (keyword → module) ─────────────────────────────────
# A case's module is inferred by counting which of these substrings occur
# in its combined text; see infer_modules(). First match group wins on ties.
MODULE_KEYWORDS = {
    "AUTH": ["认证", "证照", "身份证", "执业", "资质", "卫健委", "人脸", "医师类别",
             "助理辅助", "认证流程", "工作室开通", "开通工作室", "电子签名", "签名",
             "证件", "备案", "互联网医院", "医师分类", "合规医"],
    "INCOME": ["提现", "签约", "工猫", "才燊", "银川", "发放", "结算",
               "第三方", "提现公司", "税源", "银行卡", "余额", "绩效"],
    "INQUIRY": ["问诊", "咨询单", "会话", "主诉", "咨询费"],
    "CLINIC": ["开方", "处方", "坐诊", "预约", "挂号", "加号", "门诊", "排班"],
    "PATIENT": ["患者", "就诊人", "档案"],
    "NOTIFICATION": ["通知", "飞书", "消息", "待办", "push"],
    "BACKSTAGE": ["猫头鹰", "审核", "客服", "运营", "后台"],
}
# Touchpoint (UI surface) mapping, same substring-matching scheme;
# see infer_touchpoints(). Defaults to "医生App" when nothing matches.
TOUCHPOINT_KEYWORDS = {
    "医生App": ["app首页", "首页头像", "app认证", "认证流程", "个人信息页", "我的tab"],
    "认证页面": ["认证流程", "个人信息提交", "选择身份", "证照资料", "助理辅助认证"],
    "提现页": ["提现", "签约"],
    "猫头鹰后台": ["猫头鹰", "审核详情", "认证详情", "认证查询", "医生详情"],
    "开方页面": ["开方", "处方"],
    "患者端": ["患者端"],
    "飞书通知": ["飞书", "消息通知", "飞书消息"],
}
# Cases whose text contains any of these markers are dropped entirely.
DEPRECATED_KEYWORDS = ["作废", "这条作废", "没有这种场景"]
# Expected-result values that are just screenshot placeholders → no atom.
SCREENSHOT_PLACEHOLDERS = {"[截图]", "截图", "[图]", "[截图] [截图]"}
# ── HTML helpers ──────────────────────────────────────────────────────
class HTMLStripper(HTMLParser):
    """HTMLParser subclass that keeps only the text content of a document.

    Feed it markup, then call get_text() for the collected plain text.
    """

    def __init__(self):
        super().__init__()
        self.text_parts = []  # non-empty text fragments, in document order

    def handle_data(self, data):
        # Discard fragments that are nothing but whitespace.
        fragment = data.strip()
        if fragment:
            self.text_parts.append(fragment)

    def get_text(self):
        """Return all collected fragments joined by single spaces."""
        return " ".join(self.text_parts).strip()
def strip_html(html_str: str) -> str:
    """Strip HTML markup from *html_str* and collapse whitespace to single spaces.

    Returns "" for empty/None-ish input.
    """
    if not html_str:
        return ""
    # BUG FIX: these replace() calls had been reduced to no-ops (e.g.
    # replace(" ", " "), replace("&", "&")) — the entity literals were lost.
    # Restore them: normalize non-breaking spaces, then unescape the common
    # entities so double-escaped markup inside CDATA is also stripped below.
    html_str = html_str.replace("&nbsp;", " ").replace("\xa0", " ")
    html_str = html_str.replace("&amp;", "&").replace("&lt;", "<").replace("&gt;", ">")
    parser = HTMLStripper()
    parser.feed(html_str)
    text = parser.get_text()
    return re.sub(r"\s+", " ", text).strip()
# ── Preprocessing: strip embedded base64 images ───────────────────────
def preprocess_xml(content: str) -> tuple[str, int, int]:
    """Clean raw exported XML so ElementTree can parse it.

    Replaces inline base64 screenshots with a '[截图]' placeholder and
    escapes bare '&' characters.

    Returns (cleaned_content, original_length, cleaned_length).
    """
    orig_len = len(content)
    # Replace whole <img src="data:image/...;base64,..."> tags with a placeholder.
    content = re.sub(
        r'<img\b[^>]*\bsrc=["\']data:image/[^;]+;base64,[A-Za-z0-9+/=\s]+["\'][^>]*>',
        '[截图]', content, flags=re.IGNORECASE | re.DOTALL,
    )
    # Remove any leftover orphan base64 blobs (50+ chars of payload).
    content = re.sub(
        r'data:image/[^;]+;base64,[A-Za-z0-9+/=\r\n]{50,}',
        '[截图]', content, flags=re.IGNORECASE,
    )
    # BUG FIX: escape bare '&'. The replacement string had been reduced to
    # '&' (a no-op), so documents with unescaped ampersands still failed in
    # ET.fromstring. Ampersands already starting an entity are left alone.
    content = re.sub(r'&(?!amp;|lt;|gt;|quot;|apos;|#)', '&amp;', content)
    return content, orig_len, len(content)
# ── Core logic ────────────────────────────────────────────────────────
def fingerprint(canon: str) -> str:
    """Return a short, stable 12-hex-char SHA-1 digest of *canon*."""
    digest = hashlib.sha1(canon.encode("utf-8"))
    return digest.hexdigest()[:12]
def normalize(text: str) -> str:
    """Squash every whitespace run in *text* to one space and trim the ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def infer_modules(text: str) -> tuple[str, list[str]]:
    """Rank modules by how many of their keywords occur in *text*.

    Returns (primary_module, all_matched_modules_by_hit_count); falls back
    to ("AUTH", ["AUTH"]) when no keyword matches at all.
    """
    hits = {}
    for mod, kws in MODULE_KEYWORDS.items():
        count = sum(1 for kw in kws if kw in text)
        if count:
            hits[mod] = count
    if not hits:
        return "AUTH", ["AUTH"]
    # Stable sort: ties keep MODULE_KEYWORDS declaration order.
    ranked = sorted(hits, key=hits.get, reverse=True)
    return ranked[0], ranked
def infer_touchpoints(text: str) -> list[str]:
    """Return every touchpoint whose keywords appear in *text* (default 医生App)."""
    found = []
    for tp, kws in TOUCHPOINT_KEYWORDS.items():
        if any(kw in text for kw in kws):
            found.append(tp)
    return found or ["医生App"]
def get_cdata(elem, tag) -> str:
    """Return the stripped text of *elem*'s first child *tag*, or "" if missing/empty."""
    node = elem.find(tag)
    if node is None or not node.text:
        return ""
    return node.text.strip()
def build_atoms(internal_id, external_id, case_name, suite_path,
                preconditions, steps, app_version) -> list[dict]:
    """Convert one test case into zero or more C/A/R knowledge atoms.

    C = condition (preconditions), A = action, R = result. One atom is
    produced per usable step; stepless cases yield a single placeholder
    atom. Cases marked deprecated produce nothing.

    Args:
        internal_id / external_id: test-case identifiers from the XML.
        case_name: test case title.
        suite_path: list of ancestor suite names, outermost first.
        preconditions: combined preconditions + summary text (HTML-stripped).
        steps: list of {"action": str, "expected": str} dicts.
        app_version: e.g. "v4.42.0"; prefixed onto atom ids.
    """
    suite_name = suite_path[-1] if suite_path else ""
    # Only the last three suite levels — deeper ancestry adds noise.
    feature_scope = " > ".join(suite_path[-3:]) if suite_path else ""
    all_text = " ".join([case_name, suite_name, preconditions] +
                        [s.get("action", "") + " " + s.get("expected", "") for s in steps])
    # Drop the whole case if any deprecation marker appears anywhere.
    if any(kw in (case_name + all_text) for kw in DEPRECATED_KEYWORDS):
        return []
    primary_module, modules = infer_modules(all_text)
    touchpoints = infer_touchpoints(all_text)

    def make(c: str, a: str, r: str):
        # Build one atom dict, or None when it would be useless
        # (no expected result, screenshot-only result, or no action).
        c, a, r = normalize(c), normalize(a), normalize(r)
        if not r or r in SCREENSHOT_PLACEHOLDERS:
            return None
        if not a:
            return None
        canon = f"C={c}|A={a}|R={r}"
        fp = fingerprint(canon)
        # Up to 8 unique CJK word-ish fragments (2-6 chars) as search terms;
        # dict.fromkeys() dedupes while preserving first-seen order.
        search_terms = list(dict.fromkeys(
            w for part in [c, a, r, case_name]
            for w in re.findall(r"[\u4e00-\u9fff]{2,6}", part)
        ))[:8]
        evidence_parts = [f"suite:{'/'.join(suite_path)}", f"case:{case_name}"]
        if external_id:
            evidence_parts.append(f"externalid:{external_id}")
        return {
            "atom_id": f"{app_version}_{fp}",
            "app_version": app_version,
            "atom_type": "case_rule",
            "C": c, "A": a, "R": r,
            "primary_module": primary_module,
            "modules": modules,
            "feature_scope": feature_scope,
            "touchpoints": touchpoints,
            "canon_text": canon,
            "merge_fingerprint": fp,
            "evidence": f"TC-{app_version} {' '.join(evidence_parts)}",
            "suite_path": suite_path,
            "case_name": case_name,
            "internal_id": internal_id,
            "external_id": external_id,
            "search_terms": search_terms,
        }

    atoms = []
    if steps:
        base_c = preconditions or ""
        for step in steps:
            action = step.get("action", "").strip()
            expected = step.get("expected", "").strip()
            if not action and not expected:
                continue
            # Fall back to the case name when a step lacks its own action;
            # condition falls back to the case name only when an action exists.
            effective_a = action if action else case_name
            effective_c = base_c if base_c else (case_name if action else "")
            atom = make(effective_c, effective_a, expected)
            if atom:
                atoms.append(atom)
    else:
        # Stepless case: one atom with a generic "meets expectations" result.
        atom = make("", case_name, "满足预期")
        if atom:
            atoms.append(atom)
    return atoms
def parse_suite(suite_elem, suite_path: list[str], all_atoms: list, app_version: str):
    """Recursively walk a <testsuite> tree, appending atoms to *all_atoms*.

    *suite_path* accumulates ancestor suite names (outermost first); nameless
    suites (e.g. the XML root) do not extend the path.
    """
    suite_name = suite_elem.get("name", "").strip()
    current_path = suite_path + [suite_name] if suite_name else suite_path
    # Depth-first into nested suites.
    for child_suite in suite_elem.findall("testsuite"):
        parse_suite(child_suite, current_path, all_atoms, app_version)
    for tc in suite_elem.findall("testcase"):
        internal_id = tc.get("internalid", "")
        case_name = tc.get("name", "").strip()
        external_id = get_cdata(tc, "externalid")
        version = get_cdata(tc, "version")  # NOTE(review): parsed but never used
        preconditions = strip_html(get_cdata(tc, "preconditions"))
        summary = strip_html(get_cdata(tc, "summary"))
        # Preconditions and summary both feed the atom's condition text.
        pre_combined = " ".join(filter(None, [preconditions, summary])).strip()
        steps = []
        steps_elem = tc.find("steps")
        if steps_elem is not None:
            for step in steps_elem.findall("step"):
                action = strip_html(get_cdata(step, "actions"))
                expected = strip_html(get_cdata(step, "expectedresults"))
                if action or expected:
                    steps.append({"action": action, "expected": expected})
        all_atoms.extend(build_atoms(
            internal_id, external_id, case_name,
            current_path, pre_combined, steps, app_version
        ))
def deduplicate(atoms: list[dict]) -> list[dict]:
    """Keep only the first atom for each merge_fingerprint, preserving order."""
    kept = []
    seen_fps = set()
    for item in atoms:
        key = item["merge_fingerprint"]
        if key in seen_fps:
            continue
        seen_fps.add(key)
        kept.append(item)
    return kept
def extract_version(filename: str):
    """Extract a dotted version from *filename*: "4.42.0" → "v4.42.0", else None."""
    match = re.search(r'(\d+\.\d+(?:\.\d+)*)', filename)
    if match is None:
        return None
    return "v" + match.group(1)
# ── Process a single file ─────────────────────────────────────────────
def process_file(xml_path: Path, force: bool = False) -> dict:
    """Parse one XML export and write build/<version>/case_atoms.jsonl.

    Skips files whose name yields no version, and existing outputs unless
    *force*. Returns a status dict: {"status": "ok" | "skip" | "error", ...}.
    """
    app_version = extract_version(xml_path.name)
    if not app_version:
        return {"file": xml_path.name, "status": "skip", "reason": "无法识别版本号"}
    output_dir = BUILD_DIR / app_version
    output_path = output_dir / "case_atoms.jsonl"
    # Never overwrite an existing output unless --force was given.
    if output_path.exists() and not force:
        return {"file": xml_path.name, "status": "skip", "reason": "已存在,使用 --force 覆盖"}
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(xml_path, "r", encoding="utf-8") as f:
        content = f.read()
    # Strip base64 screenshots / fix bare '&' before handing to ElementTree.
    content, orig_len, cleaned_len = preprocess_xml(content)
    saved = orig_len - cleaned_len
    try:
        root = ET.fromstring(content)
    except ET.ParseError as e:
        return {"file": xml_path.name, "status": "error", "reason": str(e)}
    all_atoms = []
    parse_suite(root, [], all_atoms, app_version)
    atoms = deduplicate(all_atoms)
    # One JSON object per line (JSONL), keeping non-ASCII readable.
    with open(output_path, "w", encoding="utf-8") as f:
        for atom in atoms:
            f.write(json.dumps(atom, ensure_ascii=False) + "\n")
    module_dist = Counter(a["primary_module"] for a in atoms)
    return {
        "file": xml_path.name,
        "status": "ok",
        "version": app_version,
        "output": str(output_path.relative_to(BASE_DIR)),
        "orig_bytes": orig_len,
        "cleaned_bytes": cleaned_len,
        "saved_bytes": saved,
        "atoms": len(atoms),
        "modules": dict(module_dist.most_common()),
    }
# ── Entry point ───────────────────────────────────────────────────────
def main():
    """CLI entry point: process every XML under testCase/, optionally filtered.

    Args (from sys.argv): an optional version like "4.42.0" restricts which
    files are processed; "--force" overwrites existing outputs.
    """
    args = sys.argv[1:]
    force = "--force" in args
    # First argument that looks like a dotted version acts as a filename filter.
    version_filter = next((a for a in args if re.match(r'\d+\.\d+', a)), None)
    xml_files = sorted(TESTCASE_DIR.glob("*.xml"))
    if not xml_files:
        print("testCase/ 目录下没有找到 XML 文件")
        return
    if version_filter:
        xml_files = [f for f in xml_files if version_filter in f.name]
        if not xml_files:
            print(f"没有找到版本 {version_filter} 对应的 XML 文件")
            return
    print(f"找到 {len(xml_files)} 个 XML 文件,开始处理...\n")
    results = []
    for xml_path in xml_files:
        print(f"▶ {xml_path.name}")
        result = process_file(xml_path, force=force)
        results.append(result)
        if result["status"] == "ok":
            saved_mb = result["saved_bytes"] / 1024 / 1024
            print(f" ✅ {result['version']} → {result['atoms']} atoms"
                  f" (清洗节省 {saved_mb:.1f}MB)")
            mods = " ".join(f"{m}:{c}" for m, c in result["modules"].items())
            print(f" 模块: {mods}")
        elif result["status"] == "skip":
            print(f" ⏭ 跳过: {result['reason']}")
        else:
            print(f" ❌ 错误: {result['reason']}")
        print()
    # Summary across all files.
    ok = [r for r in results if r["status"] == "ok"]
    skip = [r for r in results if r["status"] == "skip"]
    err = [r for r in results if r["status"] == "error"]
    print("=" * 50)
    print(f"完成: {len(ok)} 个 | 跳过: {len(skip)} 个 | 错误: {len(err)} 个")
    if ok:
        total_atoms = sum(r["atoms"] for r in ok)
        total_saved = sum(r["saved_bytes"] for r in ok) / 1024 / 1024
        print(f"总计: {total_atoms} atoms | 共节省 {total_saved:.1f}MB base64 数据")


if __name__ == "__main__":
    main()