# build_usable_knowledge_pack.py 15.5 KB
#!/usr/bin/env python3
"""
生成一套更适合直接导入和使用的知识库包。

输入:
  dist/rag/master_atoms.jsonl
  dist/backend_code/code_atoms.jsonl

输出:
  dist/usable_kb/
"""

from __future__ import annotations

import json
import re
from collections import Counter, defaultdict
from pathlib import Path


# Project root: the parent of the directory that contains this script.
BASE_DIR = Path(__file__).parent.parent
_DIST_DIR = BASE_DIR / "dist"
RAG_DIR = _DIST_DIR / "rag"  # input: master_atoms.jsonl
BACKEND_DIR = _DIST_DIR / "backend_code"  # input: code_atoms.jsonl
OUT_DIR = _DIST_DIR / "usable_kb"  # output directory of this script

# (module code, display name) pairs, listed in the order modules are rendered.
_MODULE_TABLE = (
    ("AUTH", "认证"),
    ("INCOME", "收入提现"),
    ("INQUIRY", "问诊"),
    ("CLINIC", "门诊"),
    ("PATIENT", "患者"),
    ("NOTIFICATION", "通知"),
    ("BACKSTAGE", "后台"),
    ("GENERAL", "通用"),
)
# Module codes in rendering order.
MODULE_ORDER = [code for code, _ in _MODULE_TABLE]
# Module code -> display name used in headings and file names.
MODULE_NAMES = dict(_MODULE_TABLE)


def clean_text(text: str) -> str:
    """Collapse every whitespace run to a single space and trim both ends.

    Falsy input (``None``, ``""``, ``0``) yields an empty string; any other
    value is converted with ``str()`` before cleaning.
    """
    raw = str(text) if text else ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()


def version_key(version: str) -> tuple[int, ...]:
    """Build a sort key from the digit runs found in a version string.

    Each run of digits becomes one integer, so "3.10.2" sorts after "3.9".
    A falsy value or a string without digits maps to ``(0,)``.
    """
    parts = tuple(int(chunk) for chunk in re.findall(r"\d+", version or ""))
    return parts or (0,)


def load_jsonl(path: Path) -> list[dict]:
    """Read a UTF-8 JSON Lines file and return one parsed value per line.

    Blank lines are skipped. A path that does not exist yields an empty
    list. Errors raised by ``json.loads`` on a malformed line propagate.
    """
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [json.loads(line) for line in stripped_lines if line]


def display_feature_scope(feature_scope: str) -> str:
    """Reduce a raw ``feature_scope`` value to a short display title.

    Processing order:

    1. collapse whitespace via ``clean_text``;
    2. strip a leading dotted section number ("1.2 ", "1.2.3 ");
    3. strip leading list markers ("1、", "2.", "3)"), bullet glyphs and
       stray separator characters;
    4. keep only the text before the first descriptive label such as
       "场景:" or "说明:";
    5. drop one trailing punctuation mark.

    Punctuation is matched in both its ASCII and full-width form. The
    full-width characters are written as escapes so they cannot be confused
    with their ASCII look-alikes: U+FF0E full stop, U+FF09 right parenthesis,
    U+FF0C comma, U+FF1A colon, U+FF1B semicolon.

    Returns:
        The cleaned title, or "未归类功能" when nothing is left.
    """
    scope = clean_text(feature_scope)
    # Dotted section number with any depth; the negative lookahead keeps the
    # match from ending in the middle of a longer digit run.
    scope = re.sub(r"^\d{1,2}(?:[.\uFF0E]\d+)+(?!\d)\s*", "", scope)
    # One or more list markers such as "1、", "2.1)" or "3.".
    scope = re.sub(r"^(?:\d+(?:[.\uFF0E]\d+)*[、.\uFF0E)\uFF09]\s*)+", "", scope)
    scope = re.sub(r"^[•◦■\-]+\s*", "", scope)
    scope = re.sub(r"^[:\uFF1A、.\uFF0E\s]+", "", scope)
    # Everything from the first "<label>:" onwards is description, not title.
    scope = re.split(
        r"\s*(?:场景|功能设计|需求背景|背景|处理方式|设计说明|说明)[:\uFF1A]",
        scope,
        maxsplit=1,
    )[0]
    scope = re.sub(r"[,\uFF0C。;\uFF1B:\uFF1A]\s*$", "", scope)
    return clean_text(scope) or "未归类功能"


def normalize_rule(text: str) -> str:
    """Strip list markers and edge punctuation from a rule sentence.

    After whitespace is collapsed via ``clean_text``, the following leading
    pieces are removed in order: a letter or roman-numeral marker ("a.",
    "ii)"), a numeric marker ("1、", "2."), a bare number followed by
    whitespace, and bullet glyphs. Colons and semicolons are then trimmed
    from both ends.

    The letter marker is limited to a single letter or a roman numeral so
    that a sentence starting with a word (for example "APP、小程序…") keeps
    that word. Punctuation is matched in both ASCII and full-width form;
    the full-width characters are written as escapes (U+FF0E full stop,
    U+FF09 right parenthesis, U+FF0C comma, U+FF1A colon, U+FF1B semicolon).
    """
    text = clean_text(text)
    # Alphabetic list marker: a roman numeral or exactly one letter.
    text = re.sub(r"^(?:[ivxIVX]+|[a-zA-Z])[.\uFF0E、)\uFF09]\s*", "", text)
    text = re.sub(r"^\d+[.\uFF0E、,\uFF0C)\uFF09]\s*", "", text)
    text = re.sub(r"^\d+\s+", "", text)
    text = re.sub(r"^[•◦■\-]+\s*", "", text)
    text = text.strip(":\uFF1A;\uFF1B")
    return clean_text(text)


def choose_title(feature: str, atoms: list[dict]) -> str:
    """Pick the display title for a feature group.

    Candidates are the cleaned ``feature`` itself plus the cleaned "A" and
    "C" fields of every atom. Duplicates are dropped, real titles are
    preferred over the "未归类功能" placeholder, and among those the
    shortest wins; ties go to the candidate seen first.
    """
    placeholder = "未归类功能"
    pool = [display_feature_scope(feature)]
    for atom in atoms:
        for field in ("A", "C"):
            label = display_feature_scope(atom.get(field, ""))
            if label and label != placeholder:
                pool.append(label)

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique = [label for label in dict.fromkeys(pool) if label]
    if not unique:
        return placeholder
    # min() returns the first minimal item, matching a stable sort's head.
    return min(unique, key=lambda label: (label == placeholder, len(label)))


def sample_product_rules(atoms: list[dict], limit: int = 3) -> list[str]:
    """Collect up to ``limit`` distinct sample rule texts from ``atoms``.

    Each atom contributes at most one text: the first of its "R", "A" and
    "canon_text" fields that, after ``normalize_rule``, has at least six
    characters, has not been collected already, and contains none of the
    excluded keywords.

    Args:
        atoms: Atom dictionaries to sample from, in priority order.
        limit: Maximum number of texts to return. A value of zero or less
            returns an empty list.

    Returns:
        The collected texts in the order they were found.
    """
    # Without this guard the limit is only checked after the first append,
    # so limit=0 would still return one rule.
    if limit <= 0:
        return []

    excluded_keywords = ("灰度", "仅供参考", "预估时间")
    seen = set()
    rules = []
    for atom in atoms:
        for raw in (atom.get("R", ""), atom.get("A", ""), atom.get("canon_text", "")):
            text = normalize_rule(raw)
            if not text or len(text) < 6 or text in seen:
                continue
            if any(keyword in text for keyword in excluded_keywords):
                continue
            seen.add(text)
            rules.append(text)
            break  # one sample per atom
        if len(rules) >= limit:
            break
    return rules


def group_product_features(master_atoms: list[dict]) -> dict[str, dict]:
    """Group product atoms by ``feature_scope`` and summarise each group.

    Only atoms whose ``atom_type`` is "doc_rule", "definition", "rule" or
    "case_rule" are kept. A missing, null or empty ``feature_scope`` is
    filed under "未归类功能". Null ``modules`` / ``touchpoints`` values are
    treated as empty lists.

    Returns:
        A mapping from feature scope to a dict with the keys:
        ``title`` (from ``choose_title``), ``feature``, ``modules`` (sorted,
        ``["GENERAL"]`` when none), ``versions`` (sorted with
        ``version_key``), ``touchpoints`` (sorted), ``primary`` (atoms of
        type "doc_rule"/"definition") and ``supplement`` (atoms of type
        "rule"/"case_rule").
    """
    primary_types = {"doc_rule", "definition"}
    supplement_types = {"rule", "case_rule"}
    product_types = primary_types | supplement_types

    by_feature: dict[str, list[dict]] = defaultdict(list)
    for atom in master_atoms:
        if atom.get("atom_type") not in product_types:
            continue
        # `or` also catches explicit null / "" so they share one bucket.
        by_feature[atom.get("feature_scope") or "未归类功能"].append(atom)

    grouped: dict[str, dict] = {}
    for feature, atoms in by_feature.items():
        # `or []` guards against JSON null, which .get() would pass through.
        modules = sorted({m for atom in atoms for m in (atom.get("modules") or []) if m})
        touchpoints = sorted({tp for atom in atoms for tp in (atom.get("touchpoints") or []) if tp})
        versions = sorted(
            {atom.get("app_version", "") for atom in atoms if atom.get("app_version")},
            key=version_key,
        )
        grouped[feature] = {
            "title": choose_title(feature, atoms),
            "feature": feature,
            "modules": modules or ["GENERAL"],
            "versions": versions,
            "touchpoints": touchpoints,
            "primary": [a for a in atoms if a.get("atom_type") in primary_types],
            "supplement": [a for a in atoms if a.get("atom_type") in supplement_types],
        }
    return grouped


def group_code_by_module(code_atoms: list[dict]) -> dict[str, dict[str, list[dict]]]:
    """Bucket backend code atoms by primary module and atom type.

    The result maps a module code to ``{"api": [...], "enum": [...],
    "constraint": [...]}``. Atoms of type "api_contract", "enum_definition"
    and "impl_constraint" go to the matching list; every other atom is
    ignored and creates no module entry. An atom without ``primary_module``
    is filed under "GENERAL".
    """
    type_to_bucket = (
        ("api_contract", "api"),
        ("enum_definition", "enum"),
        ("impl_constraint", "constraint"),
    )
    grouped: dict[str, dict[str, list[dict]]] = defaultdict(
        lambda: {"api": [], "enum": [], "constraint": []}
    )
    for atom in code_atoms:
        kind = atom.get("atom_type")
        bucket_name = next(
            (name for type_name, name in type_to_bucket if kind == type_name),
            None,
        )
        if bucket_name is None:
            continue
        grouped[atom.get("primary_module", "GENERAL")][bucket_name].append(atom)
    return grouped


def feature_rank(item: dict) -> tuple:
    """Sort key that places the best-documented feature groups first.

    Ascending order by this key means: groups with primary atoms first,
    then groups having both primary and supplement atoms, then more
    touchpoints, then more versions, and finally the lower-cased title.
    """
    primary_flag = int(bool(item["primary"]))
    supplement_flag = int(bool(item["supplement"]))
    coverage = primary_flag + supplement_flag
    touchpoint_count = len(item["touchpoints"])
    version_count = len(item["versions"])
    return (
        -primary_flag,
        -coverage,
        -touchpoint_count,
        -version_count,
        item["title"].lower(),
    )


def render_manifest(product_features: dict[str, dict], code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the import-instructions page of the knowledge pack.

    The page contains a numbered recommended import order (the five fixed
    guide files followed by one file per module), the total number of
    product topics and backend atoms, and a per-module topic count.

    Module files continue the numbered list at 6 and use file indexes
    starting at 10, one per entry of ``MODULE_ORDER``.

    Args:
        product_features: Feature groups; each value's ``modules`` list is
            counted towards the per-module coverage.
        code_by_module: Backend atoms per module, each with "api", "enum"
            and "constraint" lists.

    Returns:
        The Markdown text, terminated by a newline.
    """
    topics_per_module = Counter(
        module for item in product_features.values() for module in item["modules"]
    )
    backend_atom_total = sum(
        len(bucket["api"]) + len(bucket["enum"]) + len(bucket["constraint"])
        for bucket in code_by_module.values()
    )

    lines = [
        "# 可用知识库导入说明",
        "",
        "这套知识库面向三个直接目标:",
        "- 产品逻辑问答",
        "- 版本变更追溯",
        "- 新需求预评审",
        "",
        "推荐导入顺序:",
        "1. `00_导入说明.md`",
        "2. `01_知识库使用规则.md`",
        "3. `02_版本变更总览.md`",
        "4. `03_需求预评审执行指南.md`",
        "5. `04_后台实现导读.md`",
    ]
    # Steps 1-5 are the fixed guides above; module files continue at step 6
    # and are named with indexes starting at 10.
    for offset, module in enumerate(MODULE_ORDER):
        lines.append(f"{6 + offset}. `{10 + offset}_{module}_{MODULE_NAMES[module]}.md`")
    lines.extend(
        [
            "",
            f"- 产品主题数:{len(product_features)}",
            f"- 后台实现原子数:{backend_atom_total}",
            "",
            "## 模块覆盖",
            "",
        ]
    )
    for module in MODULE_ORDER:
        lines.append(f"- {module} / {MODULE_NAMES[module]}:{topics_per_module[module]} 个主题")
    return "\n".join(lines) + "\n"


def render_rules() -> str:
    """Return the static "usage rules" page as Markdown.

    The page has a title followed by three sections (source priority,
    recommended ways to ask, usage principles), each a heading plus a
    bullet list. The text ends with a single newline.
    """
    sections = (
        (
            "事实源优先级",
            [
                "- 培训文档:产品主事实源。",
                "- Figma:交互与页面表现补充源。",
                "- 测试用例:边界、异常、回归行为补充源。",
                "- 后台代码:实现补充源,只补接口、状态、枚举和实现边界,不抢产品定义权。",
            ],
        ),
        (
            "推荐问法",
            [
                "- 问产品逻辑:优先看各模块文件中的“产品主事实”。",
                "- 问版本变更:优先看“版本变更总览”,再回到对应模块文件。",
                "- 做需求预评审:优先看“需求预评审执行指南”,再看模块文件中的“实现约束与接口线索”。",
            ],
        ),
        (
            "使用原则",
            [
                "- 模块是辅助索引,不是唯一组织轴。",
                "- 同一主题跨多模块时,以业务场景优先,不强行单模块归属。",
                "- 培训文档内容完整保留;不漂亮的历史内容不删除,只尽量不放在主展示位。",
            ],
        ),
    )
    # Title and sections are separated by one blank line; inside a section
    # the heading is followed by a blank line and then the bullets.
    parts = ["# 知识库使用规则"]
    for heading, bullets in sections:
        parts.append("## " + heading + "\n\n" + "\n".join(bullets))
    return "\n\n".join(parts) + "\n"


def render_versions(product_features: dict[str, dict]) -> str:
    """Render the version-overview page.

    Feature groups are ordered by number of versions (most first), then by
    lower-cased title; only the first 220 are rendered. Each entry lists
    its modules, versions and up to two sample rules from its primary and
    supplement atoms ("无" when a list is empty).
    """
    ranked = sorted(
        product_features.values(),
        key=lambda entry: (-len(entry["versions"]), entry["title"].lower()),
    )
    out = [
        "# 版本变更总览",
        "",
        "按功能主题整理版本出现情况,用于快速追版本演进。",
        "",
    ]
    for entry in ranked[:220]:
        out.extend(
            [
                f"## {entry['title']}",
                "",
                f"- 模块:{', '.join(entry['modules'])}",
                f"- 版本:{', '.join(entry['versions']) or '无'}",
                f"- 主事实样例:{';'.join(sample_product_rules(entry['primary'], 2)) or '无'}",
                f"- 补充样例:{';'.join(sample_product_rules(entry['supplement'], 2)) or '无'}",
                "",
            ]
        )
    return "\n".join(out)


def render_review_guide(code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the pre-review guide page.

    After a fixed checklist, one section per entry of ``MODULE_ORDER``
    reports how many API, enum and constraint atoms the module has and, for
    each non-empty kind, up to four samples taken in sorted order. Modules
    missing from ``code_by_module`` are shown with zero counts.
    """
    out = [
        "# 需求预评审执行指南",
        "",
        "评审新增需求时,建议按下面顺序检查:",
        "1. 查产品主事实,看当前规则和版本演进。",
        "2. 查交互与测试补充,看页面表现、异常场景、边界条件。",
        "3. 查后台实现补充,看接口、枚举、约束、锁和异常。",
        "",
        "## 模块级后台实现规模",
        "",
    ]
    for module in MODULE_ORDER:
        bucket = code_by_module.get(module, {"api": [], "enum": [], "constraint": []})
        out.extend(
            [
                f"### {module} / {MODULE_NAMES[module]}",
                "",
                f"- 接口:{len(bucket['api'])}",
                f"- 枚举:{len(bucket['enum'])}",
                f"- 约束:{len(bucket['constraint'])}",
            ]
        )

        # APIs are ordered by route, then handler name; the others by text.
        ordered_apis = sorted(
            bucket["api"],
            key=lambda atom: (atom.get("route_path", ""), atom.get("method_name", "")),
        )
        ordered_enums = sorted(bucket["enum"], key=lambda atom: atom.get("feature_scope", ""))
        ordered_constraints = sorted(bucket["constraint"], key=lambda atom: atom.get("rule_text", ""))

        api_samples = [
            f"{atom.get('http_method', '')} {atom.get('route_path', '')}".strip()
            for atom in ordered_apis[:4]
        ]
        enum_samples = [atom.get("feature_scope", "") for atom in ordered_enums[:4]]
        constraint_samples = [atom.get("rule_text", "") for atom in ordered_constraints[:4]]

        if api_samples:
            out.append(f"- 接口样例:{';'.join(api_samples)}")
        if enum_samples:
            out.append(f"- 枚举样例:{';'.join(enum_samples)}")
        if constraint_samples:
            out.append(f"- 约束样例:{';'.join(constraint_samples)}")
        out.append("")
    return "\n".join(out)


def render_backend_intro(code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the backend-implementation introduction page.

    The page opens with a fixed description, then lists the API, enum and
    constraint counts for every entry of ``MODULE_ORDER`` (zero for modules
    missing from ``code_by_module``), and closes with pointers to the
    detailed backend documents.
    """
    empty_bucket = {"api": [], "enum": [], "constraint": []}
    out = [
        "# 后台实现导读",
        "",
        "后台代码已经被整理成三类知识:接口契约、枚举状态、实现约束。",
        "这一层适合回答:",
        "- 这个需求可能会影响哪些接口",
        "- 哪些状态或枚举需要改",
        "- 哪些异常、权限、锁或幂等逻辑需要回归",
        "",
    ]
    for module in MODULE_ORDER:
        bucket = code_by_module.get(module, empty_bucket)
        out += [
            f"## {module} / {MODULE_NAMES[module]}",
            "",
            f"- 接口数量:{len(bucket['api'])}",
            f"- 枚举数量:{len(bucket['enum'])}",
            f"- 约束数量:{len(bucket['constraint'])}",
            "",
        ]
    out += [
        "详细后台知识见:",
        "- `dist/backend_code/01_接口契约.md`",
        "- `dist/backend_code/02_枚举与状态.md`",
        "- `dist/backend_code/03_实现约束.md`",
        "- `dist/backend_code/05_业务实现主题.md`",
        "",
    ]
    return "\n".join(out)


def render_module_file(module: str, items: list[dict], code_bucket: dict[str, list[dict]]) -> str:
    """Render the Markdown page for a single module.

    The page has an overview (topic count, backend atom counts and up to
    six samples per atom kind) followed by a topic list. Topics are ordered
    with ``feature_rank`` and only the first 90 are rendered; each shows
    its touchpoints and versions when present plus up to three sample rules
    from its primary and supplement atoms ("无" when there are none).

    Args:
        module: Module code; must be a key of ``MODULE_NAMES``.
        items: Feature groups that belong to this module.
        code_bucket: Backend atoms of this module under the keys "api",
            "enum" and "constraint".
    """
    page = [
        f"# {module} / {MODULE_NAMES[module]}",
        "",
        "本文件把该模块的产品规则、交互补充、测试边界和后台实现线索放在一起。",
        "",
        "## 模块实现概览",
        "",
        f"- 产品主题数:{len(items)}",
        f"- 后台接口数:{len(code_bucket['api'])}",
        f"- 后台枚举数:{len(code_bucket['enum'])}",
        f"- 后台约束数:{len(code_bucket['constraint'])}",
        "",
    ]

    ordered_apis = sorted(
        code_bucket["api"],
        key=lambda atom: (atom.get("route_path", ""), atom.get("method_name", "")),
    )
    ordered_enums = sorted(code_bucket["enum"], key=lambda atom: atom.get("feature_scope", ""))
    ordered_constraints = sorted(code_bucket["constraint"], key=lambda atom: atom.get("rule_text", ""))

    api_samples = [
        f"{atom.get('http_method', '')} {atom.get('route_path', '')}".strip()
        for atom in ordered_apis[:6]
    ]
    enum_samples = [atom.get("feature_scope", "") for atom in ordered_enums[:6]]
    constraint_samples = [atom.get("rule_text", "") for atom in ordered_constraints[:6]]

    if api_samples:
        page.append(f"- 接口样例:{';'.join(api_samples)}")
    if enum_samples:
        page.append(f"- 枚举样例:{';'.join(enum_samples)}")
    if constraint_samples:
        page.append(f"- 约束样例:{';'.join(constraint_samples)}")
    page += ["", "## 主题清单", ""]

    for topic in sorted(items, key=feature_rank)[:90]:
        page += [f"### {topic['title']}", ""]
        if topic["touchpoints"]:
            page.append(f"- 触点:{', '.join(topic['touchpoints'])}")
        if topic["versions"]:
            page.append(f"- 涉及版本:{', '.join(topic['versions'])}")
        primary_rules = sample_product_rules(topic["primary"], 3)
        supplement_rules = sample_product_rules(topic["supplement"], 3)
        page += [
            f"- 产品主事实:{';'.join(primary_rules) or '无'}",
            f"- 交互/测试补充:{';'.join(supplement_rules) or '无'}",
            "",
        ]
    return "\n".join(page)


def main() -> None:
    """Build the usable knowledge pack under ``OUT_DIR``.

    Loads the product and backend atoms, renders every output document in
    memory, then replaces the files in ``OUT_DIR`` with the new set and
    prints a short summary. Only regular files directly inside ``OUT_DIR``
    are removed; sub-directories are left alone.
    """
    master_atoms = load_jsonl(RAG_DIR / "master_atoms.jsonl")
    code_atoms = load_jsonl(BACKEND_DIR / "code_atoms.jsonl")
    product_features = group_product_features(master_atoms)
    code_by_module = group_code_by_module(code_atoms)

    # Render everything before touching the output directory, so an error
    # while rendering cannot leave the directory wiped and half-written.
    documents = {
        "00_导入说明.md": render_manifest(product_features, code_by_module),
        "01_知识库使用规则.md": render_rules(),
        "02_版本变更总览.md": render_versions(product_features),
        "03_需求预评审执行指南.md": render_review_guide(code_by_module),
        "04_后台实现导读.md": render_backend_intro(code_by_module),
    }
    for idx, module in enumerate(MODULE_ORDER, start=10):
        items = [item for item in product_features.values() if module in item["modules"]]
        code_bucket = code_by_module.get(module, {"api": [], "enum": [], "constraint": []})
        documents[f"{idx}_{module}_{MODULE_NAMES[module]}.md"] = render_module_file(module, items, code_bucket)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for old in OUT_DIR.glob("*"):
        if old.is_file():
            old.unlink()
    for name, content in documents.items():
        (OUT_DIR / name).write_text(content, encoding="utf-8")

    print(f"product_features={len(product_features)}")
    print(f"code_atoms={len(code_atoms)}")
    print(f"output={OUT_DIR.relative_to(BASE_DIR)}")


if __name__ == "__main__":
    main()