# build_synthesized_atoms.py
#!/usr/bin/env python3
"""
从 case_candidates / doc_segments 批量合成更干净的 atoms。

说明:
- 当前环境不调用外部模型,因此这里使用启发式规则做“准模型蒸馏”
- 输出结果写入 build/<version>/case_atoms_model.jsonl 和 doc_atoms_model.jsonl
- 这些结果会带上 confidence / qa_status,供后续索引优先使用

用法:
  python3 scripts/build_synthesized_atoms.py
  python3 scripts/build_synthesized_atoms.py 4.57.3
"""

from __future__ import annotations

import hashlib
import json
import re
import sys
from collections import defaultdict
from pathlib import Path


# Repository root (this script lives in scripts/) and the build output root.
BASE_DIR = Path(__file__).parent.parent
BUILD_DIR = BASE_DIR / "build"

# Keyword heuristics for module classification: a module scores one point per
# keyword found in the text (see infer_modules()).  Keywords are Chinese
# domain terms from the doctor/patient product (auth, income, inquiry, ...).
MODULE_KEYWORDS = {
    "AUTH": ["认证", "证照", "身份证", "执业", "资质", "卫健委", "人脸", "医师类别", "签名", "备案", "工作室"],
    "INCOME": ["提现", "签约", "工猫", "才燊", "银行卡", "余额", "收入", "绩效", "税", "结算", "优惠券", "赞赏"],
    "INQUIRY": ["问诊", "咨询", "会话", "视频", "电话", "图文", "主诉", "咨询费", "义诊", "随访"],
    "CLINIC": ["门诊", "预约", "挂号", "加号", "坐诊", "排班", "开方", "处方", "购药", "药房", "方案"],
    "PATIENT": ["患者", "就诊人", "档案", "病历", "家庭成员", "关注", "粉丝"],
    "NOTIFICATION": ["通知", "消息", "待办", "推送", "短信", "飞书"],
    "BACKSTAGE": ["猫头鹰", "后台", "审核", "客服", "运营"],
}

# UI touchpoint (surface) names mapped to the keywords that imply them
# (see infer_touchpoints()).
TOUCHPOINT_KEYWORDS = {
    "提现页": ["提现"],
    "签约页": ["签约"],
    "医生App": ["医师端", "医生端", "app", "首页", "我的"],
    "患者端": ["患者端"],
    "门诊页": ["门诊", "预约", "挂号", "加号", "坐诊"],
    "问诊页": ["问诊", "咨询", "会话", "视频", "电话"],
    "患者页": ["患者", "就诊人", "档案", "病历"],
    "猫头鹰后台": ["猫头鹰", "后台", "审核"],
}

# Expected-result strings too generic to be worth emitting as atoms.
GENERIC_RESULTS = {"满足预期", "搜索出结果", "成功", "失败", "显示成功", "显示失败", "表现正常"}
# Screenshot placeholder markers stripped out by clean_text().
SCREENSHOT_MARKERS = ("[截图]", "[图]", "截图")


def clean_text(text: str) -> str:
    """Normalize a raw text field.

    Drops screenshot placeholder markers, collapses all whitespace runs to a
    single space, and strips surrounding spaces/semicolons.
    """
    result = str(text or "")
    for marker in SCREENSHOT_MARKERS:
        result = result.replace(marker, " ")
    collapsed = re.sub(r"\s+", " ", result)
    return collapsed.strip(" ;;")


def fingerprint(text: str) -> str:
    """Return a short (12 hex chars) SHA-1 digest of *text*, used as a dedup key."""
    digest = hashlib.sha1(text.encode("utf-8"))
    return digest.hexdigest()[:12]


def infer_modules(text: str) -> tuple[str, list[str]]:
    """Classify *text* by keyword hits.

    Returns (primary_module, modules_ranked) where modules are ordered by
    descending hit count, ties broken alphabetically.  Falls back to
    ("GENERAL", ["GENERAL"]) when nothing matches.
    """
    hits: dict[str, int] = {}
    for module, keywords in MODULE_KEYWORDS.items():
        count = sum(keyword in text for keyword in keywords)
        if count:
            hits[module] = count
    if not hits:
        return "GENERAL", ["GENERAL"]
    ranked = sorted(hits, key=lambda module: (-hits[module], module))
    return ranked[0], ranked


def infer_touchpoints(text: str) -> list[str]:
    """Return the touchpoint names whose keyword lists match *text*.

    Always returns a list (possibly empty).  The previous ``result or []``
    fallback was redundant: the comprehension already yields a list, and an
    empty list `or` [] is still an empty list.
    """
    return [
        name
        for name, keywords in TOUCHPOINT_KEYWORDS.items()
        if any(keyword in text for keyword in keywords)
    ]


def split_numbered_items(text: str) -> list[str]:
    """Split *text* on numbered bullet markers ("1." / "2、" ...) into items.

    Returns [] for empty input, and the whole cleaned text as a single item
    when no markers are found.
    """
    text = clean_text(text)
    if not text:
        return []
    # Insert a newline before each "N." style marker, then split on newlines.
    normalized = re.sub(r"(?<!\d)(\d+)[,、..)]\s*", r"\n\1. ", text)
    # clean_text() was previously called twice per part (filter + value);
    # compute it once per part via the walrus operator.
    parts = [cleaned for part in normalized.split("\n") if (cleaned := clean_text(part))]
    return parts or [text]


def normalize_action(text: str, case_name: str) -> str:
    """Clean a step action, falling back to the case name when it is empty.

    Strips a leading "N." bullet index.  The original had three trailing
    branches (verb-prefix match, "搜索" check, final fallthrough) that all
    returned ``text`` unchanged — dead code, collapsed into a single return.
    The ``case_name`` parameter is kept for the empty-text fallback.
    """
    text = clean_text(text)
    if not text:
        return clean_text(case_name)
    return re.sub(r"^\d+[..、,)]\s*", "", text)


def candidate_atoms_from_case(candidate: dict) -> list[dict]:
    """Convert one test-case candidate into deduplicated C/A/R "atom" dicts.

    One atom is emitted per (step action, expected-result item) pair, after
    dropping generic or question-marked results.  Assumes *candidate* follows
    the case_candidates.jsonl schema (app_version, case_name, steps, ...) —
    TODO confirm against the script that produces that file.
    """
    suite_path = candidate.get("suite_path", [])
    # Scope label: the last three suite levels, else the explicit field.
    feature_scope = " > ".join(suite_path[-3:]) if suite_path else clean_text(candidate.get("feature_scope", ""))
    case_name = clean_text(candidate.get("case_name", ""))
    preconditions = clean_text(candidate.get("preconditions", ""))
    summary = clean_text(candidate.get("summary", ""))
    context = ";".join(part for part in [preconditions, summary] if part)
    # Concatenation of every text field — used only for keyword inference.
    all_text = " ".join([case_name, feature_scope, context] + [clean_text(step.get("action", "")) + " " + clean_text(step.get("expected", "")) for step in candidate.get("steps", [])])
    primary_module, modules = infer_modules(all_text)
    touchpoints = infer_touchpoints(all_text + " " + " ".join(suite_path))

    atoms = []
    for step in candidate.get("steps", []):
        action = normalize_action(step.get("action", ""), case_name)
        expected_items = split_numbered_items(step.get("expected", ""))
        if not action and not expected_items:
            continue
        for result in expected_items:
            # Strip any leading "N." bullet index left on the expected item.
            result = clean_text(re.sub(r"^\d+[..、,)]\s*", "", result))
            # Skip generic outcomes and items still containing question marks.
            if not result or result in GENERIC_RESULTS or "?" in result or "???" in result:
                continue
            c = context
            a = action or case_name
            r = result
            # Canonical C/A/R string drives the dedup fingerprint below.
            canon = f"C={c}|A={a}|R={r}"
            fp = fingerprint(canon)
            atoms.append(
                {
                    "atom_id": f"{candidate['app_version']}_{fp}",
                    "app_version": candidate["app_version"],
                    "atom_type": "case_rule",
                    "source_type": "testcase",
                    "primary_module": primary_module,
                    "modules": modules,
                    "feature_scope": feature_scope,
                    "touchpoints": touchpoints,
                    "C": c,
                    "A": a,
                    "R": r,
                    "canon_text": canon,
                    "merge_fingerprint": fp,
                    # Fixed mid-level confidence for heuristic case synthesis.
                    "confidence": 0.72,
                    "qa_status": "validated",
                    "evidence": candidate.get("evidence", {}),
                    "search_terms": [term for term in re.findall(r"[\u4e00-\u9fffA-Za-z]{2,8}", f"{case_name} {a} {r}")[:10]],
                }
            )
    # Dedup by fingerprint; a later identical atom overwrites an earlier one.
    dedup = {}
    for atom in atoms:
        dedup[atom["merge_fingerprint"]] = atom
    return list(dedup.values())


def parse_doc_title(segment: dict) -> tuple[str, str]:
    """Extract a (title, body) pair from one doc segment.

    Prefers the segment's own title field; otherwise tries to peel a title
    off the front of the text, and finally falls back to the first sentence
    (truncated to 30 chars).
    """
    title = clean_text(segment.get("title", ""))
    text = clean_text(segment.get("text", ""))
    if title and title != "未识别标题":
        return title, text
    # Try to extract a title from the start of the paragraph.
    # NOTE(review): the class [^背景功能目标] excludes those characters
    # individually rather than the words — presumably an intentional
    # heuristic; confirm against real document data.
    match = re.match(r"^\d+[..、]([^背景功能目标]{2,40}?)(?:背景[::]|功能[::]|目标[::]|$)", text)
    if match:
        extracted = clean_text(match.group(1))
        # Body resumes right after the title group, so it keeps any
        # trailing 背景/功能/目标 marker for extract_background_goal().
        remaining = clean_text(text[match.end(1):])
        return extracted or "未归类功能", remaining
    first_sentence = re.split(r"[;。]", text, maxsplit=1)[0]
    return clean_text(first_sentence[:30]) or "未归类功能", text


def strip_leading_index(text: str) -> str:
    """Drop a leading "N." / "N、" bullet index, then normalize the text."""
    without_index = re.sub(r"^\d+[..、]\s*", "", text)
    return clean_text(without_index)


def looks_like_doc_header(text: str) -> bool:
    """Heuristic: does *text* look like a section header rather than a rule body?

    True for lines carrying a 背景/功能/目标 label or for short lines without
    sentence delimiters.  Now accepts both half- and full-width colons after
    the label, consistent with extract_background_goal() — previously only
    the full-width form was recognized.
    """
    text = clean_text(text)
    if not text or len(text) < 4:
        return False
    if re.search(r"(?:背景|功能|目标)[::]", text):
        return True
    return len(text) <= 30 and not re.search(r"[;。]", text)


def split_lettered_items(text: str) -> list[str]:
    """Split *text* on lettered bullet markers ("a." / "b、" / roman numerals).

    Returns [] for empty input, and the whole cleaned text as a single item
    when no markers are found.
    """
    text = clean_text(text)
    if not text:
        return []
    # Insert a newline before each lettered marker, then split on newlines.
    normalized = re.sub(r"([a-zA-Z]|[ivxIVX]+)[.、]\s*", r"\n\1. ", text)
    # clean_text() was previously called twice per part (filter + value);
    # compute it once per part via the walrus operator.
    parts = [cleaned for part in normalized.split("\n") if (cleaned := clean_text(part))]
    return parts or [text]


def extract_background_goal(text: str) -> tuple[str, str, str]:
    """Split a paragraph into (background, action_scope, body).

    Text after 背景: feeds *background*; text after 目标:/功能: feeds
    *action_scope*.  Both half- and full-width colons are accepted.  *body*
    is whatever follows the last recognized marker, or the full text when
    neither marker is present.
    """
    background = ""
    action_scope = ""
    body = text
    # Background runs until the next 目标/功能 marker or end of text.
    match_bg = re.search(r"背景[::](.*?)(?=目标[::]|功能[::]|$)", text)
    if match_bg:
        background = clean_text(match_bg.group(1))
    # Goal/function text runs until the first numbered bullet or end of text.
    match_goal = re.search(r"(?:目标|功能)[::](.*?)(?=\d+[..、]|$)", text)
    if match_goal:
        action_scope = clean_text(match_goal.group(1))
    if match_bg or match_goal:
        # Body starts after the goal match when present, else after the
        # background match; defaults to position 0 as a safety net.
        start = 0
        if match_goal:
            start = match_goal.end()
        elif match_bg:
            start = match_bg.end()
        body = clean_text(text[start:])
    return background, action_scope, body


def build_doc_atom(segment: dict, scope: str, background: str, action_scope: str, rule: str) -> dict | None:
    """Build one document-derived C/A/R atom, or None when *rule* is unusable.

    Filters out generic, too-short, or question-marked rules, and rules that
    merely echo the background text or the whole raw segment.  Improvements:
    the combined inference text is now formatted once (was built twice for
    infer_modules/infer_touchpoints), evidence reuses the already-cleaned
    ``raw_text``, and the redundant list-comp wrapper around the already-list
    ``re.findall(...)[:10]`` was dropped.
    """
    scope = clean_text(scope) or "未归类功能"
    background = clean_text(background)
    action_scope = clean_text(action_scope)
    rule = clean_text(rule)
    if not rule or rule in GENERIC_RESULTS or len(rule) < 4 or "?" in rule or "???" in rule:
        return None
    combined = f"{scope} {background} {action_scope} {rule}"
    primary_module, modules = infer_modules(combined)
    touchpoints = infer_touchpoints(combined)
    c = f"背景:{background}" if background else ""
    a = action_scope or scope
    r = rule
    raw_text = clean_text(segment.get("text", ""))
    if r == c.replace("背景:", "") or r == raw_text:
        # Avoid re-emitting the whole background paragraph as a result.
        return None
    # A long rule that still carries section markers is an unsplit paragraph.
    if ("背景:" in r or "功能:" in r or "目标:" in r) and len(r) > 40:
        return None
    canon = f"C={c}|A={a}|R={r}"
    fp = fingerprint(canon)
    return {
        "atom_id": f"{segment['app_version']}_{fp}",
        "app_version": segment["app_version"],
        "atom_type": "doc_rule",
        "source_type": "document",
        "primary_module": primary_module,
        "modules": modules,
        "feature_scope": scope,
        "touchpoints": touchpoints,
        "C": c,
        "A": a,
        "R": r,
        "canon_text": canon,
        "merge_fingerprint": fp,
        # Doc-derived rules get slightly higher confidence than case rules.
        "confidence": 0.78,
        "qa_status": "validated",
        "evidence": {
            "source_file": segment.get("source_file", ""),
            "page": segment.get("page"),
            "raw_text": raw_text[:1200],
        },
        "search_terms": re.findall(r"[\u4e00-\u9fffA-Za-z]{2,8}", f"{scope} {r}")[:10],
    }


def segment_atoms_from_doc(segment: dict, context: dict | None) -> tuple[list[dict], dict | None]:
    """Generate doc atoms for one segment, threading a feature *context*.

    Returns (atoms, new_context).  The context carries the scope/background
    of the most recent paragraph that declared a 背景/功能 marker, so that
    bare numbered segments can be attached to that feature.
    """
    title, text = parse_doc_title(segment)
    background, action_scope, body = extract_background_goal(text)
    raw_text = clean_text(segment.get("text", ""))
    scope = title or "未归类功能"

    # A paragraph with background/goal markers starts a new feature context.
    if background or action_scope:
        context = {
            "scope": strip_leading_index(scope),
            "background": background,
            "action_scope": action_scope or strip_leading_index(scope),
        }

    # A lone numbered segment with no explicit background/goal is attached to
    # the previous feature context, when one exists.
    if context and re.match(r"^\d+[..、]", raw_text) and not (background or action_scope):
        scope = context["scope"]
        background = context.get("background", "")
        action_scope = strip_leading_index(raw_text)
        body = strip_leading_index(raw_text)
    else:
        scope = strip_leading_index(scope)
        action_scope = action_scope or scope

    # Main feature paragraph: split the body into numbered sub-items first,
    # falling back to lettered sub-items, then the whole body.
    source_body = body if body else raw_text
    rules = split_numbered_items(source_body)
    if len(rules) == 1:
        rules = split_lettered_items(source_body)
    if not rules:
        rules = [source_body]

    # When the paragraph holds only background/goal text, emit one summary
    # rule instead of dumping the whole raw paragraph into R.
    if (background or action_scope) and (not body or body == raw_text):
        summary_rule = action_scope or scope
        atom = build_doc_atom(segment, scope, background, scope, summary_rule)
        return ([atom] if atom else []), context

    atoms = []
    for rule in rules:
        rule = strip_leading_index(rule)
        atom = build_doc_atom(segment, scope, background, action_scope, rule)
        if atom:
            atoms.append(atom)

    # Dedup by fingerprint; a later identical atom overwrites an earlier one.
    dedup = {}
    for atom in atoms:
        dedup[atom["merge_fingerprint"]] = atom
    return list(dedup.values()), context


def load_jsonl(path: Path) -> list[dict]:
    """Read a JSONL file, returning one parsed object per non-blank line."""
    with path.open("r", encoding="utf-8") as handle:
        return [json.loads(line) for line in (raw.strip() for raw in handle) if line]


def process_version(version_dir: Path) -> tuple[int, int]:
    """Synthesize model atoms for one build/<version> directory.

    Reads case_candidates.jsonl / doc_segments.jsonl when present, writes
    case_atoms_model.jsonl / doc_atoms_model.jsonl next to them, and returns
    (case_atom_count, doc_atom_count).  The previously duplicated
    dedup-and-write logic is factored into _write_atoms().
    """
    case_count = 0
    doc_count = 0

    case_candidates_path = version_dir / "case_candidates.jsonl"
    if case_candidates_path.exists():
        atoms = []
        for candidate in load_jsonl(case_candidates_path):
            atoms.extend(candidate_atoms_from_case(candidate))
        case_count = _write_atoms(version_dir / "case_atoms_model.jsonl", atoms)

    doc_segments_path = version_dir / "doc_segments.jsonl"
    if doc_segments_path.exists():
        atoms = []
        # Context threads the current feature scope across segments.
        context = None
        for segment in load_jsonl(doc_segments_path):
            generated, context = segment_atoms_from_doc(segment, context)
            atoms.extend(generated)
        doc_count = _write_atoms(version_dir / "doc_atoms_model.jsonl", atoms)

    return case_count, doc_count


def _write_atoms(out_path: Path, atoms: list[dict]) -> int:
    """Dedup atoms by merge_fingerprint (last wins), write JSONL, return count."""
    deduped = list({atom["merge_fingerprint"]: atom for atom in atoms}.values())
    with out_path.open("w", encoding="utf-8") as handle:
        for atom in deduped:
            handle.write(json.dumps(atom, ensure_ascii=False) + "\n")
    return len(deduped)


def main() -> None:
    """CLI entry point: process every build/v* directory, optionally filtered.

    The first argv argument that looks like a version number (e.g. "4.57.3")
    restricts processing to matching directory names.
    """
    version_filter = next((arg for arg in sys.argv[1:] if re.match(r"\d+\.\d+", arg)), None)
    totals = {"case": 0, "doc": 0}
    for version_dir in sorted(BUILD_DIR.glob("v*")):
        if version_filter and version_filter not in version_dir.name:
            continue
        case_count, doc_count = process_version(version_dir)
        if not (case_count or doc_count):
            continue
        print(f"{version_dir.name} case_model={case_count} doc_model={doc_count}")
        totals["case"] += case_count
        totals["doc"] += doc_count
    print(f"total_case_model={totals['case']}")
    print(f"total_doc_model={totals['doc']}")


if __name__ == "__main__":
    main()