build_usable_knowledge_pack.py 25.7 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704
#!/usr/bin/env python3
"""
Build a knowledge-base pack that is easier to import and use directly.

Inputs:
  dist/rag/master_atoms.jsonl
  dist/backend_code/code_atoms.jsonl

Output:
  dist/usable_kb/
"""

from __future__ import annotations

import json
import re
from collections import Counter, defaultdict
from pathlib import Path


# Parent of the directory that contains this script; all paths hang off it.
BASE_DIR = Path(__file__).parent.parent
# Directory main() reads master_atoms.jsonl from.
RAG_DIR = BASE_DIR / "dist" / "rag"
# Directory main() reads code_atoms.jsonl from.
BACKEND_DIR = BASE_DIR / "dist" / "backend_code"
# Output directory; main() removes the files already in it before writing.
OUT_DIR = BASE_DIR / "dist" / "usable_kb"

# Canonical module codes. The list order drives the numbering of the
# per-module output files (10, 11, ...) and the order of module sections.
MODULE_ORDER = [
    "AUTH",
    "INCOME",
    "INQUIRY",
    "CLINIC",
    "PATIENT",
    "NOTIFICATION",
    "BACKSTAGE",
    "GENERAL",
]
# Chinese display name of each module code; used in headings and file names.
# Every code in MODULE_ORDER is looked up here with [] and must be present.
MODULE_NAMES = {
    "AUTH": "认证",
    "INCOME": "收入提现",
    "INQUIRY": "问诊",
    "CLINIC": "门诊",
    "PATIENT": "患者",
    "NOTIFICATION": "通知",
    "BACKSTAGE": "后台",
    "GENERAL": "通用",
}
# Result phrases that carry no rule content; collect_rule_entries() skips any
# normalized text that is exactly one of these.
GENERIC_RESULTS = {"满足预期", "搜索出结果", "成功", "失败", "显示成功", "显示失败", "显示正常", "表现正常", "逻辑同上", "无"}
# Raw module label -> canonical module code. normalize_module() consults this
# (exact match on the cleaned label) when the upper-cased label is not
# already one of MODULE_ORDER.
MODULE_ALIASES = {
    "AUTH": "AUTH",
    "认证": "AUTH",
    "身份认证": "AUTH",
    "医生认证": "AUTH",
    "医师资质": "AUTH",
    "互联网医院备案": "AUTH",
    "用户注册": "AUTH",
    "用户登录": "AUTH",
    "INCOME": "INCOME",
    "收入": "INCOME",
    "收入提现": "INCOME",
    "签约": "INCOME",
    "签约提现": "INCOME",
    "税收": "INCOME",
    "税务": "INCOME",
    "收入税务": "INCOME",
    "缴税": "INCOME",
    "收税方式": "INCOME",
    "税源地": "INCOME",
    "结算": "INCOME",
    "费用结算": "INCOME",
    "绩效收入": "INCOME",
    "工猫": "INCOME",
    "安易发": "INCOME",
    "提现": "INCOME",
    "INQUIRY": "INQUIRY",
    "问诊": "INQUIRY",
    "图文问诊": "INQUIRY",
    "电话问诊": "INQUIRY",
    "视频问诊": "INQUIRY",
    "问诊单": "INQUIRY",
    "问诊定价": "INQUIRY",
    "待接诊": "INQUIRY",
    "聊天": "INQUIRY",
    "消息会话": "INQUIRY",
    "医患聊天": "INQUIRY",
    "CLINIC": "CLINIC",
    "门诊": "CLINIC",
    "预约挂号": "CLINIC",
    "PATIENT": "PATIENT",
    "患者": "PATIENT",
    "患者端": "PATIENT",
    "患者管理": "PATIENT",
    "患者档案": "PATIENT",
    "患者分组": "PATIENT",
    "患者互动": "PATIENT",
    "患者通讯录": "PATIENT",
    "患者搜索": "PATIENT",
    "病历": "PATIENT",
    "随访": "PATIENT",
    "评价": "PATIENT",
    "锦旗": "PATIENT",
    "电子锦旗": "PATIENT",
    "NOTIFICATION": "NOTIFICATION",
    "通知": "NOTIFICATION",
    "BACKSTAGE": "BACKSTAGE",
    "后台": "BACKSTAGE",
    "医生管理": "BACKSTAGE",
    "二维码管理": "BACKSTAGE",
    "工作室设置": "BACKSTAGE",
    "工作室开通": "BACKSTAGE",
    "GENERAL": "GENERAL",
}
# Path segments treated as generic labels rather than feature names.
# normalize_feature_key() uses them to decide whether to keep the parent
# segment, and is_good_title() rejects a title that equals one of them.
GENERIC_FEATURE_SEGMENTS = {
    "功能描述",
    "需求背景",
    "背景",
    "说明",
    "场景",
    "兼容性",
    "新版本",
    "老版本",
    "医师端",
    "患者端",
    "医生App",
    "APP端",
    "小程序端",
    "PC端",
}
# is_good_title() rejects any title that contains one of these substrings.
BAD_TITLE_KEYWORDS = {"目标", "背景", "说明", "场景", "功能描述", "需求背景", "兼容性"}
# Kept as a tuple because it is passed directly to str.startswith(); titles
# and fragments beginning with any of these are rejected.
BAD_TITLE_STARTS = ("如果", "当", "该", "给", "通知", "有", "无", "进入", "直接", "还是", "已经", "支持", "显示", "不显示")
# Leading labels that are stripped (together with the separator that follows
# them) from the front of feature segments and title candidates.
GENERIC_PREFIX_PATTERNS = (
    "医师端",
    "患者端",
    "医生App",
    "APP端",
    "小程序端",
    "PC端",
    "猫头鹰端",
    "猫头鹰后台",
)


def clean_text(text: str) -> str:
    """Collapse each whitespace run to one space and trim both ends.

    Falsy input (``None``, ``""``, ``0``) yields ``""``; any other value is
    converted with ``str()`` before cleaning.
    """
    raw = str(text or "")
    single_spaced = re.sub(r"\s+", " ", raw)
    return single_spaced.strip()


def version_key(version: str) -> tuple[int, ...]:
    """Turn a version label into a tuple of ints usable as a sort key.

    Every run of digits in ``version`` becomes one element, so "2.10" sorts
    after "2.9". A falsy or digit-free label maps to ``(0,)``.
    """
    digit_runs = re.findall(r"\d+", version or "")
    if not digit_runs:
        return (0,)
    return tuple(map(int, digit_runs))


def load_jsonl(path: Path) -> list[dict]:
    """Read a JSON Lines file and return one parsed value per non-blank line.

    Returns an empty list when ``path`` does not exist. Lines are stripped
    before parsing; a malformed line propagates the ``json.loads`` error.
    """
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        # The list is fully built before the file is closed.
        return [json.loads(line) for line in stripped_lines if line]


def display_feature_scope(feature_scope: str) -> str:
    """Reduce a raw feature-scope string to a short display label.

    Steps, in order: clean whitespace; strip leading outline numbers,
    bullets and punctuation; cut the text at the first section marker
    (e.g. "场景:") and at the first run of two or more whitespace
    characters; drop one trailing punctuation mark. Returns "未归类功能"
    when nothing is left, so the result is never empty.
    """
    leading_noise = (
        r"^\d{1,2}\.\d+(?=\s|[^\d])\s*",
        r"^(?:\d+(?:[..]\d+)*[、..)]\s*)+",
        r"^[•◦■\-]+\s*",
        r"^[::、..\s]+",
    )
    section_marker = r"\s*(?:场景|功能设计|需求背景|背景|处理方式|设计说明|说明)[::]"

    label = clean_text(feature_scope)
    for pattern in leading_noise:
        label = re.sub(pattern, "", label)
    # Keep only what precedes the first section marker / wide gap.
    label = re.split(section_marker, label, maxsplit=1)[0]
    label = re.split(r"\s{2,}", label, maxsplit=1)[0]
    label = re.sub(r"[,。,;;::]\s*$", "", label)
    label = clean_text(label)
    return label if label else "未归类功能"


def normalize_module(value: str) -> str | None:
    """Map a raw module label to a canonical module code.

    The cleaned label is accepted case-insensitively when it is already one
    of ``MODULE_ORDER``; otherwise it is looked up verbatim in
    ``MODULE_ALIASES``. Returns ``None`` for blank or unknown labels.
    """
    label = clean_text(value)
    if not label:
        return None
    code = label.upper()
    return code if code in MODULE_ORDER else MODULE_ALIASES.get(label)


def normalize_feature_segments(feature_scope: str) -> list[str]:
    """Split a feature-scope path into cleaned, meaningful segments.

    The scope is cleaned, "->" and ">" separators are unified to " > ", and a
    leading version number (e.g. "v1.2 >") is removed. Each segment is then
    passed through ``display_feature_scope`` and stripped of decorative
    symbols, generic client prefixes and section labels. Segments that are a
    bare version number or end up empty are dropped.

    ``display_feature_scope`` never returns an empty string: it substitutes
    the placeholder "未归类功能". A truthiness check on its result therefore
    cannot filter anything, and blank or label-only segments would enter the
    path as the literal placeholder (e.g. "A > " -> ["A", "未归类功能"]).
    Such segments are dropped here instead; callers already map an empty
    result back to the placeholder.

    Returns:
        The cleaned segments in path order; may be empty.
    """
    placeholder = "未归类功能"
    text = clean_text(feature_scope)
    text = re.sub(r"\s*-\s*>\s*", " > ", text)
    text = re.sub(r"\s*>\s*", " > ", text)
    text = re.sub(r"^v?\d+(?:\.\d+)+(?:\s*>\s*)?", "", text, flags=re.I)

    cleaned = []
    for raw_part in re.split(r"\s*>\s*", text):
        # One call per segment; the placeholder marks "nothing usable here".
        part = display_feature_scope(raw_part)
        if part == placeholder:
            continue
        part = re.sub(r"^[❤♥•◦■]+", "", part).strip()
        for prefix in GENERIC_PREFIX_PATTERNS:
            part = re.sub(rf"^{re.escape(prefix)}\s*[--/]\s*", "", part)
        # A segment that is only a version number says nothing about the feature.
        if re.fullmatch(r"v?\d+(?:\.\d+)+", part, flags=re.I):
            continue
        part = re.sub(r"^(?:功能描述|需求背景|背景|说明|场景)[::]\s*", "", part)
        part = clean_text(part)
        if part:
            cleaned.append(part)
    return cleaned


def normalize_feature_key(feature_scope: str) -> str:
    """Derive the grouping key for a feature scope from its last two segments.

    Returns "未归类功能" when no segment survives normalization and the sole
    segment when there is exactly one. Otherwise the result is the parent
    segment, the leaf segment, or "parent > leaf", chosen by the length and
    genericness checks below (evaluated in order, first match wins).
    """
    segments = normalize_feature_segments(feature_scope)
    if not segments:
        return "未归类功能"
    if len(segments) == 1:
        return segments[0]

    parent, leaf = segments[-2], segments[-1]
    joined = f"{parent} > {leaf}"

    # A purely numeric leaf is an outline number, not a name.
    if re.fullmatch(r"[\d.]+", leaf):
        return parent
    # A generic or very short leaf needs its parent for context.
    if leaf in GENERIC_FEATURE_SEGMENTS or len(leaf) <= 2:
        return joined
    # A long parent with a reasonably short leaf: the leaf alone is used.
    if len(parent) >= 18 and len(leaf) <= 18:
        return leaf
    if parent in GENERIC_FEATURE_SEGMENTS:
        return leaf
    either_side_short = min(len(leaf), len(parent)) <= 12
    return joined if either_side_short else leaf


def normalize_title_candidate(text: str) -> str:
    """Normalize raw text into a title candidate.

    Starts from ``normalize_feature_key(text)``, unifies "->" separators,
    then strips generic client prefixes, a leading section label and leading
    bullets before a final whitespace cleanup.
    """
    title = re.sub(r"\s*-\s*>\s*", " > ", normalize_feature_key(text))
    for client in GENERIC_PREFIX_PATTERNS:
        client_prefix = rf"^{re.escape(client)}\s*[--/]\s*"
        title = re.sub(client_prefix, "", title)
    title = re.sub(r"^(?:目标|背景|说明|场景|功能描述|需求背景)[::]\s*", "", title)
    title = re.sub(r"^[•◦■\-]+\s*", "", title)
    return clean_text(title)


def rewrite_title(text: str) -> str:
    """Rewrite an action-style phrase into a noun-style title.

    The text is first normalized with ``normalize_title_candidate``; an empty
    result is returned as is. A leading "操作"/"操作切换" is removed, then a
    fixed sequence of regex rewrites is applied in order (each one sees the
    output of the previous one). Finally spaces, ">" and "-" are trimmed
    from both ends and whitespace is cleaned.
    """
    title = normalize_title_candidate(text)
    if not title:
        return title
    title = re.sub(r"^操作(?:切换)?", "", title).strip()

    # (pattern, replacement, flags) - order matters, later rules are broader.
    rewrites = (
        (r"^点击(.+?) > (.+)$", r"\1 > \2", 0),
        (r"^点击(.+)$", r"\1", 0),
        (r"^去掉涉及到的(.+?)相关$", r"\1", 0),
        (r"^去掉[“\"]?(.+?)[”\"]?$", r"\1", 0),
        (r"^增加app的(.+)$", r"\1", re.I),
        (r"^外治还是走原来的流程$", "外治流程", 0),
        (r"^没有选择任何筛选条件$", "筛选条件为空", 0),
        (r"^第四周放号数据生成$", "第四周放号", 0),
        (r"^设置线下预约挂号时[::]\s*(.+)$", r"线下预约挂号设置", 0),
        (r"^“我的-优惠券”.*$", "我的优惠券展示", 0),
        (r"^(.+?)还是走原来的流程$", r"\1流程", 0),
    )
    for pattern, replacement, flags in rewrites:
        title = re.sub(pattern, replacement, title, flags=flags)
    return clean_text(title.strip(" >-"))


def is_good_title(text: str) -> bool:
    """Return whether ``text``, after ``rewrite_title``, is usable as a title.

    A title is rejected when it is empty or the placeholder "未归类功能",
    shorter than 3 or longer than 40 characters, starts with one of
    ``BAD_TITLE_STARTS``, starts with a generic client prefix followed by
    "-" or " >", equals a generic segment, or contains a bad keyword.
    """
    title = rewrite_title(text)
    if not title or title == "未归类功能":
        return False
    if not 3 <= len(title) <= 40:
        return False
    if title.startswith(BAD_TITLE_STARTS) or title in GENERIC_FEATURE_SEGMENTS:
        return False
    led_by_client_prefix = any(
        title.startswith((f"{prefix}-", f"{prefix} >"))
        for prefix in GENERIC_PREFIX_PATTERNS
    )
    if led_by_client_prefix:
        return False
    return not any(keyword in title for keyword in BAD_TITLE_KEYWORDS)


def extract_title_fragments(text: str) -> list[str]:
    """Extract de-duplicated title candidates from a free-text field.

    Candidates are the whole cleaned text, each ">"-separated part, and each
    semicolon-separated part. Every candidate loses leading list numbers and
    a leading action verb, is cut at the first comma/full stop and at the
    first wide gap, and is passed through ``rewrite_title``. Empty results
    and those starting with ``BAD_TITLE_STARTS`` are discarded. First
    occurrences are kept, in order.
    """
    source = clean_text(text)
    if not source:
        return []
    source = re.sub(r"\s*-\s*>\s*", " > ", source)

    pieces = [source]
    if ">" in source:
        pieces += [segment.strip() for segment in source.split(">") if segment.strip()]
    pieces += re.split(r"[;;]", source)

    # A dict keeps insertion order, so it doubles as an ordered set.
    titles = {}
    for piece in pieces:
        fragment = clean_text(piece)
        if not fragment:
            continue
        fragment = re.sub(r"^(?:\d+[.、)]\s*)+", "", fragment)
        fragment = re.sub(r"^(?:操作|点击|选择|设置|显示|进入|打开|查看|发送|支持|增加|新增)[::]?\s*", "", fragment)
        fragment = re.split(r"[,,。]", fragment, maxsplit=1)[0]
        fragment = re.split(r"\s{2,}", fragment, maxsplit=1)[0]
        fragment = rewrite_title(fragment)
        if fragment and not fragment.startswith(BAD_TITLE_STARTS):
            titles.setdefault(fragment, None)
    return list(titles)


def normalize_rule(text: str) -> str:
    """Strip list markers and edge punctuation from a rule sentence.

    After whitespace cleanup, removes in order: a leading run of letters
    followed by a list delimiter, a leading number with delimiter, a leading
    number followed by whitespace, and leading bullets. Colons and
    semicolons are then trimmed from both ends.
    """
    leading_markers = (
        r"^[a-zA-ZivxIVX]+[.、)]\s*",
        r"^\d+[..、,)]\s*",
        r"^\d+\s+",
        r"^[•◦■\-]+\s*",
    )
    rule = clean_text(text)
    for marker in leading_markers:
        rule = re.sub(marker, "", rule)
    return clean_text(rule.strip("::;;"))


def choose_title(feature: str, atoms: list[dict]) -> str:
    """Pick the best display title for a feature group.

    Candidates come from the feature key itself (three normalizations with
    source ranks 3, 2, 1), from fragments of each atom's ``feature_scope``
    (rank 3) and from fragments of its ``C``, ``A`` and ``R`` fields (rank 1).
    Duplicates keep their first rank. Candidates are scored on their
    rewritten form by: passes ``is_good_title``, source rank, is a
    " > " path not led by a generic prefix, closeness of length to 10, and
    finally the rewritten text itself. The highest score wins.

    The returned string is the candidate as collected, not the rewritten
    form used for scoring.
    """
    placeholder = "未归类功能"
    ranked: list[tuple[str, int]] = [
        (rewrite_title(feature), 3),
        (normalize_feature_key(feature), 2),
        (display_feature_scope(feature), 1),
    ]
    field_ranks = ((("feature_scope",), 3), (("C", "A", "R"), 1))
    for atom in atoms:
        for fields, rank in field_ranks:
            for field in fields:
                for fragment in extract_title_fragments(atom.get(field, "")):
                    if fragment and fragment != placeholder:
                        ranked.append((fragment, rank))

    # First occurrence of each candidate decides its rank.
    unique: dict[str, int] = {}
    for candidate, rank in ranked:
        if candidate and candidate not in unique:
            unique[candidate] = rank
    if not unique:
        return placeholder

    generic_path_starts = tuple(f"{prefix} >" for prefix in GENERIC_PREFIX_PATTERNS)

    def score(entry: tuple[str, int]) -> tuple[int, int, int, int, str]:
        candidate, rank = entry
        rewritten = rewrite_title(candidate)
        quality = int(is_good_title(rewritten))
        is_path = " > " in rewritten and not rewritten.startswith(generic_path_starts)
        closeness = -abs(len(rewritten) - 10)
        return (quality, rank, int(is_path), closeness, rewritten)

    # max() returns the first of equally scored entries, which matches a
    # stable descending sort followed by taking the head.
    return max(unique.items(), key=score)[0]


def sample_product_rules(atoms: list[dict], limit: int = 3) -> list[str]:
    """Return up to ``limit`` distinct sample rule texts, one per atom at most.

    For each atom the fields ``R``, ``A`` and ``canon_text`` are tried in
    that order; the first one whose normalized text is at least 6 characters
    long, not seen before and free of the excluded markers is taken.

    Args:
        atoms: Atom dicts; missing fields are treated as empty strings.
        limit: Maximum number of rules to return. Zero or a negative value
            yields an empty list.

    Returns:
        The sampled rule texts in atom order.
    """
    # The cap below is only checked after an append, so without this guard a
    # non-positive limit would still let one rule through.
    if limit <= 0:
        return []

    excluded_markers = ("灰度", "仅供参考", "预估时间")
    seen = set()
    rules = []
    for atom in atoms:
        for raw in (atom.get("R", ""), atom.get("A", ""), atom.get("canon_text", "")):
            text = normalize_rule(raw)
            if not text or len(text) < 6 or text in seen:
                continue
            if any(marker in text for marker in excluded_markers):
                continue
            seen.add(text)
            rules.append(text)
            break  # at most one rule per atom
        if len(rules) >= limit:
            break
    return rules


def collect_rule_entries(atoms: list[dict]) -> list[dict]:
    """Build the de-duplicated rule entries for a list of atoms.

    Atoms are processed in order of version, atom type, merge fingerprint,
    ``R`` and ``A``. For each atom the first of ``R``, ``A``, ``canon_text``
    that normalizes to at least 2 characters, is not a generic result phrase
    and has not already been recorded for the same (version, type) is
    emitted as ``{"version", "source", "text"}``. Missing version/type are
    shown as "未知版本"/"unknown".
    """

    def sort_key(atom: dict) -> tuple:
        return (
            version_key(atom.get("app_version", "")),
            atom.get("atom_type", ""),
            atom.get("merge_fingerprint", ""),
            atom.get("R", ""),
            atom.get("A", ""),
        )

    recorded = set()
    entries = []
    for atom in sorted(atoms, key=sort_key):
        version = atom.get("app_version", "")
        source = atom.get("atom_type", "")
        for field in ("R", "A", "canon_text"):
            text = normalize_rule(atom.get(field, ""))
            if not text or len(text) < 2 or text in GENERIC_RESULTS:
                continue
            dedup_key = (version, source, text)
            if dedup_key in recorded:
                # Already emitted for this version/type; try the next field.
                continue
            recorded.add(dedup_key)
            entries.append(
                {
                    "version": version or "未知版本",
                    "source": source or "unknown",
                    "text": text,
                }
            )
            break  # one entry per atom
    return entries


def group_product_features(master_atoms: list[dict]) -> dict[str, dict]:
    """Group product atoms by normalized feature key.

    Only atoms whose ``atom_type`` is doc_rule, definition, rule or case_rule
    are considered. Each group records its chosen title, the feature key,
    the sorted canonical modules (from ``primary_module`` and ``modules``,
    defaulting to ["GENERAL"]), the versions sorted by ``version_key``, the
    sorted touchpoints, and the atoms split into ``primary``
    (doc_rule/definition) and ``supplement`` (rule/case_rule).
    """
    primary_types = {"doc_rule", "definition"}
    supplement_types = {"rule", "case_rule"}
    product_types = primary_types | supplement_types

    buckets: dict[str, list[dict]] = defaultdict(list)
    for atom in master_atoms:
        if atom.get("atom_type") in product_types:
            feature_key = normalize_feature_key(atom.get("feature_scope", "未归类功能"))
            buckets[feature_key].append(atom)

    grouped: dict[str, dict] = {}
    for feature, members in buckets.items():
        module_codes = set()
        for atom in members:
            for label in (atom.get("primary_module", ""), *atom.get("modules", [])):
                code = normalize_module(label)
                if code:
                    module_codes.add(code)
        version_set = {a.get("app_version", "") for a in members if a.get("app_version")}
        touchpoint_set = {tp for atom in members for tp in atom.get("touchpoints", []) if tp}
        grouped[feature] = {
            "title": choose_title(feature, members),
            "feature": feature,
            "modules": sorted(module_codes) or ["GENERAL"],
            "versions": sorted(version_set, key=version_key),
            "touchpoints": sorted(touchpoint_set),
            "primary": [a for a in members if a.get("atom_type") in primary_types],
            "supplement": [a for a in members if a.get("atom_type") in supplement_types],
        }
    return grouped


def group_code_by_module(code_atoms: list[dict]) -> dict[str, dict[str, list[dict]]]:
    """Bucket backend code atoms by ``primary_module`` and kind.

    api_contract atoms go to "api", enum_definition to "enum" and
    impl_constraint to "constraint"; other atom types are ignored and do not
    create a module entry. Atoms without ``primary_module`` fall under
    "GENERAL". The result is a defaultdict that creates empty buckets on
    first access.
    """
    type_to_bucket = (
        ("api_contract", "api"),
        ("enum_definition", "enum"),
        ("impl_constraint", "constraint"),
    )
    grouped: dict[str, dict[str, list[dict]]] = defaultdict(lambda: {"api": [], "enum": [], "constraint": []})
    for atom in code_atoms:
        kind = atom.get("atom_type")
        bucket_name = next((bucket for type_name, bucket in type_to_bucket if kind == type_name), None)
        if bucket_name is not None:
            grouped[atom.get("primary_module", "GENERAL")][bucket_name].append(atom)
    return grouped


def feature_rank(item: dict) -> tuple:
    """Sort key that lists the best-documented feature groups first.

    Ascending order puts groups with primary atoms first, then those having
    both primary and supplement atoms, then more touchpoints, then more
    versions, and finally the lower-cased title.
    """
    primary_flag = int(bool(item["primary"]))
    supplement_flag = int(bool(item["supplement"]))
    return (
        -primary_flag,
        -(primary_flag + supplement_flag),
        -len(item["touchpoints"]),
        -len(item["versions"]),
        item["title"].lower(),
    )


def render_manifest(product_features: dict[str, dict], code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the import guide (``00_导入说明.md``) as Markdown.

    The guide lists the recommended import order, one bullet per module
    file, the total number of product topics and backend atoms, and the
    number of topics per module.

    The module file names are derived with ``enumerate(MODULE_ORDER,
    start=10)``, the same numbering main() uses when it writes the files, so
    the listing stays complete however many modules ``MODULE_ORDER`` holds.

    Args:
        product_features: Feature groups; each value needs a ``modules`` list.
        code_by_module: Per-module buckets with ``api``, ``enum`` and
            ``constraint`` lists.

    Returns:
        The Markdown text, ending with a newline.
    """
    topics_per_module = Counter()
    for item in product_features.values():
        topics_per_module.update(item["modules"])

    backend_atom_count = sum(
        len(bucket["api"]) + len(bucket["enum"]) + len(bucket["constraint"])
        for bucket in code_by_module.values()
    )

    lines = [
        "# 可用知识库导入说明",
        "",
        "这套知识库面向三个直接目标:",
        "- 产品逻辑问答",
        "- 版本变更追溯",
        "- 新需求预评审",
        "",
        "推荐导入顺序:",
        "1. `00_导入说明.md`",
        "2. `01_知识库使用规则.md`",
        "3. `02_版本变更总览.md`",
        "4. `03_需求预评审执行指南.md`",
        "5. `04_后台实现导读.md`",
    ]
    # File numbers start at 10, exactly as main() names the module files.
    for idx, module in enumerate(MODULE_ORDER, start=10):
        lines.append(f"- `{idx}_{module}_{MODULE_NAMES[module]}.md`")
    lines.extend(
        [
            "",
            f"- 产品主题数:{len(product_features)}",
            f"- 后台实现原子数:{backend_atom_count}",
            "",
            "## 模块覆盖",
            "",
        ]
    )
    for module in MODULE_ORDER:
        lines.append(f"- {module} / {MODULE_NAMES[module]}:{topics_per_module.get(module, 0)} 个主题")
    return "\n".join(lines) + "\n"


def render_rules() -> str:
    """Return the static usage-rules document (``01_知识库使用规则.md``).

    The text has three sections (source priority, recommended ways to ask,
    usage principles) and ends with a newline.
    """
    return (
        "# 知识库使用规则\n"
        "\n"
        "## 事实源优先级\n"
        "\n"
        "- 培训文档:产品主事实源。\n"
        "- Figma:交互与页面表现补充源。\n"
        "- 测试用例:边界、异常、回归行为补充源。\n"
        "- 后台代码:实现补充源,只补接口、状态、枚举和实现边界,不抢产品定义权。\n"
        "\n"
        "## 推荐问法\n"
        "\n"
        "- 问产品逻辑:优先看各模块文件中的“产品主事实”。\n"
        "- 问版本变更:优先看“版本变更总览”,再回到对应模块文件。\n"
        "- 做需求预评审:优先看“需求预评审执行指南”,再看模块文件中的“实现约束与接口线索”。\n"
        "\n"
        "## 使用原则\n"
        "\n"
        "- 模块是辅助索引,不是唯一组织轴。\n"
        "- 同一主题跨多模块时,以业务场景优先,不强行单模块归属。\n"
        "- 培训文档内容完整保留;不漂亮的历史内容不删除,只尽量不放在主展示位。\n"
    )


def render_versions(product_features: dict[str, dict]) -> str:
    """Render the per-feature version overview (``02_版本变更总览.md``).

    Features are listed with the most versions first, ties broken by
    lower-cased title. Each section shows the modules, the versions ("无"
    when there are none) and the number of primary and supplement rule
    entries.
    """

    def by_version_count(item: dict) -> tuple:
        return (-len(item["versions"]), item["title"].lower())

    out = [
        "# 版本变更总览",
        "",
        "按功能主题整理版本出现情况,用于快速追版本演进。",
        "",
    ]
    for item in sorted(product_features.values(), key=by_version_count):
        out.extend(
            [
                f"## {item['title']}",
                "",
                f"- 模块:{', '.join(item['modules'])}",
                f"- 版本:{', '.join(item['versions']) or '无'}",
                f"- 主事实数:{len(collect_rule_entries(item['primary']))}",
                f"- 补充事实数:{len(collect_rule_entries(item['supplement']))}",
                "",
            ]
        )
    return "\n".join(out)


def render_review_guide(code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the requirement pre-review guide (``03_需求预评审执行指南.md``).

    After a fixed checklist, every module in ``MODULE_ORDER`` gets a section
    with its API, enum and constraint counts plus up to four sorted samples
    of each kind. Modules absent from ``code_by_module`` show zero counts.
    """
    out = [
        "# 需求预评审执行指南",
        "",
        "评审新增需求时,建议按下面顺序检查:",
        "1. 查产品主事实,看当前规则和版本演进。",
        "2. 查交互与测试补充,看页面表现、异常场景、边界条件。",
        "3. 查后台实现补充,看接口、枚举、约束、锁和异常。",
        "",
        "## 模块级后台实现规模",
        "",
    ]
    for module in MODULE_ORDER:
        bucket = code_by_module.get(module, {"api": [], "enum": [], "constraint": []})
        out += [
            f"### {module} / {MODULE_NAMES[module]}",
            "",
            f"- 接口:{len(bucket['api'])}",
            f"- 枚举:{len(bucket['enum'])}",
            f"- 约束:{len(bucket['constraint'])}",
        ]
        first_apis = sorted(bucket["api"], key=lambda a: (a.get("route_path", ""), a.get("method_name", "")))[:4]
        first_enums = sorted(bucket["enum"], key=lambda a: a.get("feature_scope", ""))[:4]
        first_constraints = sorted(bucket["constraint"], key=lambda a: a.get("rule_text", ""))[:4]
        sample_rows = (
            ("- 接口样例:", [f"{a.get('http_method', '')} {a.get('route_path', '')}".strip() for a in first_apis]),
            ("- 枚举样例:", [a.get("feature_scope", "") for a in first_enums]),
            ("- 约束样例:", [a.get("rule_text", "") for a in first_constraints]),
        )
        # A sample line is only written when that kind has at least one atom.
        for prefix, samples in sample_rows:
            if samples:
                out.append(prefix + ";".join(samples))
        out.append("")
    return "\n".join(out)


def render_backend_intro(code_by_module: dict[str, dict[str, list[dict]]]) -> str:
    """Render the backend implementation overview (``04_后台实现导读.md``).

    Contains a fixed introduction, one section per module in
    ``MODULE_ORDER`` with API / enum / constraint counts (zero for modules
    missing from ``code_by_module``), and a fixed list of pointers to the
    detailed backend documents.
    """
    intro = [
        "# 后台实现导读",
        "",
        "后台代码已经被整理成三类知识:接口契约、枚举状态、实现约束。",
        "这一层适合回答:",
        "- 这个需求可能会影响哪些接口",
        "- 哪些状态或枚举需要改",
        "- 哪些异常、权限、锁或幂等逻辑需要回归",
        "",
    ]
    pointers = [
        "详细后台知识见:",
        "- `dist/backend_code/01_接口契约.md`",
        "- `dist/backend_code/02_枚举与状态.md`",
        "- `dist/backend_code/03_实现约束.md`",
        "- `dist/backend_code/05_业务实现主题.md`",
        "",
    ]

    module_sections = []
    for module in MODULE_ORDER:
        bucket = code_by_module.get(module, {"api": [], "enum": [], "constraint": []})
        module_sections += [
            f"## {module} / {MODULE_NAMES[module]}",
            "",
            f"- 接口数量:{len(bucket['api'])}",
            f"- 枚举数量:{len(bucket['enum'])}",
            f"- 约束数量:{len(bucket['constraint'])}",
            "",
        ]
    return "\n".join(intro + module_sections + pointers)


def render_module_file(module: str, items: list[dict], code_bucket: dict[str, list[dict]]) -> str:
    """Render the Markdown file for one module.

    The file starts with an overview (topic count, backend API / enum /
    constraint counts and up to six sorted samples of each), followed by one
    section per feature group ordered by ``feature_rank``. Each section
    lists touchpoints and versions when present, the entry counts, and the
    primary and supplement rule entries ("- 无" when a list is empty).
    """
    apis = code_bucket["api"]
    enums = code_bucket["enum"]
    constraints = code_bucket["constraint"]

    out = [
        f"# {module} / {MODULE_NAMES[module]}",
        "",
        "本文件把该模块的产品规则、交互补充、测试边界和后台实现线索放在一起。",
        "",
        "## 模块实现概览",
        "",
        f"- 产品主题数:{len(items)}",
        f"- 后台接口数:{len(apis)}",
        f"- 后台枚举数:{len(enums)}",
        f"- 后台约束数:{len(constraints)}",
        "",
    ]

    first_apis = sorted(apis, key=lambda a: (a.get("route_path", ""), a.get("method_name", "")))[:6]
    first_enums = sorted(enums, key=lambda a: a.get("feature_scope", ""))[:6]
    first_constraints = sorted(constraints, key=lambda a: a.get("rule_text", ""))[:6]
    sample_rows = (
        ("- 接口样例:", [f"{a.get('http_method', '')} {a.get('route_path', '')}".strip() for a in first_apis]),
        ("- 枚举样例:", [a.get("feature_scope", "") for a in first_enums]),
        ("- 约束样例:", [a.get("rule_text", "") for a in first_constraints]),
    )
    for prefix, samples in sample_rows:
        if samples:
            out.append(prefix + ";".join(samples))
    out += ["", "## 主题清单", ""]

    def entry_section(heading: str, entries: list[dict]) -> list[str]:
        # Heading, blank line, one bullet per entry (or the "none" bullet), blank line.
        bullets = [f"- [{entry['version']}] {entry['text']}" for entry in entries] or ["- 无"]
        return [heading, "", *bullets, ""]

    for item in sorted(items, key=feature_rank):
        out += [f"### {item['title']}", ""]
        if item["touchpoints"]:
            out.append(f"- 触点:{', '.join(item['touchpoints'])}")
        if item["versions"]:
            out.append(f"- 涉及版本:{', '.join(item['versions'])}")
        primary_entries = collect_rule_entries(item["primary"])
        supplement_entries = collect_rule_entries(item["supplement"])
        out += [
            f"- 主事实条数:{len(primary_entries)}",
            f"- 补充事实条数:{len(supplement_entries)}",
            "",
        ]
        out += entry_section("#### 产品主事实", primary_entries)
        out += entry_section("#### 交互/测试补充", supplement_entries)
    return "\n".join(out)


def main() -> None:
    """Build the knowledge-base pack under ``OUT_DIR``.

    Loads both JSONL inputs, groups them, removes the regular files already
    present in ``OUT_DIR`` (sub-directories are left alone), writes the five
    overview documents and one numbered file per module, then prints a short
    summary.
    """
    master_atoms = load_jsonl(RAG_DIR / "master_atoms.jsonl")
    code_atoms = load_jsonl(BACKEND_DIR / "code_atoms.jsonl")
    product_features = group_product_features(master_atoms)
    code_by_module = group_code_by_module(code_atoms)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for stale in OUT_DIR.glob("*"):
        if stale.is_file():
            stale.unlink()

    # Each document is rendered right before it is written, in this order.
    overview_docs = (
        ("00_导入说明.md", lambda: render_manifest(product_features, code_by_module)),
        ("01_知识库使用规则.md", render_rules),
        ("02_版本变更总览.md", lambda: render_versions(product_features)),
        ("03_需求预评审执行指南.md", lambda: render_review_guide(code_by_module)),
        ("04_后台实现导读.md", lambda: render_backend_intro(code_by_module)),
    )
    for filename, render in overview_docs:
        (OUT_DIR / filename).write_text(render(), encoding="utf-8")

    empty_bucket_keys = ("api", "enum", "constraint")
    for idx, module in enumerate(MODULE_ORDER, start=10):
        module_items = [item for item in product_features.values() if module in item["modules"]]
        code_bucket = code_by_module.get(module, {key: [] for key in empty_bucket_keys})
        target = OUT_DIR / f"{idx}_{module}_{MODULE_NAMES[module]}.md"
        target.write_text(render_module_file(module, module_items, code_bucket), encoding="utf-8")

    print(f"product_features={len(product_features)}")
    print(f"code_atoms={len(code_atoms)}")
    print(f"output={OUT_DIR.relative_to(BASE_DIR)}")


# Run the build only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()