#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 老板分身/开发助理 - 规则进化脚本 支持:经验收集、经验池列表、规则进化(归档+应用)、进化日志 """ import argparse import json import os import re import sys from datetime import datetime from pathlib import Path # 路径别名 _CFG = Path(__file__).resolve().parent.parent / "config" if str(_CFG) not in sys.path: sys.path.insert(0, str(_CFG)) from paths import ROOT, EVOLUTION_ORANGE, ARCHIVED_ORANGE, RULE_MAIN, LOG_EVOLUTION, ROLE_TO_AGENT, agent_evolution PROJECT_ROOT = ROOT POOL_DIR = EVOLUTION_ORANGE ARCHIVE_DIR = ARCHIVED_ORANGE RULE_FILE = RULE_MAIN LOG_FILE = LOG_EVOLUTION def ensure_dirs(): POOL_DIR.mkdir(parents=True, exist_ok=True) ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) def _get_role_evolution_dir(role: str) -> Path: """获取角色对应的 evolution 目录。""" return agent_evolution(role) def _update_role_index(role: str, date_str: str, summary: str, filename: str): """更新角色经验池的 索引.md,在表格中追加一行。""" role_dir = _get_role_evolution_dir(role) role_dir.mkdir(parents=True, exist_ok=True) index_file = role_dir / "索引.md" new_row = f"| {date_str} | {summary} | [{filename}](./{filename}) |" if not index_file.exists(): index_content = f"""# {role} 经验索引 > 相关经验。角色激活时优先读取本索引。 | 日期 | 摘要 | 文件 | |------|------|------| {new_row} """ index_file.write_text(index_content, encoding="utf-8") else: content = index_file.read_text(encoding="utf-8") lines = content.split("\n") inserted = False for i, line in enumerate(lines): if line.strip() == "|------|------|------|": lines.insert(i + 1, new_row) inserted = True break if not inserted: lines.append(new_row) index_file.write_text("\n".join(lines), encoding="utf-8") print(f"已更新索引:{index_file}") def add_experience(data: dict, from_stdin: bool = False) -> str: """添加经验到经验池或角色经验池。data 可为 dict 或从 stdin 读取的 JSON。""" ensure_dirs() if from_stdin: raw = sys.stdin.read().strip() if not raw: print("错误:stdin 为空", file=sys.stderr) sys.exit(1) try: data = json.loads(raw) except json.JSONDecodeError as e: print(f"错误:JSON 解析失败 - {e}", file=sys.stderr) sys.exit(1) title = data.get("title", "未命名经验") date_str = data.get("date", datetime.now().strftime("%Y-%m-%d")) domain = data.get("domain", "通用") related_rule = data.get("related_rule", "无") target_roles = data.get("target_roles", []) if isinstance(target_roles, str): target_roles = [target_roles] if target_roles else [] problem = data.get("problem", "") solution = data.get("solution", "") decisions = data.get("decisions", "") rules_to_extract = data.get("rules_to_extract", "") code_example = data.get("code_example", "") safe_title = re.sub(r'[\\/:*?"<>|]', "", title)[:20] filename = f"{date_str}-{safe_title}.md" content = f"""# {title} ## 元信息 - **日期**:{date_str} - **领域**:{domain} - **目标角色**:{", ".join(target_roles) if target_roles else "通用"} - **关联规则**:{related_rule} ## 问题描述 {problem} ## 解决过程 {solution} ## 关键决策 {decisions} ## 可提炼的规则 {rules_to_extract} """ if code_example: content += f""" ## 示例代码/模式 ``` {code_example} ``` """ if target_roles: for role in target_roles: role_dir = _get_role_evolution_dir(role) role_dir.mkdir(parents=True, exist_ok=True) filepath = role_dir / filename filepath.write_text(content, encoding="utf-8") print(f"已写入:{filepath}") _update_role_index(role, date_str, safe_title, filename) return str(role_dir) else: filepath = POOL_DIR / filename filepath.write_text(content, encoding="utf-8") print(f"已写入:{filepath}") _update_role_index("开发助理", date_str, safe_title, filename) return str(filepath) def list_pool() -> list: """列出经验池中的文件(排除示例等)。""" ensure_dirs() files = [] for f in sorted(POOL_DIR.glob("*.md")): if f.name.startswith("示例") or "可删除" in f.name or f.name == "索引.md": continue files.append(f) return files def cmd_list(): """列出经验池""" files = list_pool() if not files: print("经验池(开发助理/evolution)为空") return for f in files: print(f" - {f.name}") def archive_files(filenames: list) -> list: """将 agent/开发助理/evolution 中的文件移入 agent/开发助理/archived。""" ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) archived = [] for name in filenames: src = POOL_DIR / name if not src.exists(): print(f"跳过(不存在):{name}", file=sys.stderr) continue dst = ARCHIVE_DIR / name if dst.exists(): base, ext = os.path.splitext(name) dst = ARCHIVE_DIR / f"{base}-{datetime.now().strftime('%H%M%S')}{ext}" src.rename(dst) archived.append(name) print(f"已归档:{name} -> {dst.name}") return archived def append_log(entries: list, trigger: str = "用户执行"): """追加进化日志。""" ensure_dirs() today = datetime.now().strftime("%Y-%m-%d") block = f""" ## {datetime.now().strftime("%Y-%m-%d %H:%M")} ### 触发 - {trigger} ### 处理的经验 """ for e in entries: block += f"- {e}\n" block += "\n" if LOG_FILE.exists(): content = LOG_FILE.read_text(encoding="utf-8") if "---" in content: parts = content.split("---", 1) content = parts[0] + "---" + block + "---" + parts[1] else: content += block else: content = "# 进化日志\n\n" + block LOG_FILE.write_text(content, encoding="utf-8") print(f"已更新:{LOG_FILE}") def cmd_evolve(archive_all: bool = False, rule_content_file: str = None): """执行进化:归档经验池、可选应用新规则、追加进化日志。""" ensure_dirs() files = list_pool() if not files: print("经验池为空,无需进化") return if rule_content_file: path = Path(rule_content_file) if not path.is_absolute(): path = ROOT / rule_content_file if path.exists(): new_content = path.read_text(encoding="utf-8") RULE_FILE.write_text(new_content, encoding="utf-8") print(f"已更新主规则:{RULE_FILE}") else: print(f"错误:文件不存在 {path}", file=sys.stderr) sys.exit(1) to_archive = [f.name for f in files] if archive_all or rule_content_file else [] if to_archive: archived = archive_files(to_archive) append_log(archived, "规则进化脚本执行") else: print("提示:使用 --archive 将经验池文件移入已归档") print("提示:使用 --rule <文件路径> 应用 AI 生成的新规则内容") def cmd_archive(filenames: list): """归档指定文件""" archived = archive_files(filenames) if archived: append_log(archived, "手动归档") def main(): parser = argparse.ArgumentParser(description="老板分身 - 规则进化") sub = parser.add_subparsers(dest="cmd", help="命令") p_add = sub.add_parser("add", help="添加经验到经验池") p_add.add_argument("--input", "-i", help="JSON 文件路径") p_add.add_argument("--stdin", action="store_true", help="从 stdin 读取 JSON") p_add.add_argument("--title", help="经验标题") p_add.add_argument("--problem", help="问题描述") p_add.add_argument("--solution", help="解决过程") p_add.add_argument("--decisions", default="", help="关键决策") p_add.add_argument("--domain", default="通用", help="领域") p_add.add_argument("--related", default="无", help="关联规则") sub.add_parser("list", help="列出经验池") p_evolve = sub.add_parser("evolve", help="执行进化(归档+应用规则)") p_evolve.add_argument("--archive", "-a", action="store_true", help="归档经验池全部文件") p_evolve.add_argument("--rule", "-r", help="应用新规则内容文件路径") p_arch = sub.add_parser("archive", help="归档指定文件") p_arch.add_argument("files", nargs="+", help="文件名") args = parser.parse_args() if args.cmd == "add": if args.stdin: add_experience({}, from_stdin=True) elif args.input: input_path = Path(args.input) if not input_path.is_absolute(): input_path = ROOT / args.input data = json.loads(input_path.read_text(encoding="utf-8")) add_experience(data) elif args.title and args.problem and args.solution: add_experience({ "title": args.title, "problem": args.problem, "solution": args.solution, "decisions": args.decisions or "", "domain": args.domain, "related_rule": args.related, "rules_to_extract": "", }) else: print("用法:") print(" python .cursor/scripts/evolution.py add --stdin # 从 stdin 读 JSON") print(" python .cursor/scripts/evolution.py add -i experience.json") sys.exit(1) elif args.cmd == "list": cmd_list() elif args.cmd == "evolve": cmd_evolve(archive_all=args.archive, rule_content_file=args.rule) elif args.cmd == "archive": cmd_archive(args.files) else: parser.print_help() if __name__ == "__main__": main()