# build_knowledge_docs.py (8.84 KB)
#!/usr/bin/env python3
"""
三源合并 → 模块规则文档(Dify 知识库格式)

输入:build/*/rule_atoms.jsonl + build/*/case_atoms.jsonl
输出:knowledge/{MODULE}_rules.md(每个模块一个文件)

用法:
  python3 scripts/build_knowledge_docs.py          # 生成所有模块
  python3 scripts/build_knowledge_docs.py INCOME   # 只生成指定模块
"""

import json
import re
import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime

# Project layout: the script is invoked as ``scripts/build_knowledge_docs.py``,
# so the project root is two ``parent`` hops above this file.
BASE_DIR = Path(__file__).parent.parent
BUILD_DIR = BASE_DIR.joinpath("build")        # input: atom *.jsonl files
OUTPUT_DIR = BASE_DIR.joinpath("knowledge")   # output: {MODULE}_rules.md

# Source precedence, lowest rank wins: doc_rule > case_rule > rule/definition.
# Atom types listed in the same tier share one rank.
_PRIORITY_TIERS = (
    ("doc_rule",),
    ("case_rule",),
    ("rule", "definition"),
)
SOURCE_PRIORITY = {
    atom_type: rank
    for rank, tier in enumerate(_PRIORITY_TIERS)
    for atom_type in tier
}

# One row per module: (module code, display name, business summary).
# The summary is printed in the header block at the top of each generated document.
_MODULES = (
    ("AUTH", "医师认证",
     "医师认证流程、证照资质管理、工作室开通/关闭规则、互联网备案。"),
    ("INCOME", "收入提现",
     "医生收入提现、签约第三方(工猫/才燊)、银行卡绑定、结算规则。"),
    ("INQUIRY", "问诊咨询",
     "图文/电话/视频问诊流程、随访、咨询费设置、免费提问规则。"),
    ("CLINIC", "门诊排班",
     "门诊预约/挂号/排班/坐诊/加号/处方开具规则。"),
    ("PATIENT", "患者管理",
     "就诊人档案、患者列表/分组/拉黑、添加患者、患者消息提示。"),
    ("NOTIFICATION", "通知消息",
     "App Push、飞书消息、猫头鹰待办、证件过期提醒等通知规则。"),
    ("BACKSTAGE", "运营后台",
     "猫头鹰运营后台审核流程、审核详情页、工作室手动开通/关闭。"),
)

# Module code -> display name.
MODULE_NAMES = {code: title for code, title, _ in _MODULES}

# Module code -> business summary.
MODULE_DESC = {code: summary for code, _, summary in _MODULES}


def load_all_atoms() -> list[dict]:
    """Load every atom stored in ``*.jsonl`` files under ``BUILD_DIR``.

    Files are discovered recursively and visited in sorted path order. Each
    non-blank line is parsed as one JSON object, and the file's path relative
    to ``BASE_DIR`` is stored on the atom under ``"_source_file"``.

    Loading is best-effort and problems are reported on stdout:

    * a line that is not valid JSON, or that does not decode to a JSON
      object, is skipped on its own; the remaining lines of the file are
      still loaded;
    * a file that cannot be opened or decoded stops being read at the point
      of failure; atoms already parsed from it are kept.

    Returns:
        All successfully parsed atoms, in file order and then line order.
    """
    atoms: list[dict] = []
    for jsonl_file in sorted(BUILD_DIR.rglob("*.jsonl")):
        source_file = str(jsonl_file.relative_to(BASE_DIR))
        try:
            with open(jsonl_file, "r", encoding="utf-8") as f:
                for lineno, raw_line in enumerate(f, start=1):
                    text = raw_line.strip()
                    if not text:
                        continue
                    try:
                        atom = json.loads(text)
                    except json.JSONDecodeError as e:
                        # Only this line is bad; keep going with the rest.
                        print(f"  ⚠️  跳过 {jsonl_file}:{lineno}: {e}")
                        continue
                    if not isinstance(atom, dict):
                        # Valid JSON but not an object (list, number, ...):
                        # it cannot carry atom fields, so skip it.
                        print(f"  ⚠️  跳过 {jsonl_file}:{lineno}: 不是 JSON 对象")
                        continue
                    atom["_source_file"] = source_file
                    atoms.append(atom)
        except (OSError, UnicodeDecodeError) as e:
            # Unreadable or wrongly encoded file: report it and move on.
            print(f"  ⚠️  跳过 {jsonl_file}: {e}")
    return atoms


def _version_key(version: object) -> tuple:
    """Return the integer components of a dotted version for numeric comparison.

    ``"4.40.0"`` becomes ``(4, 40, 0)``. A missing, ``None`` or digit-free
    version becomes ``()``, which compares lower than any real version.
    """
    return tuple(int(part) for part in re.findall(r"\d+", str(version or "")))


def deduplicate(atoms: list[dict], priority: "dict[str, int] | None" = None) -> list[dict]:
    """Collapse atoms that share a ``merge_fingerprint`` down to one each.

    For every fingerprint the surviving atom is chosen as follows:

    1. the atom whose ``atom_type`` has the lower rank in ``priority`` wins
       (a missing type counts as ``"rule"``, an unknown type as rank 2);
    2. on equal rank, the atom with the numerically greater ``app_version``
       wins, so ``"4.40.0"`` beats ``"4.9.0"``;
    3. on a full tie, the atom seen first is kept.

    Atoms with no (or an empty) ``merge_fingerprint`` are dropped, since
    they cannot be matched against anything.

    Args:
        atoms: Atom dicts to deduplicate.
        priority: Optional mapping of ``atom_type`` to rank (lower wins).
            Defaults to the module-level ``SOURCE_PRIORITY``.

    Returns:
        The surviving atoms, ordered by first appearance of their fingerprint.
    """
    ranks = SOURCE_PRIORITY if priority is None else priority

    def rank_of(atom: dict) -> int:
        return ranks.get(atom.get("atom_type", "rule"), 2)

    best: dict[str, dict] = {}
    for atom in atoms:
        fp = atom.get("merge_fingerprint", "")
        if not fp:
            continue

        current = best.get(fp)
        if current is None:
            best[fp] = atom
            continue

        new_rank, old_rank = rank_of(atom), rank_of(current)
        if new_rank < old_rank:
            best[fp] = atom  # higher-priority source overrides
        elif new_rank == old_rank:
            # Compare versions numerically: as plain strings "4.9.0" would
            # sort above "4.40.0" and the older atom would be kept.
            newer = _version_key(atom.get("app_version")) > _version_key(
                current.get("app_version")
            )
            if newer:
                best[fp] = atom
    return list(best.values())


def format_evidence(atom: dict) -> str:
    """Build the one-line source attribution for an atom.

    The result is made of up to three parts joined by ``" · "``:

    * a source tag chosen from ``atom_type`` (``📌`` for a missing or
      unknown type), followed by ``app_version`` when one is present;
    * for list-shaped ``evidence``: a ``[Figma](url)`` link built from the
      ``figma_url`` of the first entry, when that entry is a dict with a
      non-empty URL;
    * for string-shaped ``evidence``: ``用例#<id>`` when the text contains
      ``externalid:<digits>``.

    The returned string never has leading or trailing whitespace. The caller
    wraps it in ``*...*``, and a space right before the closing ``*`` would
    stop Markdown from rendering the emphasis.

    Args:
        atom: Atom dict; ``evidence``, ``app_version`` and ``atom_type`` are
            all optional and may be ``None``.

    Returns:
        The formatted attribution text.
    """
    ev = atom.get("evidence", "")
    ver = str(atom.get("app_version") or "").strip()
    atype = atom.get("atom_type", "")

    source_tag = {
        "doc_rule": "📄培训文档",
        "case_rule": "🧪测试用例",
        "rule": "🎨Figma",
        "definition": "🎨Figma",
    }.get(atype, "📌")

    parts = [f"{source_tag} {ver}" if ver else source_tag]

    if isinstance(ev, list) and ev:
        # rule_atoms carry evidence as a list whose entries may hold a figma_url.
        first = ev[0]
        url = first.get("figma_url", "") if isinstance(first, dict) else ""
        if url:
            parts.append(f"[Figma]({url})")
    elif isinstance(ev, str):
        # case_atoms carry evidence as free text that may embed an external id.
        ext_m = re.search(r'externalid:(\d+)', ev)
        if ext_m:
            parts.append(f"用例#{ext_m.group(1)}")

    return " · ".join(parts)


def atom_to_md_block(atom: dict) -> str:
    """Render a single atom as a Markdown paragraph.

    An atom whose ``atom_type`` is ``"definition"`` and that has a non-empty
    ``term`` is rendered as a bold term line followed by its ``definition``
    text. Every other atom is rendered as up to three labelled lines taken
    from its ``C``, ``A`` and ``R`` fields; empty fields are omitted. The
    final line is always the source attribution from ``format_evidence``.

    Missing fields and explicit ``None`` values are both treated as empty
    text, so one incomplete record cannot abort document generation.

    Args:
        atom: Atom dict to render.

    Returns:
        The rendered lines joined with ``"\\n"``.
    """

    def text(key: str) -> str:
        # `or ""` also covers keys that are present but null in the JSONL.
        return str(atom.get(key) or "").strip()

    atype = atom.get("atom_type", "rule")
    term = text("term")

    lines = []
    if atype == "definition" and term:
        # Term-definition atom.
        lines.append(f"**【定义】{term}**")
        lines.append(text("definition"))
    else:
        # C/A/R rule atom: emit only the parts that have content.
        c, a, r = text("C"), text("A"), text("R")
        if c:
            lines.append(f"**前提**:{c}")
        if a:
            lines.append(f"**操作**:{a}")
        if r:
            lines.append(f"**结果**:{r}")

    lines.append(f"*来源:{format_evidence(atom)}*")
    return "\n".join(lines)


def group_atoms_by_scope(atoms: list[dict]) -> dict[str, list[dict]]:
    """Group atoms by their cleaned ``feature_scope``.

    The scope is stripped of surrounding whitespace and of a leading version
    prefix such as ``"4.40.0 > "``. Atoms whose scope is missing, ``None``,
    blank, or nothing but a version prefix are collected under ``"其他"``,
    so no group ever has an empty name.

    Args:
        atoms: Atom dicts to group.

    Returns:
        Mapping of scope name to its atoms; both groups and atoms keep their
        order of first appearance.
    """
    grouped: dict[str, list[dict]] = defaultdict(list)
    for atom in atoms:
        scope = str(atom.get("feature_scope") or "").strip()
        # Drop the version prefix ("4.40.0 > ") so versions share one section.
        scope = re.sub(r'^[\d.]+\s*>\s*', '', scope).strip()
        grouped[scope or "其他"].append(atom)
    return grouped


def build_module_doc(module: str, atoms: list[dict]) -> str:
    """Build the complete Markdown document for one module.

    The document starts with a title and a quoted header block (module
    description, rule count broken down by source type, generation date,
    data sources, precedence). It is followed by one ``##`` section per
    feature scope, in sorted scope order. Inside a section, atoms are ordered
    by source priority and then by ``app_version`` compared numerically, so
    ``4.9.0`` comes before ``4.40.0``.

    Args:
        module: Module code, e.g. ``"INCOME"``; also used as the display
            name when the code is not in ``MODULE_NAMES``.
        atoms: Atoms belonging to this module.

    Returns:
        The Markdown text of the document.
    """

    def order_key(atom: dict) -> tuple:
        # A plain string sort would put "4.9.0" after "4.40.0" and would
        # raise on a null version, so compare the integer components.
        numbers = re.findall(r"\d+", str(atom.get("app_version") or ""))
        return (
            SOURCE_PRIORITY.get(atom.get("atom_type", "rule"), 2),
            tuple(int(n) for n in numbers),
        )

    name = MODULE_NAMES.get(module, module)
    desc = MODULE_DESC.get(module, "")
    now = datetime.now().strftime("%Y-%m-%d")

    # Count atoms per source type for the header summary.
    type_counts: dict = defaultdict(int)
    for a in atoms:
        type_counts[a.get("atom_type", "rule")] += 1

    type_label = {"doc_rule": "培训文档", "case_rule": "测试用例", "rule": "Figma", "definition": "Figma"}
    source_summary = "、".join(
        f"{type_label.get(k, k)} {v}条"
        for k, v in sorted(type_counts.items(), key=lambda x: SOURCE_PRIORITY.get(x[0], 9))
    )

    lines = [
        f"# {module} · {name} 模块规则",
        "",
        f"> **模块说明**:{desc}",
        f"> **规则总数**:{len(atoms)} 条({source_summary})",
        f"> **最后更新**:{now}",
        "> **数据来源**:Figma 设计稿 + 测试用例(培训文档待补充)",
        "> **优先级**:培训文档 > 测试用例 > Figma",
        "",
        "---",
        "",
    ]

    grouped = group_atoms_by_scope(atoms)
    for scope in sorted(grouped):
        lines.append(f"## {scope}")
        lines.append("")

        # Each atom block is followed by a blank line to keep paragraphs apart.
        for atom in sorted(grouped[scope], key=order_key):
            lines.append(atom_to_md_block(atom))
            lines.append("")

        lines.append("---")
        lines.append("")

    return "\n".join(lines)


def main():
    """Command-line entry point: load, deduplicate, and write the module documents.

    An optional first command-line argument (case-insensitive) restricts
    generation to that one module; without it every module that has atoms is
    generated. Each document is written to ``OUTPUT_DIR/{MODULE}_rules.md``
    and progress is reported on stdout.
    """
    requested = sys.argv[1].upper() if len(sys.argv) > 1 else None

    print("📥 加载所有 atoms...")
    all_atoms = load_all_atoms()
    print(f"   原始总量:{len(all_atoms)} 条")

    atoms = deduplicate(all_atoms)
    print(f"   去重后:{len(atoms)} 条")

    # Bucket by module. An atom may relate to several modules, but it is
    # filed only under its primary_module; atoms without one are left out.
    by_module: dict[str, list[dict]] = defaultdict(list)
    for atom in atoms:
        primary = atom.get("primary_module", "").upper()
        if not primary:
            continue
        by_module[primary].append(atom)

    # Per-module statistics.
    print("\n模块分布:")
    for mod in sorted(by_module):
        print(f"  {mod:15s} {len(by_module[mod]):4d} 条")

    OUTPUT_DIR.mkdir(exist_ok=True)

    if requested:
        modules_to_build = [requested]
    else:
        modules_to_build = sorted(by_module)

    generated = []
    for module in modules_to_build:
        if module not in by_module:
            print(f"\n⚠️  模块 {module} 没有数据")
            continue

        module_atoms = by_module[module]
        out_path = OUTPUT_DIR / f"{module}_rules.md"
        out_path.write_text(build_module_doc(module, module_atoms), encoding="utf-8")

        generated.append((module, len(module_atoms), out_path))
        print(f"\n✅ {module}_rules.md → {len(module_atoms)} 条规则")

    print("\n" + "=" * 50)
    print(f"生成完成:{len(generated)} 个模块文档 → {OUTPUT_DIR}/")
    for mod, cnt, path in generated:
        size_kb = path.stat().st_size / 1024
        print(f"  {mod}_rules.md  ({cnt} 条,{size_kb:.1f}KB)")


if __name__ == "__main__":
    main()