Files
soul-yongping/.cursor/scripts/evolution.py

312 lines
10 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
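Boss avatar / dev assistant - rule evolution script.
Supports: collecting experiences, listing the experience pool, rule evolution (archive + apply), and the evolution log.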
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
# Path aliases.
# Put the "config" directory (sibling of this script's directory) on sys.path
# so that the project-local ``paths`` module can be imported below
# (presumably it lives in that directory - confirm).
_CFG = Path(__file__).resolve().parent.parent / "config"
if str(_CFG) not in sys.path:
    sys.path.insert(0, str(_CFG))
from paths import ROOT, EVOLUTION_ORANGE, ARCHIVED_ORANGE, RULE_MAIN, LOG_EVOLUTION, ROLE_TO_AGENT, agent_evolution
# Short local names for the imported path constants used by the functions below.
PROJECT_ROOT = ROOT
POOL_DIR = EVOLUTION_ORANGE  # experience pool directory
ARCHIVE_DIR = ARCHIVED_ORANGE  # destination for archived experience files
RULE_FILE = RULE_MAIN  # main rule file overwritten by ``evolve --rule``
LOG_FILE = LOG_EVOLUTION  # evolution log file
def ensure_dirs():
    """Create the experience-pool and archive directories if they are missing.

    Missing parent directories are created as well; directories that already
    exist are left untouched.
    """
    for directory in (POOL_DIR, ARCHIVE_DIR):
        directory.mkdir(parents=True, exist_ok=True)
def _get_role_evolution_dir(role: str) -> Path:
    """Return the evolution directory that belongs to *role*.

    The lookup is delegated to ``agent_evolution`` from the ``paths`` module.
    """
    evolution_dir = agent_evolution(role)
    return evolution_dir
def _update_role_index(role: str, date_str: str, summary: str, filename: str):
    """Add one table row for *filename* to the role's ``索引.md`` index.

    Args:
        role: Role whose evolution directory holds the index.
        date_str: Text for the date column.
        summary: Text for the summary column.
        filename: Experience file name; rendered as a relative link.

    If the index does not exist yet it is created with a heading, a note, the
    table header and the new row.  Otherwise the row is inserted directly
    below the first table separator line, or appended to the end of the file
    when no separator line is found.
    """
    target_dir = _get_role_evolution_dir(role)
    target_dir.mkdir(parents=True, exist_ok=True)
    index_file = target_dir / "索引.md"
    new_row = f"| {date_str} | {summary} | [{filename}](./{filename}) |"

    if index_file.exists():
        rows = index_file.read_text(encoding="utf-8").split("\n")
        # Position of the first header/body separator, if the table has one.
        separator_at = next(
            (
                pos
                for pos, text in enumerate(rows)
                if text.strip() == "|------|------|------|"
            ),
            None,
        )
        if separator_at is None:
            rows.append(new_row)
        else:
            rows.insert(separator_at + 1, new_row)
        index_file.write_text("\n".join(rows), encoding="utf-8")
    else:
        # The literal below is file content, so it stays flush-left.
        index_content = f"""# {role} 经验索引
> 相关经验角色激活时优先读取本索引
| 日期 | 摘要 | 文件 |
|------|------|------|
{new_row}
"""
        index_file.write_text(index_content, encoding="utf-8")

    print(f"已更新索引：{index_file}")
def add_experience(data: dict, from_stdin: bool = False) -> str:
    """Write one experience record as a Markdown file and index it.

    Args:
        data: Mapping describing the experience.  Keys read: ``title``,
            ``date``, ``domain``, ``related_rule``, ``target_roles`` (a string
            or a list of strings), ``problem``, ``solution``, ``decisions``,
            ``rules_to_extract`` and ``code_example``.  All are optional.
        from_stdin: When true, *data* is ignored and a JSON object is read
            from standard input instead.

    Returns:
        When target roles are given, the evolution directory of the last role
        written (as a string); otherwise the path of the file written to the
        shared experience pool (as a string).

    Exits the process with status 1 (after printing to stderr) when stdin is
    empty, the JSON cannot be parsed, or the payload is not a JSON object.
    """
    ensure_dirs()
    if from_stdin:
        raw = sys.stdin.read().strip()
        if not raw:
            print("错误stdin 为空", file=sys.stderr)
            sys.exit(1)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as e:
            print(f"错误JSON 解析失败 - {e}", file=sys.stderr)
            sys.exit(1)

    # A JSON list/string/number would otherwise crash on ``.get`` below.
    if not isinstance(data, dict):
        print("错误：经验数据必须是 JSON 对象", file=sys.stderr)
        sys.exit(1)

    # ``or`` also covers explicit null / empty values, not only missing keys.
    title = str(data.get("title") or "未命名经验")
    date_str = str(data.get("date") or datetime.now().strftime("%Y-%m-%d"))
    domain = data.get("domain", "通用")
    related_rule = data.get("related_rule", "")

    target_roles = data.get("target_roles") or []
    if isinstance(target_roles, str):
        target_roles = [target_roles]
    # Drop empty entries and duplicates (order preserved) so each role is
    # written and indexed exactly once.
    target_roles = list(dict.fromkeys(str(r) for r in target_roles if r))

    problem = data.get("problem", "")
    solution = data.get("solution", "")
    decisions = data.get("decisions", "")
    rules_to_extract = data.get("rules_to_extract", "")
    code_example = data.get("code_example", "")

    # Characters that are illegal in file names on common platforms, plus
    # control characters (a newline would also break the index table row).
    unsafe_chars = r'[\\/:*?"<>|\x00-\x1f]'
    safe_title = re.sub(unsafe_chars, "", title)[:20].strip() or "未命名经验"
    # The date comes from user input too: "2024/01/01" must not turn into a
    # nested (or escaping) path, so separators become dashes.
    safe_date = re.sub(unsafe_chars, "-", date_str).strip()
    filename = f"{safe_date}-{safe_title}.md"

    roles_label = ", ".join(target_roles) if target_roles else "通用"
    content = (
        f"# {title}\n"
        "## 元信息\n"
        f"- **日期**{date_str}\n"
        f"- **领域**{domain}\n"
        f"- **目标角色**{roles_label}\n"
        f"- **关联规则**{related_rule}\n"
        "## 问题描述\n"
        f"{problem}\n"
        "## 解决过程\n"
        f"{solution}\n"
        "## 关键决策\n"
        f"{decisions}\n"
        "## 可提炼的规则\n"
        f"{rules_to_extract}\n"
    )
    if code_example:
        content += (
            "\n"
            "## 示例代码/模式\n"
            "```\n"
            f"{code_example}\n"
            "```\n"
        )

    if target_roles:
        for role in target_roles:
            role_dir = _get_role_evolution_dir(role)
            role_dir.mkdir(parents=True, exist_ok=True)
            filepath = role_dir / filename
            filepath.write_text(content, encoding="utf-8")
            print(f"已写入：{filepath}")
            _update_role_index(role, date_str, safe_title, filename)
        # Kept for backward compatibility: the directory of the last role.
        return str(role_dir)

    filepath = POOL_DIR / filename
    filepath.write_text(content, encoding="utf-8")
    print(f"已写入：{filepath}")
    _update_role_index("开发助理", date_str, safe_title, filename)
    return str(filepath)
def list_pool() -> list:
    """Return the sorted ``*.md`` files in the experience pool.

    Files whose name starts with "示例", contains "可删除", or equals
    "索引.md" are left out.
    """
    ensure_dirs()

    def _is_experience(path) -> bool:
        name = path.name
        return not (
            name.startswith("示例") or "可删除" in name or name == "索引.md"
        )

    return [path for path in sorted(POOL_DIR.glob("*.md")) if _is_experience(path)]
def cmd_list():
    """Print the names of the files in the experience pool, one per line.

    Prints a single "pool is empty" message when there is nothing to list.
    """
    pool_files = list_pool()
    if pool_files:
        for entry in pool_files:
            print(f" - {entry.name}")
    else:
        print("经验池(开发助理/evolution为空")
def archive_files(filenames: list) -> list:
    """Move the named files from the experience pool into the archive.

    Args:
        filenames: File names relative to the pool directory.

    Returns:
        The names (as given) of the files that were actually moved.

    Names that do not exist in the pool are reported on stderr and skipped.
    When the archive already holds a file of the same name, the current time
    (``HHMMSS``) is appended to the base name of the archived copy.
    """
    ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
    moved = []
    for name in filenames:
        source = POOL_DIR / name
        if source.exists():
            target = ARCHIVE_DIR / name
            if target.exists():
                stem, suffix = os.path.splitext(name)
                stamp = datetime.now().strftime('%H%M%S')
                target = ARCHIVE_DIR / f"{stem}-{stamp}{suffix}"
            source.rename(target)
            moved.append(name)
            print(f"已归档：{name} -> {target.name}")
        else:
            print(f"跳过(不存在)：{name}", file=sys.stderr)
    return moved
def append_log(entries: list, trigger: str = "用户执行"):
    """Record one evolution run in the evolution log file.

    Args:
        entries: Items (typically archived file names) listed under the
            "processed experiences" heading, one bullet each.
        trigger: Short description of what triggered the run.

    A new log file is created with a title when none exists.  In an existing
    log the new section is inserted right after the first horizontal-rule
    line (a line consisting only of ``---``) and followed by a fresh ``---``,
    so the newest section comes first.  When the log has no such line the
    section is appended to the end.
    """
    ensure_dirs()
    # The log may live outside the pool/archive directories created above.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    bullets = "".join(f"- {e}\n" for e in entries)
    block = (
        f"\n## {stamp}\n"
        "### 触发\n"
        f"- {trigger}\n"
        "### 处理的经验\n"
        f"{bullets}\n"
    )

    if LOG_FILE.exists():
        content = LOG_FILE.read_text(encoding="utf-8")
        # Only a stand-alone '---' line counts as the separator.  A plain
        # substring search would also hit longer dash runs or table rules
        # such as '|------|' and splice the section into the middle of a line.
        separator = re.search(r"^---[ \t]*$", content, flags=re.MULTILINE)
        if separator:
            head = content[:separator.start()]
            tail = content[separator.start() + 3:]
            content = head + "---" + block + "---" + tail
        else:
            content += block
    else:
        content = "# 进化日志\n\n" + block

    LOG_FILE.write_text(content, encoding="utf-8")
    print(f"已更新：{LOG_FILE}")
def cmd_evolve(archive_all: bool = False, rule_content_file: str = None):
    """Run an evolution pass over the experience pool.

    Args:
        archive_all: Archive every file currently in the pool.
        rule_content_file: Optional path to a file whose content replaces the
            main rule file.  Relative paths are resolved against ``ROOT``.
            Supplying it also archives the whole pool.

    Does nothing beyond printing a message when the pool is empty.  Exits
    with status 1 when *rule_content_file* does not exist.  When neither
    option is given, only usage hints are printed.
    """
    ensure_dirs()
    pool_files = list_pool()
    if not pool_files:
        print("经验池为空，无需进化")
        return

    if rule_content_file:
        rule_source = Path(rule_content_file)
        if not rule_source.is_absolute():
            rule_source = ROOT / rule_content_file
        if not rule_source.exists():
            print(f"错误：文件不存在 {rule_source}", file=sys.stderr)
            sys.exit(1)
        replacement = rule_source.read_text(encoding="utf-8")
        RULE_FILE.write_text(replacement, encoding="utf-8")
        print(f"已更新主规则：{RULE_FILE}")

    # Nothing requested: explain the available options and stop.
    if not (archive_all or rule_content_file):
        print("提示：使用 --archive 将经验池文件移入已归档")
        print("提示：使用 --rule <文件路径> 应用 AI 生成的新规则内容")
        return

    archived = archive_files([entry.name for entry in pool_files])
    append_log(archived, "规则进化脚本执行")
def cmd_archive(filenames: list):
    """Archive the given pool files and log the ones that were moved.

    No log entry is written when none of the files could be archived.
    """
    moved = archive_files(filenames)
    if not moved:
        return
    append_log(moved, "手动归档")
def main():
    """Command-line entry point.

    Sub-commands:
        add      Add an experience, read from ``--stdin``, from a JSON file
                 (``-i``; relative paths are resolved against ``ROOT``), or
                 from the inline ``--title/--problem/--solution`` options.
        list     List the files in the experience pool.
        evolve   Archive the pool and/or apply a new rule file.
        archive  Archive the named pool files.

    Without a sub-command the help text is printed.  ``add`` exits with
    status 1 when no usable input is given, or when the ``-i`` file cannot be
    read or does not contain valid JSON.
    """
    parser = argparse.ArgumentParser(description="老板分身 - 规则进化")
    sub = parser.add_subparsers(dest="cmd", help="命令")

    p_add = sub.add_parser("add", help="添加经验到经验池")
    p_add.add_argument("--input", "-i", help="JSON 文件路径")
    p_add.add_argument("--stdin", action="store_true", help="从 stdin 读取 JSON")
    p_add.add_argument("--title", help="经验标题")
    p_add.add_argument("--problem", help="问题描述")
    p_add.add_argument("--solution", help="解决过程")
    p_add.add_argument("--decisions", default="", help="关键决策")
    p_add.add_argument("--domain", default="通用", help="领域")
    p_add.add_argument("--related", default="", help="关联规则")

    sub.add_parser("list", help="列出经验池")

    p_evolve = sub.add_parser("evolve", help="执行进化（归档+应用规则）")
    p_evolve.add_argument("--archive", "-a", action="store_true", help="归档经验池全部文件")
    p_evolve.add_argument("--rule", "-r", help="应用新规则内容文件路径")

    p_arch = sub.add_parser("archive", help="归档指定文件")
    p_arch.add_argument("files", nargs="+", help="文件名")

    args = parser.parse_args()

    if args.cmd == "add":
        if args.stdin:
            add_experience({}, from_stdin=True)
        elif args.input:
            input_path = Path(args.input)
            if not input_path.is_absolute():
                input_path = ROOT / args.input
            # Report bad input the same way the --stdin path does instead of
            # crashing with a traceback.
            try:
                data = json.loads(input_path.read_text(encoding="utf-8"))
            except OSError as e:
                print(f"错误：无法读取文件 {input_path} - {e}", file=sys.stderr)
                sys.exit(1)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                print(f"错误：JSON 解析失败 - {e}", file=sys.stderr)
                sys.exit(1)
            add_experience(data)
        elif args.title and args.problem and args.solution:
            add_experience({
                "title": args.title,
                "problem": args.problem,
                "solution": args.solution,
                "decisions": args.decisions or "",
                "domain": args.domain,
                "related_rule": args.related,
                "rules_to_extract": "",
            })
        else:
            print("用法:")
            print(" python .cursor/scripts/evolution.py add --stdin # 从 stdin 读 JSON")
            print(" python .cursor/scripts/evolution.py add -i experience.json")
            print(" python .cursor/scripts/evolution.py add --title T --problem P --solution S")
            sys.exit(1)
    elif args.cmd == "list":
        cmd_list()
    elif args.cmd == "evolve":
        cmd_evolve(archive_all=args.archive, rule_content_file=args.rule)
    elif args.cmd == "archive":
        cmd_archive(args.files)
    else:
        parser.print_help()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()