#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
老板分身/开发助理 - 规则进化脚本
支持:经验收集、经验池列表、规则进化(归档+应用)、进化日志
"""
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# Path aliases.
# The project's ``paths`` module lives in the ``config`` directory next to this
# script's own directory, so that directory must be on ``sys.path`` before the
# import below can succeed.
_CFG = Path(__file__).resolve().parent.parent / "config"
if str(_CFG) not in sys.path:
    # Prepend so the config directory is searched first.
    sys.path.insert(0, str(_CFG))
from paths import ROOT, EVOLUTION_ORANGE, ARCHIVED_ORANGE, RULE_MAIN, LOG_EVOLUTION, ROLE_TO_AGENT, agent_evolution

# Local names for the imported locations.
# NOTE(review): PROJECT_ROOT and ROLE_TO_AGENT are not referenced in the code
# visible here — confirm whether other modules rely on them before removing.
PROJECT_ROOT = ROOT
# Directory scanned by list_pool() and written by add_experience() when no role is targeted.
POOL_DIR = EVOLUTION_ORANGE
# Destination directory used by archive_files().
ARCHIVE_DIR = ARCHIVED_ORANGE
# File overwritten by cmd_evolve() when a new rule file is supplied.
RULE_FILE = RULE_MAIN
# File updated by append_log().
LOG_FILE = LOG_EVOLUTION
|
||
|
||
def ensure_dirs():
    """Create the experience-pool and archive directories when they are missing."""
    # Pool first, then archive; existing directories are left untouched.
    for directory in (POOL_DIR, ARCHIVE_DIR):
        directory.mkdir(parents=True, exist_ok=True)
|
||
|
||
def _get_role_evolution_dir(role: str) -> Path:
    """Return the evolution directory that ``agent_evolution`` resolves for *role*."""
    evolution_dir = agent_evolution(role)
    return evolution_dir
|
||
|
||
def _update_role_index(role: str, date_str: str, summary: str, filename: str):
    """Record a new experience file in the role's ``索引.md`` index table.

    The role's evolution directory is created if needed. When the index file
    does not exist yet, it is written from a template containing the table
    header and the new row. Otherwise the row is inserted directly below the
    first table separator row, so the most recently added entry is listed
    first; if no separator row is found the row is appended at the end.

    Args:
        role: Role name, passed to ``_get_role_evolution_dir`` to locate the
            directory that holds the index.
        date_str: Text for the date column.
        summary: Text for the summary column. Line breaks are replaced with a
            space and ``|`` is escaped so the value stays inside one cell.
        filename: Experience file name, used as link text and as the relative
            link target.
    """
    role_dir = _get_role_evolution_dir(role)
    role_dir.mkdir(parents=True, exist_ok=True)
    index_file = role_dir / "索引.md"

    # A line break or a bare "|" inside a cell would split the Markdown row.
    cell = re.sub(r"[\r\n]+", " ", str(summary)).replace("|", "\\|")
    new_row = f"| {date_str} | {cell} | [{filename}](./{filename}) |"

    if not index_file.exists():
        index_content = "\n".join([
            f"# {role} 经验索引",
            "",
            "> 相关经验。角色激活时优先读取本索引。",
            "",
            "| 日期 | 摘要 | 文件 |",
            "|------|------|------|",
            new_row,
            "",
        ])
        index_file.write_text(index_content, encoding="utf-8")
    else:
        # Accept any Markdown separator row (e.g. "| --- | :---: | --- |"),
        # not only the exact form the template writes, so a hand-edited or
        # reformatted index still receives new rows inside the table.
        separator = re.compile(r"^\|\s*:?-+:?\s*(?:\|\s*:?-+:?\s*)*\|?$")
        lines = index_file.read_text(encoding="utf-8").split("\n")
        for position, line in enumerate(lines):
            if separator.match(line.strip()):
                lines.insert(position + 1, new_row)
                break
        else:
            # No table found: fall back to appending the row at the end.
            lines.append(new_row)
        index_file.write_text("\n".join(lines), encoding="utf-8")
    print(f"已更新索引:{index_file}")
|
||
|
||
def add_experience(data: dict, from_stdin: bool = False) -> str:
    """Write one experience record as a Markdown file and index it.

    The record is rendered from the keys ``title``, ``date``, ``domain``,
    ``related_rule``, ``target_roles``, ``problem``, ``solution``,
    ``decisions``, ``rules_to_extract`` and ``code_example``; missing keys
    fall back to defaults. When ``target_roles`` is non-empty, the file is
    written into every listed role's evolution directory and each role's
    index is updated. Otherwise the file goes to ``POOL_DIR`` and the
    "开发助理" index is updated.

    Args:
        data: Mapping describing the experience. Ignored when ``from_stdin``
            is true.
        from_stdin: Read the record as JSON from standard input instead.

    Returns:
        With target roles: the directory of the last role written, as a
        string. Without: the path of the file written to the pool.

    Exits:
        Calls ``sys.exit(1)`` after printing an error to stderr when stdin is
        empty, the JSON cannot be parsed, or the record is not a JSON
        object/dict.
    """
    ensure_dirs()
    if from_stdin:
        raw = sys.stdin.read().strip()
        if not raw:
            print("错误:stdin 为空", file=sys.stderr)
            sys.exit(1)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as e:
            print(f"错误:JSON 解析失败 - {e}", file=sys.stderr)
            sys.exit(1)

    # Valid JSON may still be a list or scalar; fail with a clear message
    # instead of an AttributeError on ``.get`` below.
    if not isinstance(data, dict):
        print("错误:经验数据必须是 JSON 对象", file=sys.stderr)
        sys.exit(1)

    today = datetime.now().strftime("%Y-%m-%d")
    title = data.get("title") or "未命名经验"
    date_str = data.get("date") or today
    domain = data.get("domain", "通用")
    related_rule = data.get("related_rule", "无")
    problem = data.get("problem", "")
    solution = data.get("solution", "")
    decisions = data.get("decisions", "")
    rules_to_extract = data.get("rules_to_extract", "")
    code_example = data.get("code_example", "")

    # Accept a single role name, a list of names, or nothing (None / empty).
    target_roles = data.get("target_roles") or []
    if isinstance(target_roles, str):
        target_roles = [target_roles]
    else:
        target_roles = list(target_roles)
    roles_label = ", ".join(target_roles) if target_roles else "通用"

    # Both filename parts come from user data, so both are stripped of
    # characters that are illegal in file names or act as path separators.
    safe_title = _safe_filename_part(title, "未命名经验", limit=20)
    safe_date = _safe_filename_part(date_str, today)
    filename = f"{safe_date}-{safe_title}.md"

    content = "\n".join([
        f"# {title}",
        "",
        "## 元信息",
        "",
        f"- **日期**:{date_str}",
        f"- **领域**:{domain}",
        f"- **目标角色**:{roles_label}",
        f"- **关联规则**:{related_rule}",
        "",
        "## 问题描述",
        "",
        f"{problem}",
        "",
        "## 解决过程",
        "",
        f"{solution}",
        "",
        "## 关键决策",
        "",
        f"{decisions}",
        "",
        "## 可提炼的规则",
        "",
        f"{rules_to_extract}",
        "",
    ])
    if code_example:
        content += f"\n## 示例代码/模式\n\n```\n{code_example}\n```\n"

    if target_roles:
        # Write to every requested role before returning.
        for role in target_roles:
            role_dir = _get_role_evolution_dir(role)
            role_dir.mkdir(parents=True, exist_ok=True)
            filepath = role_dir / filename
            filepath.write_text(content, encoding="utf-8")
            print(f"已写入:{filepath}")
            _update_role_index(role, date_str, safe_title, filename)
        return str(role_dir)

    filepath = POOL_DIR / filename
    filepath.write_text(content, encoding="utf-8")
    print(f"已写入:{filepath}")
    _update_role_index("开发助理", date_str, safe_title, filename)
    return str(filepath)


def _safe_filename_part(text, fallback: str, limit=None) -> str:
    """Strip path separators, reserved and control characters from *text*.

    The result is truncated to *limit* characters when a limit is given.
    *fallback* is returned when nothing but whitespace remains.
    """
    cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "", str(text))
    if limit is not None:
        cleaned = cleaned[:limit]
    return cleaned if cleaned.strip() else fallback
|
||
|
||
def list_pool() -> list:
    """Return the pool's ``*.md`` files in sorted order, minus non-experience files.

    Files whose name starts with "示例", contains "可删除", or equals
    "索引.md" are left out. The pool directories are created first if needed.
    """
    ensure_dirs()

    def _is_experience(path) -> bool:
        name = path.name
        return not (name.startswith("示例") or "可删除" in name or name == "索引.md")

    return [entry for entry in sorted(POOL_DIR.glob("*.md")) if _is_experience(entry)]
|
||
|
||
def cmd_list():
    """Print the name of every experience file in the pool, or a notice if there are none."""
    pool_files = list_pool()
    if pool_files:
        for entry in pool_files:
            print(f" - {entry.name}")
    else:
        print("经验池(开发助理/evolution)为空")
|
||
|
||
def archive_files(filenames: list) -> list:
    """Move the named files from ``POOL_DIR`` into ``ARCHIVE_DIR``.

    Names that do not exist in the pool are reported on stderr and skipped.
    If the archive already holds a file with the same name, the moved file
    gets a ``-HHMMSS`` suffix before its extension. If that name is taken as
    well, a numeric counter is added, so an earlier archive is never
    overwritten.

    Args:
        filenames: File names relative to ``POOL_DIR``.

    Returns:
        The original names of the files that were moved, in input order.
    """
    ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
    archived = []
    for name in filenames:
        src = POOL_DIR / name
        if not src.exists():
            print(f"跳过(不存在):{name}", file=sys.stderr)
            continue

        dst = ARCHIVE_DIR / name
        if dst.exists():
            base, ext = os.path.splitext(name)
            stamp = datetime.now().strftime("%H%M%S")
            dst = ARCHIVE_DIR / f"{base}-{stamp}{ext}"
            # Two archives of the same name within one second would collide
            # again; rename() would then silently replace the earlier file on
            # POSIX (or fail on Windows), so keep probing for a free name.
            counter = 1
            while dst.exists():
                dst = ARCHIVE_DIR / f"{base}-{stamp}-{counter}{ext}"
                counter += 1

        src.rename(dst)
        archived.append(name)
        print(f"已归档:{name} -> {dst.name}")
    return archived
|
||
|
||
def append_log(entries: list, trigger: str = "用户执行"):
    """Add a timestamped entry to the evolution log file.

    The entry holds the current date and time, the trigger text and one
    bullet per item in *entries*. If the log already contains a ``---``
    rule, the entry is inserted right after the first one and followed by a
    new ``---``, so newer entries sit above older ones. Without a rule the
    entry is appended. A missing log file is created with a title line.

    Args:
        entries: Items to list under the "处理的经验" heading.
        trigger: Text to list under the "触发" heading.
    """
    ensure_dirs()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    block = "".join(
        [f"\n## {timestamp}\n\n### 触发\n- {trigger}\n\n### 处理的经验\n"]
        + [f"- {e}\n" for e in entries]
        + ["\n"]
    )

    if LOG_FILE.exists():
        content = LOG_FILE.read_text(encoding="utf-8")
        # Prefer a rule on its own line: a bare substring search could hit a
        # table separator row or an entry containing "---" and split it.
        # Fall back to the first "---" anywhere so logs that only have such
        # a match are handled as before.
        rule = re.search(r"^---[ \t]*$", content, flags=re.MULTILINE)
        cut = rule.start() if rule else content.find("---")
        if cut >= 0:
            content = content[:cut] + "---" + block + "---" + content[cut + 3:]
        else:
            content += block
    else:
        content = "# 进化日志\n\n" + block

    # write_text() does not create missing parent directories.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    LOG_FILE.write_text(content, encoding="utf-8")
    print(f"已更新:{LOG_FILE}")
|
||
|
||
def cmd_evolve(archive_all: bool = False, rule_content_file: str = None):
    """Run one evolution pass over the experience pool.

    Does nothing beyond printing a notice when the pool is empty. Otherwise,
    if *rule_content_file* is given (relative paths are resolved against
    ``ROOT``), its text replaces the contents of ``RULE_FILE``; a missing
    file prints an error and exits with status 1. When either argument is
    set, every pool file is archived and the archived names are logged;
    when neither is set, usage hints are printed instead.
    """
    ensure_dirs()
    pool_files = list_pool()
    if not pool_files:
        print("经验池为空,无需进化")
        return

    if rule_content_file:
        rule_source = Path(rule_content_file)
        if not rule_source.is_absolute():
            rule_source = ROOT / rule_content_file
        if not rule_source.exists():
            print(f"错误:文件不存在 {rule_source}", file=sys.stderr)
            sys.exit(1)
        replacement = rule_source.read_text(encoding="utf-8")
        RULE_FILE.write_text(replacement, encoding="utf-8")
        print(f"已更新主规则:{RULE_FILE}")

    # The pool is known to be non-empty here, so archiving happens exactly
    # when one of the two options was supplied.
    if not (archive_all or rule_content_file):
        print("提示:使用 --archive 将经验池文件移入已归档")
        print("提示:使用 --rule <文件路径> 应用 AI 生成的新规则内容")
        return

    archived = archive_files([entry.name for entry in pool_files])
    append_log(archived, "规则进化脚本执行")
|
||
|
||
def cmd_archive(filenames: list):
    """Archive the given pool files and log the ones that were actually moved."""
    moved = archive_files(filenames)
    if not moved:
        # Nothing was archived, so there is nothing to log.
        return
    append_log(moved, "手动归档")
|
||
|
||
def main():
    """Parse the command line and dispatch to the matching sub-command.

    Sub-commands: ``add``, ``list``, ``evolve`` and ``archive``. Without a
    sub-command the help text is printed.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if args.cmd == "add":
        _run_add(args)
    elif args.cmd == "list":
        cmd_list()
    elif args.cmd == "evolve":
        cmd_evolve(archive_all=args.archive, rule_content_file=args.rule)
    elif args.cmd == "archive":
        cmd_archive(args.files)
    else:
        parser.print_help()


def _build_parser():
    """Build the argument parser with the four sub-commands and their options."""
    parser = argparse.ArgumentParser(description="老板分身 - 规则进化")
    sub = parser.add_subparsers(dest="cmd", help="命令")

    p_add = sub.add_parser("add", help="添加经验到经验池")
    p_add.add_argument("--input", "-i", help="JSON 文件路径")
    p_add.add_argument("--stdin", action="store_true", help="从 stdin 读取 JSON")
    p_add.add_argument("--title", help="经验标题")
    p_add.add_argument("--problem", help="问题描述")
    p_add.add_argument("--solution", help="解决过程")
    p_add.add_argument("--decisions", default="", help="关键决策")
    p_add.add_argument("--domain", default="通用", help="领域")
    p_add.add_argument("--related", default="无", help="关联规则")

    sub.add_parser("list", help="列出经验池")

    p_evolve = sub.add_parser("evolve", help="执行进化(归档+应用规则)")
    p_evolve.add_argument("--archive", "-a", action="store_true", help="归档经验池全部文件")
    p_evolve.add_argument("--rule", "-r", help="应用新规则内容文件路径")

    p_arch = sub.add_parser("archive", help="归档指定文件")
    p_arch.add_argument("files", nargs="+", help="文件名")
    return parser


def _run_add(args):
    """Handle ``add``: take the record from stdin, a JSON file, or the flags.

    Source priority is ``--stdin``, then ``--input``, then the
    ``--title``/``--problem``/``--solution`` flags. If none applies, usage
    hints are printed and the process exits with status 1. An unreadable or
    malformed ``--input`` file is reported on stderr with exit status 1,
    matching how stdin errors are reported.
    """
    if args.stdin:
        add_experience({}, from_stdin=True)
    elif args.input:
        input_path = Path(args.input)
        if not input_path.is_absolute():
            input_path = ROOT / args.input
        try:
            data = json.loads(input_path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            print(f"错误:文件不存在 {input_path}", file=sys.stderr)
            sys.exit(1)
        except json.JSONDecodeError as e:
            print(f"错误:JSON 解析失败 - {e}", file=sys.stderr)
            sys.exit(1)
        except (OSError, UnicodeDecodeError) as e:
            print(f"错误:无法读取文件 {input_path} - {e}", file=sys.stderr)
            sys.exit(1)
        add_experience(data)
    elif args.title and args.problem and args.solution:
        add_experience({
            "title": args.title,
            "problem": args.problem,
            "solution": args.solution,
            "decisions": args.decisions or "",
            "domain": args.domain,
            "related_rule": args.related,
            "rules_to_extract": "",
        })
    else:
        print("用法:")
        print(" python .cursor/scripts/evolution.py add --stdin # 从 stdin 读 JSON")
        print(" python .cursor/scripts/evolution.py add -i experience.json")
        sys.exit(1)
|
||
|
||
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()