build_rag_assets.py 16.4 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
#!/usr/bin/env python3
"""
构建统一的产品研发 RAG 资产。

目标:
1. 把 build/* 下分散的 atoms 聚合为一份跨版本主索引
2. 生成「功能版本演进」与「需求预评审」知识包
3. 为后续对接 Dify / 飞书 Bot / 代码知识库提供统一底座

用法:
  python3 scripts/build_rag_assets.py
"""

from __future__ import annotations

import json
import re
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


# Directory two levels above this file; the usage note in the module docstring
# runs the script as scripts/build_rag_assets.py, so this is the parent of scripts/.
BASE_DIR = Path(__file__).parent.parent
# Input root that iter_atoms() globs for per-version atom files.
BUILD_DIR = BASE_DIR / "build"
# Output root for the generated JSONL indexes.
DIST_DIR = BASE_DIR / "dist" / "rag"
# Output directory for the generated Markdown knowledge files.
KNOWLEDGE_DIR = DIST_DIR / "knowledge_v2"

# Display name for each module code; callers fall back to the raw code when a
# module is not listed here.
MODULE_NAMES = {
    "AUTH": "认证",
    "INCOME": "收入提现",
    "INQUIRY": "问诊",
    "CLINIC": "门诊",
    "PATIENT": "患者",
    "NOTIFICATION": "通知",
    "BACKSTAGE": "后台",
    "GENERAL": "通用",
}

# Display name for each atom source type (the atom's "atom_type" field).
SOURCE_NAMES = {
    "doc_rule": "培训文档",
    "case_rule": "测试用例",
    "rule": "Figma规则",
    "definition": "定义",
}

# Sort rank per source type: lower values sort first and win deduplication.
# Code that looks up an unlisted type uses 9 as the fallback rank.
SOURCE_PRIORITY = {"doc_rule": 0, "case_rule": 1, "rule": 2, "definition": 3}

# Substrings that clean_text() deletes from every text field.
SCREENSHOT_MARKERS = ("[截图]", "[图]", "截图")


@dataclass
class LoadIssue:
    """A problem found on one line of a JSONL input file."""

    # File the line came from (load_jsonl stores it relative to BASE_DIR).
    path: str
    # Line number inside that file; load_jsonl counts from 1.
    line_no: int
    # Human-readable description, e.g. the JSON decoder's error message.
    reason: str


def version_key(version: str) -> tuple[int, ...]:
    """Turn a version label into a tuple of ints suitable for numeric sorting.

    Every run of digits in ``version`` becomes one element, so ``"v1.10.2"``
    gives ``(1, 10, 2)``. A falsy value (``None`` or ``""``) and labels without
    digits give the empty tuple.
    """
    digit_runs = re.findall(r"\d+", version or "")
    return tuple(map(int, digit_runs))


def clean_text(text: str) -> str:
    """Normalise a free-text field for indexing.

    Falsy input yields ``""``. Otherwise the value is stringified, whitespace
    runs are collapsed to single spaces, every entry of ``SCREENSHOT_MARKERS``
    is deleted, whitespace is collapsed again (deleting a marker can leave a
    gap) and leading/trailing spaces and semicolons are trimmed.
    """
    if not text:
        return ""
    whitespace = re.compile(r"\s+")
    cleaned = whitespace.sub(" ", str(text)).strip()
    for marker in SCREENSHOT_MARKERS:
        cleaned = cleaned.replace(marker, "")
    cleaned = whitespace.sub(" ", cleaned)
    return cleaned.strip(" ;;")


def clean_scope(scope: str) -> str:
    """Return the feature scope without a leading ``<version> >`` prefix.

    The value is first passed through ``clean_text``. A prefix such as
    ``"v1.2 > "`` or ``"3.4.5>"`` at the very start is then removed. If nothing
    is left, the placeholder scope name is returned instead.
    """
    without_version = re.sub(r"^v?\d+(?:\.\d+)+\s*>\s*", "", clean_text(scope))
    if without_version:
        return without_version
    return "未归类功能"


def normalize_touchpoint(value: str) -> str:
    """Clean a touchpoint label and capitalise every ``tab`` substring as ``Tab``."""
    return clean_text(value).replace("tab", "Tab")


def load_jsonl(path: Path, issues: list[LoadIssue]) -> list[dict]:
    """Read one JSONL file and return its rows as dicts.

    Blank lines are skipped. A line that is not valid JSON, or that is valid
    JSON but not an object, is skipped and recorded in ``issues`` instead of
    aborting the load. Every returned row gets a ``_source_file`` key holding
    the file path relative to ``BASE_DIR``.

    Args:
        path: JSONL file to read; must be located under ``BASE_DIR``.
        issues: Accumulator that receives one ``LoadIssue`` per skipped line.

    Returns:
        The parsed rows, in file order.

    Raises:
        ValueError: If ``path`` is not located under ``BASE_DIR``.
        OSError: If the file cannot be opened or read.
    """
    # The relative path is the same for every line, so compute it once.
    source_file = str(path.relative_to(BASE_DIR))
    rows: list[dict] = []
    with path.open("r", encoding="utf-8") as handle:
        for line_no, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                issues.append(LoadIssue(source_file, line_no, str(exc)))
                continue
            if not isinstance(row, dict):
                # json.loads also accepts lists, strings, numbers, ...; those
                # cannot carry atom fields and would crash the key assignment below.
                issues.append(
                    LoadIssue(source_file, line_no, f"expected a JSON object, got {type(row).__name__}")
                )
                continue
            row["_source_file"] = source_file
            rows.append(row)
    return rows


def iter_atoms() -> tuple[list[dict], list[LoadIssue]]:
    """Load every atom file found under ``BUILD_DIR``.

    Files are visited pattern by pattern, in sorted path order within each
    pattern, and each path is loaded at most once.

    Returns:
        A pair ``(atoms, issues)``: all rows loaded, and the per-line problems
        collected by ``load_jsonl`` along the way.
    """
    file_patterns = (
        "v*/case_atoms.jsonl",
        "v*/case_atoms_model.jsonl",
        "v*/doc_atoms.jsonl",
        "v*/doc_atoms_model.jsonl",
        "v*/rule_atoms.jsonl",
        "v*/rule_atoms_model.jsonl",
    )
    collected: list[dict] = []
    problems: list[LoadIssue] = []
    visited = set()
    for file_pattern in file_patterns:
        for atom_file in sorted(BUILD_DIR.glob(file_pattern)):
            if atom_file not in visited:
                visited.add(atom_file)
                collected += load_jsonl(atom_file, problems)
    return collected, problems


def enrich_atom(atom: dict) -> dict:
    """Return a normalised copy of ``atom`` with derived index fields added.

    The input dict is not modified. Besides cleaning the text fields, the copy
    gains ``source_type``, ``source_name``, ``topic_key``, ``rule_text`` and
    ``source_rank``, and always carries ``confidence`` and ``qa_status`` keys.

    List fields (``modules``, ``touchpoints``, ``search_terms``) and
    ``atom_type`` may be missing or explicitly ``null`` in the input; both
    cases are treated the same way (empty list / ``"unknown"``).

    Args:
        atom: Raw row as loaded from a JSONL atom file.

    Returns:
        The enriched copy.
    """
    version = atom.get("app_version", "")
    # ``or`` (rather than a .get default) also covers keys present with a null value.
    source_type = atom.get("atom_type") or "unknown"
    primary_module = (atom.get("primary_module") or "GENERAL").upper()
    modules = [str(item).upper() for item in atom.get("modules") or [] if item]
    if primary_module not in modules:
        # The primary module always leads the module list.
        modules.insert(0, primary_module)

    feature_scope = clean_scope(atom.get("feature_scope", ""))
    # Normalise each item once, then drop the ones that end up empty.
    touchpoints = [tp for tp in map(normalize_touchpoint, atom.get("touchpoints") or []) if tp]
    search_terms = [term for term in map(clean_text, atom.get("search_terms") or []) if term]
    c = clean_text(atom.get("C", ""))
    a = clean_text(atom.get("A", ""))
    r = clean_text(atom.get("R", ""))

    enriched = dict(atom)
    # Assignment order is kept stable because it determines the key order of
    # newly added fields in the serialised JSONL output.
    enriched["app_version"] = version
    enriched["source_type"] = source_type
    enriched["source_name"] = SOURCE_NAMES.get(source_type, source_type)
    enriched["primary_module"] = primary_module
    enriched["modules"] = modules
    enriched["feature_scope"] = feature_scope
    enriched["touchpoints"] = touchpoints
    enriched["search_terms"] = search_terms
    enriched["C"] = c
    enriched["A"] = a
    enriched["R"] = r
    enriched["topic_key"] = f"{primary_module}::{feature_scope}"
    enriched["rule_text"] = " | ".join(part for part in [c, a, r] if part)
    enriched["source_rank"] = SOURCE_PRIORITY.get(source_type, 9)
    enriched["confidence"] = enriched.get("confidence")
    enriched["qa_status"] = enriched.get("qa_status", "unknown")
    return enriched


def deduplicate_atoms(atoms: Iterable[dict]) -> list[dict]:
    """Keep the best atom per (version, fingerprint, source type).

    Atoms are expected to be enriched already (``source_rank``, ``topic_key``
    and ``rule_text`` present). When several atoms share a key, the winner is
    the one with the lowest score tuple: source rank, then QA status rank,
    then higher confidence, then longer rule text. On a full tie the atom seen
    first is kept.

    Atoms without a ``merge_fingerprint`` are keyed by their topic and rule
    text instead, so unrelated fingerprint-less atoms are not collapsed into a
    single entry.

    Args:
        atoms: Enriched atoms, in any order.

    Returns:
        The surviving atoms sorted by numeric version, topic key and
        fingerprint.
    """
    qa_rank = {"validated": 0, "unknown": 1, "draft": 2, "rejected": 3}

    def _score(candidate: dict) -> tuple:
        # Lower tuple wins; confidence and text length are negated so that
        # larger values rank first.
        return (
            candidate["source_rank"],
            qa_rank.get(candidate.get("qa_status", "unknown"), 9),
            -(candidate.get("confidence") or 0),
            -len(candidate.get("rule_text", "")),
        )

    deduped: dict[tuple[str, str, str], dict] = {}
    for atom in atoms:
        # Fall back to a content-based identity when no fingerprint exists;
        # an empty shared key would merge every such atom of a version/source.
        fingerprint = atom.get("merge_fingerprint") or (
            f"{atom.get('topic_key', '')}::{atom.get('rule_text', '')}"
        )
        key = (atom.get("app_version", ""), fingerprint, atom.get("source_type", ""))
        current = deduped.get(key)
        if current is None or _score(atom) < _score(current):
            deduped[key] = atom
    return sorted(
        deduped.values(),
        key=lambda item: (
            version_key(item["app_version"]),
            item["topic_key"],
            # ``or ""`` keeps a null fingerprint comparable with string ones.
            item.get("merge_fingerprint") or "",
        ),
    )


def sample_rules(atoms: list[dict], limit: int = 3) -> list[str]:
    """Pick up to ``limit`` distinct example texts from ``atoms``.

    Atoms are considered in order of source rank, numeric version and
    fingerprint. For each atom the text is its ``R`` field, falling back to
    ``A`` and then ``rule_text``; empty and already-picked texts are skipped.

    Args:
        atoms: Enriched atoms (``source_rank`` and ``app_version`` present).
        limit: Maximum number of samples; zero or less yields an empty list.

    Returns:
        The selected texts, in selection order.
    """
    ordered = sorted(
        atoms,
        key=lambda item: (
            item["source_rank"],
            # Numeric ordering, consistent with the other sorts in this file
            # (plain string comparison would put "1.10" before "1.9").
            version_key(item["app_version"]),
            # The fingerprint is optional everywhere else, so do not index it.
            item.get("merge_fingerprint") or "",
        ),
    )
    picked: list[str] = []
    seen = set()
    for atom in ordered:
        # Checked before picking so that limit <= 0 really returns nothing.
        if len(picked) >= limit:
            break
        text = clean_text(atom.get("R") or atom.get("A") or atom.get("rule_text"))
        if not text or text in seen:
            continue
        seen.add(text)
        picked.append(text)
    return picked


def group_feature_catalog(atoms: list[dict]) -> list[dict]:
    """Aggregate enriched atoms into one catalog entry per ``topic_key``.

    Each entry summarises the topic across all versions (versions seen, rule
    and source counts, touchpoints, up to 20 search terms, samples from the
    latest version) and carries one summary per version.

    Returns:
        Catalog entries sorted by primary module and feature scope.
    """
    by_topic: dict[str, list[dict]] = defaultdict(list)
    for atom in atoms:
        by_topic[atom["topic_key"]].append(atom)

    def _member_order(member: dict) -> tuple:
        return (
            version_key(member["app_version"]),
            member["source_rank"],
            member.get("merge_fingerprint", ""),
        )

    entries = []
    for topic_key, members in by_topic.items():
        members.sort(key=_member_order)
        first = members[0]
        module = first["primary_module"]

        # Bucket the sorted members per version; each bucket keeps member order.
        by_version: dict[str, list[dict]] = defaultdict(list)
        for member in members:
            by_version[member["app_version"]].append(member)

        versions = sorted({member["app_version"] for member in members}, key=version_key)
        latest_version = versions[-1]

        per_source = Counter(member["source_type"] for member in members)
        ordered_sources = sorted(per_source.items(), key=lambda pair: SOURCE_PRIORITY.get(pair[0], 9))

        all_terms = sorted({term for member in members for term in member.get("search_terms", [])})
        all_touchpoints = sorted({tp for member in members for tp in member.get("touchpoints", [])})

        version_summaries = [
            {
                "app_version": version,
                "sources": sorted({member["source_name"] for member in by_version[version]}),
                "rule_count": len(by_version[version]),
                "samples": sample_rules(by_version[version], limit=2),
            }
            for version in versions
        ]

        entry = {
            "topic_key": topic_key,
            "feature_scope": first["feature_scope"],
            "primary_module": module,
            "module_name": MODULE_NAMES.get(module, module),
            "modules": sorted({mod for member in members for mod in member["modules"]}),
            "latest_version": latest_version,
            "versions": versions,
            "version_count": len(versions),
            "rule_count": len(members),
            "source_counts": dict(ordered_sources),
            "touchpoints": all_touchpoints,
            "search_terms": all_terms[:20],
            "latest_samples": sample_rules(by_version[latest_version], limit=3),
            "version_summaries": version_summaries,
        }
        entries.append(entry)

    entries.sort(key=lambda entry: (entry["primary_module"], entry["feature_scope"]))
    return entries


def build_review_index(catalog: list[dict]) -> list[dict]:
    """Regroup catalog entries per primary module for pre-review lookups.

    Modules appear in sorted order. Within a module, features are ordered by
    version count (descending), rule count (descending) and feature scope.

    Returns:
        One dict per module with its display name, feature count and a
        trimmed-down view of each feature.
    """
    by_module: dict[str, list[dict]] = defaultdict(list)
    for entry in catalog:
        by_module[entry["primary_module"]].append(entry)

    def _feature_view(entry: dict) -> dict:
        return {
            "feature_scope": entry["feature_scope"],
            "latest_version": entry["latest_version"],
            "versions": entry["versions"],
            "touchpoints": entry["touchpoints"],
            "samples": entry["latest_samples"],
        }

    index = []
    for module in sorted(by_module):
        ranked = sorted(
            by_module[module],
            key=lambda entry: (-entry["version_count"], -entry["rule_count"], entry["feature_scope"]),
        )
        index.append(
            {
                "module": module,
                "module_name": MODULE_NAMES.get(module, module),
                "feature_count": len(ranked),
                "features": [_feature_view(entry) for entry in ranked],
            }
        )
    return index


def write_jsonl(path: Path, rows: Iterable[dict]) -> None:
    """Write ``rows`` to ``path`` as UTF-8 JSON Lines, one object per line.

    Missing parent directories are created and an existing file is
    overwritten. Non-ASCII characters are written as-is rather than escaped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as out:
        for record in rows:
            serialized = json.dumps(record, ensure_ascii=False)
            out.write(f"{serialized}\n")


def write_text(path: Path, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8, creating parent directories first."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")


def render_overview(catalog: list[dict], atoms: list[dict], issues: list[LoadIssue]) -> str:
    """Render the Markdown overview page.

    The page lists totals, the distribution of atoms per source, QA status and
    module, the 30 top-ranked feature topics (most versions first, then most
    rules) and, when there are any, the first 20 load issues.

    Returns:
        The Markdown document, terminated by a newline.
    """
    versions = sorted({atom["app_version"] for atom in atoms}, key=version_key)
    per_module = Counter(atom["primary_module"] for atom in atoms)
    per_source = Counter(atom["source_type"] for atom in atoms)
    per_qa_status = Counter(atom.get("qa_status", "unknown") for atom in atoms)
    ranked_topics = sorted(
        catalog,
        key=lambda entry: (-entry["version_count"], -entry["rule_count"], entry["feature_scope"]),
    )

    if versions:
        coverage = f"- 覆盖版本:{len(versions)} 个({versions[0]} ~ {versions[-1]})"
    else:
        coverage = "- 覆盖版本:0"

    out = [
        "# 产品研发RAG 总览 v2",
        "",
        f"- 原子总数:{len(atoms)}",
        coverage,
        f"- 功能主题数:{len(catalog)}",
        f"- 解析异常行:{len(issues)}",
        "",
        "## 来源分布",
        "",
    ]
    sources_by_priority = sorted(per_source.items(), key=lambda pair: SOURCE_PRIORITY.get(pair[0], 9))
    out += [f"- {SOURCE_NAMES.get(source_type, source_type)}:{count}" for source_type, count in sources_by_priority]

    out += ["", "## 验收状态", ""]
    out += [f"- {qa_status}:{count}" for qa_status, count in per_qa_status.most_common()]

    out += ["", "## 模块分布", ""]
    out += [f"- {module} / {MODULE_NAMES.get(module, module)}:{count}" for module, count in per_module.most_common()]

    out += ["", "## 高价值功能主题", ""]
    for entry in ranked_topics[:30]:
        samples = ";".join(entry["latest_samples"]) or "无"
        out.append(
            f"- [{entry['primary_module']}] {entry['feature_scope']}:{entry['version_count']} 个版本,最新 {entry['latest_version']},样例 {samples}"
        )

    if issues:
        out += ["", "## 解析告警", ""]
        out += [f"- {issue.path}:{issue.line_no} {issue.reason}" for issue in issues[:20]]
    return "\n".join(out) + "\n"


def render_history(catalog: list[dict]) -> str:
    """Render the Markdown page describing how each feature evolved by version.

    At most 200 topics are included, ordered by version count (descending),
    primary module and feature scope. Touchpoint and search-term lines are
    only emitted when the topic has any; at most 10 search terms are shown.

    Returns:
        The Markdown document, terminated by a newline.
    """
    out = [
        "# 功能版本演进索引",
        "",
        "> 这份文档给对话式问答使用,重点回答“当前规则是什么、从哪个版本开始变化、有哪些版本证据”。",
        "",
    ]
    ranked = sorted(
        catalog,
        key=lambda entry: (-entry["version_count"], entry["primary_module"], entry["feature_scope"]),
    )
    for entry in ranked[:200]:
        out += [
            f"## [{entry['primary_module']}] {entry['feature_scope']}",
            "",
            f"- 最新版本:{entry['latest_version']}",
            f"- 涉及版本:{', '.join(entry['versions'])}",
        ]
        touchpoints = entry["touchpoints"]
        if touchpoints:
            out.append(f"- 触点:{', '.join(touchpoints)}")
        terms = entry["search_terms"]
        if terms:
            out.append(f"- 检索词:{', '.join(terms[:10])}")
        out.append("- 版本演进:")
        for per_version in entry["version_summaries"]:
            joined_samples = ";".join(per_version["samples"])
            sample = joined_samples if joined_samples else "无明确文本样例"
            out.append(
                f"  - {per_version['app_version']} | 来源:{', '.join(per_version['sources'])} | 规则数:{per_version['rule_count']} | 样例:{sample}"
            )
        out.append("")
    return "\n".join(out) + "\n"


def render_review_playbook(review_index: list[dict]) -> str:
    """Render the Markdown pre-review guide, one section per module.

    Each section lists at most 40 features; per feature at most 4 touchpoints
    and 2 samples are shown, with placeholders when a feature has none.

    Returns:
        The Markdown document, terminated by a newline.
    """
    out = [
        "# 需求预评审指引",
        "",
        "> 使用方式:新需求进来后,先定位模块,再在对应功能主题下检查现有规则、触点和历史版本差异。",
        "",
    ]
    for section in review_index:
        out += [
            f"## {section['module']} / {section['module_name']}",
            "",
            f"- 功能主题数:{section['feature_count']}",
            "- 预评审关注点:",
            f"  - 先确认是否影响 {section['module_name']} 既有触点、限制条件、异常提示和跨模块链路",
        ]
        for feature in section["features"][:40]:
            shown_touchpoints = "、".join(feature["touchpoints"][:4])
            if not shown_touchpoints:
                shown_touchpoints = "未标注"
            shown_samples = ";".join(feature["samples"][:2])
            if not shown_samples:
                shown_samples = "无"
            out.append(
                f"  - {feature['feature_scope']} | 最新 {feature['latest_version']} | 版本数 {len(feature['versions'])} | 触点 {shown_touchpoints} | 样例 {shown_samples}"
            )
        out.append("")
    return "\n".join(out) + "\n"


def render_usage_notes() -> str:
    """Return the static Markdown usage notes shipped with the knowledge files.

    The text covers what the v2 assets are for, the recommended import order,
    the recommended question routing and follow-up suggestions. It is
    terminated by a newline.
    """
    heading = (
        "# 使用说明",
        "",
        "这套 v2 资产不是替代现有 `knowledge/*.md`,而是补上统一索引层。",
        "",
    )
    import_order = (
        "推荐导入顺序:",
        "- 先导入本目录下的 `00_使用说明.md`、`01_产品规则总览.md`、`02_功能版本演进.md`、`03_需求预评审指引.md`",
        "- 再导入现有 `knowledge/*.md` 模块规则文档",
        "",
    )
    routing = (
        "推荐问答路由:",
        "- 产品逻辑查询:总览 + 模块规则文档",
        "- 版本变更查询:功能版本演进 + 模块规则文档",
        "- 需求预评审:需求预评审指引 + 功能版本演进 + 模块规则文档",
        "- 代码知识库扩展:后续把接口、数据表、核心类说明按相同 `primary_module / feature_scope / app_version` 结构接入",
        "",
    )
    follow_ups = (
        "后续建议:",
        "- 在新增版本入库时同步跑一次 `python3 scripts/build_rag_assets.py`",
        "- 把需求评审记录也沉淀成 atoms,补足“为什么这样设计”的决策上下文",
    )
    document = "\n".join(heading + import_order + routing + follow_ups)
    return f"{document}\n"


def main() -> None:
    """Build every RAG asset and print a short summary.

    Atoms without an ``app_version`` are dropped before enrichment. The JSONL
    indexes are written under ``DIST_DIR`` and the Markdown knowledge files
    under ``KNOWLEDGE_DIR``.
    """
    raw_atoms, issues = iter_atoms()
    versioned = (enrich_atom(atom) for atom in raw_atoms if atom.get("app_version"))
    master_atoms = deduplicate_atoms(versioned)
    catalog = group_feature_catalog(master_atoms)
    review_index = build_review_index(catalog)

    # Machine-readable indexes.
    issue_rows = (
        {"path": issue.path, "line_no": issue.line_no, "reason": issue.reason}
        for issue in issues
    )
    jsonl_outputs = (
        ("master_atoms.jsonl", master_atoms),
        ("feature_catalog.jsonl", catalog),
        ("review_index.jsonl", review_index),
        ("load_issues.jsonl", issue_rows),
    )
    for file_name, rows in jsonl_outputs:
        write_jsonl(DIST_DIR / file_name, rows)

    # Markdown knowledge files; each page is rendered right before it is written.
    usage_md = render_usage_notes()
    write_text(KNOWLEDGE_DIR / "00_使用说明.md", usage_md)
    overview_md = render_overview(catalog, master_atoms, issues)
    write_text(KNOWLEDGE_DIR / "01_产品规则总览.md", overview_md)
    history_md = render_history(catalog)
    write_text(KNOWLEDGE_DIR / "02_功能版本演进.md", history_md)
    playbook_md = render_review_playbook(review_index)
    write_text(KNOWLEDGE_DIR / "03_需求预评审指引.md", playbook_md)

    output_dir = DIST_DIR.relative_to(BASE_DIR)
    print(f"atoms={len(master_atoms)}")
    print(f"topics={len(catalog)}")
    print(f"issues={len(issues)}")
    print(f"output={output_dir}")


# Run the build only when executed as a script, not when imported.
if __name__ == "__main__":
    main()