parse_testcase_to_jsonl.py 13.1 KB
#!/usr/bin/env python3
"""
测试用例 XML → case_atoms.jsonl(产品知识库格式)
支持批量模式:扫描 testCase/ 目录下所有 XML,自动识别版本号并输出到对应 build/ 目录

用法:
  python3 scripts/parse_testcase_to_jsonl.py              # 批量处理所有 XML
  python3 scripts/parse_testcase_to_jsonl.py 4.42.0       # 只处理指定版本
  python3 scripts/parse_testcase_to_jsonl.py --force      # 强制覆盖已存在的 case_atoms.jsonl
"""

import xml.etree.ElementTree as ET
import json
import hashlib
import re
import sys
import os
from pathlib import Path
from collections import Counter
from html.parser import HTMLParser

BASE_DIR = Path(__file__).parent.parent
TESTCASE_DIR = BASE_DIR / "testCase"
BUILD_DIR = BASE_DIR / "build"

# ── 模块映射(关键词 → 模块) ──────────────────────────────────────────
MODULE_KEYWORDS = {
    "AUTH": ["认证", "证照", "身份证", "执业", "资质", "卫健委", "人脸", "医师类别",
             "助理辅助", "认证流程", "工作室开通", "开通工作室", "电子签名", "签名",
             "证件", "备案", "互联网医院", "医师分类", "合规医"],
    "INCOME": ["提现", "签约", "工猫", "才燊", "银川", "发放", "结算",
               "第三方", "提现公司", "税源", "银行卡", "余额", "绩效"],
    "INQUIRY": ["问诊", "咨询单", "会话", "主诉", "咨询费"],
    "CLINIC": ["开方", "处方", "坐诊", "预约", "挂号", "加号", "门诊", "排班"],
    "PATIENT": ["患者", "就诊人", "档案"],
    "NOTIFICATION": ["通知", "飞书", "消息", "待办", "push"],
    "BACKSTAGE": ["猫头鹰", "审核", "客服", "运营", "后台"],
}

TOUCHPOINT_KEYWORDS = {
    "医生App": ["app首页", "首页头像", "app认证", "认证流程", "个人信息页", "我的tab"],
    "认证页面": ["认证流程", "个人信息提交", "选择身份", "证照资料", "助理辅助认证"],
    "提现页": ["提现", "签约"],
    "猫头鹰后台": ["猫头鹰", "审核详情", "认证详情", "认证查询", "医生详情"],
    "开方页面": ["开方", "处方"],
    "患者端": ["患者端"],
    "飞书通知": ["飞书", "消息通知", "飞书消息"],
}

DEPRECATED_KEYWORDS = ["作废", "这条作废", "没有这种场景"]
SCREENSHOT_PLACEHOLDERS = {"[截图]", "截图", "[图]", "[截图] [截图]"}


# ── HTML 工具 ──────────────────────────────────────────────────────────
class HTMLStripper(HTMLParser):
    def __init__(self):
        super().__init__()
        self.text_parts = []

    def handle_data(self, data):
        s = data.strip()
        if s:
            self.text_parts.append(s)

    def get_text(self):
        return " ".join(self.text_parts).strip()


def strip_html(html_str: str) -> str:
    if not html_str:
        return ""
    html_str = html_str.replace(" ", " ").replace(" ", " ")
    html_str = html_str.replace("&amp;", "&").replace("&lt;", "<").replace("&gt;", ">")
    parser = HTMLStripper()
    parser.feed(html_str)
    text = parser.get_text()
    return re.sub(r"\s+", " ", text).strip()


# ── 预处理:剥除 base64 ────────────────────────────────────────────────
def preprocess_xml(content: str) -> tuple[str, int, int]:
    orig_len = len(content)
    # 剥除 <img src="data:image/...;base64,...">
    content = re.sub(
        r'<img\b[^>]*\bsrc=["\']data:image/[^;]+;base64,[A-Za-z0-9+/=\s]+["\'][^>]*>',
        '[截图]', content, flags=re.IGNORECASE | re.DOTALL,
    )
    # 剥除遗留孤立 base64 块
    content = re.sub(
        r'data:image/[^;]+;base64,[A-Za-z0-9+/=\r\n]{50,}',
        '[截图]', content, flags=re.IGNORECASE,
    )
    # 修复裸 & 符号
    content = re.sub(r'&(?!amp;|lt;|gt;|quot;|apos;|#)', '&amp;', content)
    return content, orig_len, len(content)


# ── 核心逻辑 ──────────────────────────────────────────────────────────
def fingerprint(canon: str) -> str:
    return hashlib.sha1(canon.encode("utf-8")).hexdigest()[:12]


def normalize(text: str) -> str:
    return re.sub(r"\s+", " ", text).strip()


def infer_modules(text: str) -> tuple[str, list[str]]:
    matched = {mod: sum(1 for kw in kws if kw in text)
               for mod, kws in MODULE_KEYWORDS.items()}
    matched = {k: v for k, v in matched.items() if v > 0}
    if not matched:
        return "AUTH", ["AUTH"]
    sorted_mods = sorted(matched.items(), key=lambda x: -x[1])
    return sorted_mods[0][0], [m for m, _ in sorted_mods]


def infer_touchpoints(text: str) -> list[str]:
    result = [tp for tp, kws in TOUCHPOINT_KEYWORDS.items() if any(kw in text for kw in kws)]
    return result if result else ["医生App"]


def get_cdata(elem, tag) -> str:
    child = elem.find(tag)
    return child.text.strip() if child is not None and child.text else ""


def build_atoms(internal_id, external_id, case_name, suite_path,
                preconditions, steps, app_version) -> list[dict]:
    suite_name = suite_path[-1] if suite_path else ""
    feature_scope = " > ".join(suite_path[-3:]) if suite_path else ""
    all_text = " ".join([case_name, suite_name, preconditions] +
                        [s.get("action", "") + " " + s.get("expected", "") for s in steps])

    if any(kw in (case_name + all_text) for kw in DEPRECATED_KEYWORDS):
        return []

    primary_module, modules = infer_modules(all_text)
    touchpoints = infer_touchpoints(all_text)

    def make(c: str, a: str, r: str):
        c, a, r = normalize(c), normalize(a), normalize(r)
        if not r or r in SCREENSHOT_PLACEHOLDERS:
            return None
        if not a:
            return None
        canon = f"C={c}|A={a}|R={r}"
        fp = fingerprint(canon)
        search_terms = list(dict.fromkeys(
            w for part in [c, a, r, case_name]
            for w in re.findall(r"[\u4e00-\u9fff]{2,6}", part)
        ))[:8]
        evidence_parts = [f"suite:{'/'.join(suite_path)}", f"case:{case_name}"]
        if external_id:
            evidence_parts.append(f"externalid:{external_id}")
        return {
            "atom_id": f"{app_version}_{fp}",
            "app_version": app_version,
            "atom_type": "case_rule",
            "C": c, "A": a, "R": r,
            "primary_module": primary_module,
            "modules": modules,
            "feature_scope": feature_scope,
            "touchpoints": touchpoints,
            "canon_text": canon,
            "merge_fingerprint": fp,
            "evidence": f"TC-{app_version} {' '.join(evidence_parts)}",
            "suite_path": suite_path,
            "case_name": case_name,
            "internal_id": internal_id,
            "external_id": external_id,
            "search_terms": search_terms,
        }

    atoms = []
    if steps:
        base_c = preconditions or ""
        for step in steps:
            action = step.get("action", "").strip()
            expected = step.get("expected", "").strip()
            if not action and not expected:
                continue
            effective_a = action if action else case_name
            effective_c = base_c if base_c else (case_name if action else "")
            atom = make(effective_c, effective_a, expected)
            if atom:
                atoms.append(atom)
    else:
        atom = make("", case_name, "满足预期")
        if atom:
            atoms.append(atom)
    return atoms


def parse_suite(suite_elem, suite_path: list[str], all_atoms: list, app_version: str):
    suite_name = suite_elem.get("name", "").strip()
    current_path = suite_path + [suite_name] if suite_name else suite_path

    for child_suite in suite_elem.findall("testsuite"):
        parse_suite(child_suite, current_path, all_atoms, app_version)

    for tc in suite_elem.findall("testcase"):
        internal_id = tc.get("internalid", "")
        case_name = tc.get("name", "").strip()
        external_id = get_cdata(tc, "externalid")
        version = get_cdata(tc, "version")
        preconditions = strip_html(get_cdata(tc, "preconditions"))
        summary = strip_html(get_cdata(tc, "summary"))
        pre_combined = " ".join(filter(None, [preconditions, summary])).strip()

        steps = []
        steps_elem = tc.find("steps")
        if steps_elem is not None:
            for step in steps_elem.findall("step"):
                action = strip_html(get_cdata(step, "actions"))
                expected = strip_html(get_cdata(step, "expectedresults"))
                if action or expected:
                    steps.append({"action": action, "expected": expected})

        all_atoms.extend(build_atoms(
            internal_id, external_id, case_name,
            current_path, pre_combined, steps, app_version
        ))


def deduplicate(atoms: list[dict]) -> list[dict]:
    seen = set()
    result = []
    for atom in atoms:
        fp = atom["merge_fingerprint"]
        if fp not in seen:
            seen.add(fp)
            result.append(atom)
    return result


def extract_version(filename: str):
    """从文件名提取版本号,如 4.42.0 → v4.42.0"""
    m = re.search(r'(\d+\.\d+(?:\.\d+)*)', filename)
    return f"v{m.group(1)}" if m else None


# ── 处理单个文件 ───────────────────────────────────────────────────────
def process_file(xml_path: Path, force: bool = False) -> dict:
    app_version = extract_version(xml_path.name)
    if not app_version:
        return {"file": xml_path.name, "status": "skip", "reason": "无法识别版本号"}

    output_dir = BUILD_DIR / app_version
    output_path = output_dir / "case_atoms.jsonl"

    if output_path.exists() and not force:
        return {"file": xml_path.name, "status": "skip", "reason": "已存在,使用 --force 覆盖"}

    output_dir.mkdir(parents=True, exist_ok=True)

    with open(xml_path, "r", encoding="utf-8") as f:
        content = f.read()

    content, orig_len, cleaned_len = preprocess_xml(content)
    saved = orig_len - cleaned_len

    try:
        root = ET.fromstring(content)
    except ET.ParseError as e:
        return {"file": xml_path.name, "status": "error", "reason": str(e)}

    all_atoms = []
    parse_suite(root, [], all_atoms, app_version)
    atoms = deduplicate(all_atoms)

    with open(output_path, "w", encoding="utf-8") as f:
        for atom in atoms:
            f.write(json.dumps(atom, ensure_ascii=False) + "\n")

    module_dist = Counter(a["primary_module"] for a in atoms)
    return {
        "file": xml_path.name,
        "status": "ok",
        "version": app_version,
        "output": str(output_path.relative_to(BASE_DIR)),
        "orig_bytes": orig_len,
        "cleaned_bytes": cleaned_len,
        "saved_bytes": saved,
        "atoms": len(atoms),
        "modules": dict(module_dist.most_common()),
    }


# ── 入口 ───────────────────────────────────────────────────────────────
def main():
    args = sys.argv[1:]
    force = "--force" in args
    version_filter = next((a for a in args if re.match(r'\d+\.\d+', a)), None)

    xml_files = sorted(TESTCASE_DIR.glob("*.xml"))
    if not xml_files:
        print("testCase/ 目录下没有找到 XML 文件")
        return

    if version_filter:
        xml_files = [f for f in xml_files if version_filter in f.name]
        if not xml_files:
            print(f"没有找到版本 {version_filter} 对应的 XML 文件")
            return

    print(f"找到 {len(xml_files)} 个 XML 文件,开始处理...\n")

    results = []
    for xml_path in xml_files:
        print(f"▶ {xml_path.name}")
        result = process_file(xml_path, force=force)
        results.append(result)

        if result["status"] == "ok":
            saved_mb = result["saved_bytes"] / 1024 / 1024
            print(f"  ✅ {result['version']} → {result['atoms']} atoms"
                  f"  (清洗节省 {saved_mb:.1f}MB)")
            mods = "  ".join(f"{m}:{c}" for m, c in result["modules"].items())
            print(f"  模块: {mods}")
        elif result["status"] == "skip":
            print(f"  ⏭  跳过: {result['reason']}")
        else:
            print(f"  ❌ 错误: {result['reason']}")
        print()

    # 汇总
    ok = [r for r in results if r["status"] == "ok"]
    skip = [r for r in results if r["status"] == "skip"]
    err = [r for r in results if r["status"] == "error"]
    print("=" * 50)
    print(f"完成: {len(ok)} 个  |  跳过: {len(skip)} 个  |  错误: {len(err)} 个")
    if ok:
        total_atoms = sum(r["atoms"] for r in ok)
        total_saved = sum(r["saved_bytes"] for r in ok) / 1024 / 1024
        print(f"总计: {total_atoms} atoms  |  共节省 {total_saved:.1f}MB base64 数据")


if __name__ == "__main__":
    main()