🔄 卡若AI 同步 2026-03-12 23:10 | 更新:水桥平台对接、卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个

This commit is contained in:
2026-03-12 23:10:26 +08:00
parent 919c47e605
commit e0ee8c2e2a
12 changed files with 1603 additions and 68 deletions

View File

@@ -1,6 +1,6 @@
{
"access_token": "u-e9uHxerhZ7VrnwkCFC9Yvalh3Ix1ghWXpgGaZMk0260Y",
"refresh_token": "ur-dIaKx4ssV4SU.karG6lt9ulh1C11ghopOgGaYx00261E",
"access_token": "u-e1gGA5qAp2yXEyJOa2k9Zalh3KxxghONV0GaJwk0274U",
"refresh_token": "ur-cuP.iNyXxflV6qpKn0c2Lnlh3AxxghqPpwGaIQ4022gJ",
"name": "飞书用户",
"auth_time": "2026-03-12T20:33:34.954705"
"auth_time": "2026-03-12T22:44:05.347461"
}

View File

@@ -326,18 +326,29 @@ def _get_text_content(block: dict) -> str:
return (tr.get("content") or "")
def _get_elements_content(elements: list) -> str:
if not elements:
return ""
return (elements[0].get("text_run") or {}).get("content", "") or ""
def sanitize_blocks(blocks: list) -> list:
"""
飞书 docx blocks 对“空段落/异常结构”会严格校验。
这里做一次轻量清洗:去掉纯空文本块,避免 invalid param。
"""
"""飞书 docx blocks 轻量清洗:去掉空文本/空代码块/空 callout避免 invalid param。"""
out = []
for b in blocks:
if not isinstance(b, dict):
continue
if b.get("block_type") == 2:
c = _get_text_content(b)
if not c or not c.strip():
bt = b.get("block_type")
if bt == 2:
if not _get_text_content(b).strip():
continue
elif bt == 14:
elems = (b.get("code") or {}).get("elements") or []
if not _get_elements_content(elems).strip():
continue
elif bt == 19:
elems = (b.get("callout") or {}).get("elements") or []
if not _get_elements_content(elems).strip():
continue
out.append(b)
return out
@@ -438,6 +449,21 @@ def _write_batch_with_fallback(doc_token: str, headers: dict, batch: list, total
if r1.get("code") == 0:
time.sleep(0.35)
continue
bt = b.get("block_type")
# code(14) 和 callout(19) 失败时降级为文本块
fallback_content = ""
if bt == 14:
elems = (b.get("code") or {}).get("elements") or []
fallback_content = _get_elements_content(elems)
elif bt == 19:
elems = (b.get("callout") or {}).get("elements") or []
fallback_content = _get_elements_content(elems)
if fallback_content:
r2 = _post_children(doc_token, headers, [{"block_type": 2, "text": {"elements": [{"text_run": {"content": fallback_content, "text_element_style": {}}}], "style": {}}}], None)
if r2.get("code") == 0:
print(f"⚠️ block_type={bt} 降级为文本块写入")
time.sleep(0.35)
continue
c = _get_text_content(b)
preview = (c[:60] + "...") if c and len(c) > 60 else (c or "")
print(f"⚠️ 跳过非法块: code={r1.get('code')} msg={r1.get('msg')} preview={preview!r}")

View File

@@ -1,7 +1,12 @@
#!/usr/bin/env python3
"""
将 Markdown 本地转换为飞书文档 JSON 格式。
图片用占位符 __IMAGE:路径__ 标注,上传时替换为 file_token。
将 Markdown 本地转换为飞书文档 JSON 格式v2
· 代码围栏 → block_type:14 code block保留语言标注
· > 引用 → block_type:19 callout蓝色背景
· --- → block_type:22 divider
· #### → block_type:6 heading4
· 图片 → __IMAGE__ 占位符,上传时替换为 file_token
· 表格 → block_type:30 sheet超出 9×9 自动截断)
用法:
python3 md_to_feishu_json.py input.md output.json
@@ -14,24 +19,71 @@ import argparse
from pathlib import Path
def _h1(t):
# ── 语言代码映射(飞书 code block style.language ──────────────────────────
LANG_MAP: dict[str, int] = {
"python": 2, "py": 2,
"javascript": 3, "js": 3,
"typescript": 3, "ts": 3,
"shell": 6, "bash": 6, "sh": 6,
"sql": 8,
"json": 9,
"html": 11, "xml": 11,
"go": 16,
"rust": 22,
}
# ── Block 构造函数 ────────────────────────────────────────────────────────────
def _h1(t: str) -> dict:
return {"block_type": 3, "heading1": {"elements": [{"text_run": {"content": t, "text_element_style": {}}}], "style": {}}}
def _h2(t):
def _h2(t: str) -> dict:
return {"block_type": 4, "heading2": {"elements": [{"text_run": {"content": t, "text_element_style": {}}}], "style": {}}}
def _h3(t):
def _h3(t: str) -> dict:
return {"block_type": 5, "heading3": {"elements": [{"text_run": {"content": t, "text_element_style": {}}}], "style": {}}}
def _text(t):
def _h4(t: str) -> dict:
return {"block_type": 6, "heading4": {"elements": [{"text_run": {"content": t, "text_element_style": {}}}], "style": {}}}
def _text(t: str) -> dict:
return {"block_type": 2, "text": {"elements": [{"text_run": {"content": t, "text_element_style": {}}}], "style": {}}}
def _code(t: str, lang: int = 1) -> dict:
"""block_type:14 代码块lang 见 LANG_MAP1=纯文本/流程图"""
return {
"block_type": 14,
"code": {
"elements": [{"text_run": {"content": t, "text_element_style": {}}}],
"style": {"language": lang},
},
}
def _callout(t: str, bg: int = 2) -> dict:
"""block_type:19 高亮块bg: 1=白 2=蓝 3=绿 4=橙 5=黄 6=红 7=紫"""
return {
"block_type": 19,
"callout": {
"emoji_id": "blue_book",
"background_color": bg,
"border_color": bg,
"elements": [{"text_run": {"content": t, "text_element_style": {}}}],
},
}
def _divider() -> dict:
return {"block_type": 22, "divider": {}}
def _image_placeholder(idx: int, path: str) -> dict:
"""图片占位符,上传时由脚本替换为 gallery block"""
return {"__image__": path, "__index__": idx}
@@ -45,6 +97,8 @@ def _sheet_table(values: list[list[str]]) -> dict:
}
# ── 辅助函数 ──────────────────────────────────────────────────────────────────
def _parse_md_row(line: str) -> list[str]:
s = line.strip()
if s.startswith("|"):
@@ -63,54 +117,71 @@ def _is_md_table_sep(line: str) -> bool:
if s.endswith("|"):
s = s[:-1]
parts = [p.strip() for p in s.split("|")]
if not parts:
return False
return all(re.match(r"^:?-{3,}:?$", p or "") for p in parts)
return bool(parts) and all(re.match(r"^:?-{3,}:?$", p or "") for p in parts)
def _clean_inline_markdown(text: str) -> str:
"""清理常见行内 markdown 标记,输出适合飞书阅读的纯文本。"""
"""去掉常见行内 Markdown 标记,输出适合飞书的纯文本。"""
t = text
# 粗体/斜体标记
# 粗体
t = re.sub(r"\*\*(.*?)\*\*", r"\1", t)
t = re.sub(r"__(.*?)__", r"\1", t)
# 斜体
t = re.sub(r"\*(.*?)\*", r"\1", t)
t = re.sub(r"_(.*?)_", r"\1", t)
# 行内代码保留内容,去掉反引号
t = re.sub(r"_((?!_).*?)_", r"\1", t)
# 行内代码保留内容
t = re.sub(r"`([^`]+)`", r"\1", t)
# 链接 [text](url) → text
t = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", t)
return t.strip()
# ── 主转换函数 ─────────────────────────────────────────────────────────────────
def md_to_blocks(md: str, image_paths: list[str] | None = None) -> list:
"""将 Markdown 转为飞书 blocks"""
blocks = []
"""将 Markdown 字符串转为飞书 blocks 列表。"""
blocks: list[dict] = []
image_paths = image_paths or []
img_idx = 0
first_h1_consumed = False
in_code = False
code_lines = []
code_lines: list[str] = []
code_lang = 1
lines = md.split("\n")
i = 0
while i < len(lines):
line = lines[i]
if line.strip().startswith("```"):
stripped = line.strip()
# ── 代码围栏 ───────────────────────────────────────────────────────────
if stripped.startswith("```"):
if in_code:
# 飞书 blocks 常对代码围栏/特殊格式更严格,这里转为普通文本行,提升美观与稳定性
for cl in code_lines:
if cl.strip():
blocks.append(_text(f"代码:{cl.strip()}"))
# 结束代码块 → 生成单个 block_type:14
code_content = "\n".join(code_lines)
if code_content.strip():
blocks.append(_code(code_content, code_lang))
code_lines = []
in_code = not in_code
code_lang = 1
in_code = False
else:
# 开始代码块 → 识别语言
lang_match = re.match(r"```(\w+)?", stripped)
code_lang = 1
if lang_match and lang_match.group(1):
code_lang = LANG_MAP.get(lang_match.group(1).lower(), 1)
in_code = True
i += 1
continue
if in_code:
code_lines.append(line)
i += 1
continue
# 图片语法 ![](path)
img_match = re.match(r"^!\[([^\]]*)\]\(([^)]+)\)\s*$", line.strip())
# ── 图片 ──────────────────────────────────────────────────────────────
img_match = re.match(r"^!\[([^\]]*)\]\(([^)]+)\)\s*$", stripped)
if img_match:
path = img_match.group(2)
if img_idx < len(image_paths):
@@ -120,7 +191,7 @@ def md_to_blocks(md: str, image_paths: list[str] | None = None) -> list:
i += 1
continue
# Markdown 表格:表头 + 分隔行 + 数据行
# ── Markdown 表格 ─────────────────────────────────────────────────────
if "|" in line and i + 1 < len(lines) and _is_md_table_sep(lines[i + 1]):
table_lines = [line]
j = i + 2
@@ -128,7 +199,7 @@ def md_to_blocks(md: str, image_paths: list[str] | None = None) -> list:
raw = lines[j].strip()
if not raw or "|" not in raw:
break
if raw.startswith("#") or raw.startswith(">") or raw.startswith("```"):
if raw.startswith(("#", ">", "```")):
break
table_lines.append(lines[j])
j += 1
@@ -143,46 +214,50 @@ def md_to_blocks(md: str, image_paths: list[str] | None = None) -> list:
rr.extend([""] * (col_size - len(rr)))
clean_rows.append(rr[:col_size])
# 飞书空 sheet 创建限制:行列最大 9超出时截断并提示
max_rows, max_cols = 9, 9
if len(clean_rows) > max_rows or col_size > max_cols:
blocks.append(_text("提示:原表格超出飞书单块上限,已自动截断为 9x9。"))
blocks.append(_text("(注:原表格超出飞书单块上限,已自动截断为 9×9 显示)"))
clipped = [r[:max_cols] for r in clean_rows[:max_rows]]
blocks.append(_sheet_table(clipped))
i = j
continue
# 忽略 Markdown 水平分隔线(避免在飞书出现大量“---”影响观感)
if line.strip() in {"---", "***", "___"}:
# ── 分割线 → block_type:22 ────────────────────────────────────────────
if stripped in {"---", "***", "___"}:
blocks.append(_divider())
i += 1
continue
# 标题
if line.startswith("# "):
# 避免正文和文档标题重复:默认跳过第一行 H1
# ── 标题 ──────────────────────────────────────────────────────────────
if line.startswith("#### "):
blocks.append(_h4(_clean_inline_markdown(line[5:].strip())))
elif line.startswith("### "):
blocks.append(_h3(_clean_inline_markdown(line[4:].strip())))
elif line.startswith("## "):
blocks.append(_h2(_clean_inline_markdown(line[3:].strip())))
elif line.startswith("# "):
if first_h1_consumed:
blocks.append(_h1(_clean_inline_markdown(line[2:].strip())))
else:
first_h1_consumed = True
elif line.startswith("## "):
blocks.append(_h2(_clean_inline_markdown(line[3:].strip())))
elif line.startswith("### "):
blocks.append(_h3(_clean_inline_markdown(line[4:].strip())))
elif line.lstrip().startswith(">"):
# 引用块转普通说明行,降低写入失败概率
quote = line.lstrip()
# ── 引用 → block_type:19 callout ─────────────────────────────────────
elif stripped.startswith(">"):
quote = stripped
while quote.startswith(">"):
quote = quote[1:].lstrip()
quote = _clean_inline_markdown(quote)
if quote:
blocks.append(_text(quote))
elif line.strip():
raw = line.strip()
# 无序列表统一成 •,减少 markdown 观感噪音
blocks.append(_callout(quote))
# ── 正文、列表 ────────────────────────────────────────────────────────
elif stripped:
raw = stripped
# 无序列表 → •
if re.match(r"^[-*]\s+", raw):
raw = "" + re.sub(r"^[-*]\s+", "", raw)
# 有序列表统一成 12样式
# 有序列表 12
raw = re.sub(r"^(\d+)\.\s+", r"\1", raw)
cleaned = _clean_inline_markdown(raw)
if cleaned:
@@ -194,13 +269,9 @@ def md_to_blocks(md: str, image_paths: list[str] | None = None) -> list:
def blocks_to_upload_format(blocks: list, base_dir: Path) -> tuple[list, list]:
"""
将含 __image__ 占位符的 blocks 转为可上传格式。
返回 (文本 blocks 列表, 图片路径列表,按出现顺序)。
image_paths 优先存相对路径(相对 base_dir便于 JSON 移植。
"""
out = []
paths = []
"""将含 __image__ 占位符的 blocks 转为可上传格式,返回 (blocks, image_paths)。"""
out: list = []
paths: list[str] = []
for b in blocks:
if isinstance(b, dict) and "__image__" in b:
path = b.get("__image__", "")
@@ -216,14 +287,16 @@ def blocks_to_upload_format(blocks: list, base_dir: Path) -> tuple[list, list]:
rel = str(resolved)
paths.append(rel)
else:
paths.append(path if path else "unknown")
out.append({"block_type": 2, "text": {"elements": [{"text_run": {"content": f"【配图 {len(paths)}:待上传】", "text_element_style": {}}}], "style": {}}})
paths.append(path or "unknown")
out.append(
_text(f"【配图 {len(paths)}:待上传】")
)
else:
out.append(b)
return out, paths
def main():
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("input", help="Markdown 文件")
ap.add_argument("output", help="输出 JSON 文件")
@@ -241,7 +314,7 @@ def main():
final, img_paths = blocks_to_upload_format(blocks, inp.parent)
out = {
"description": f"{inp.name} 转换的飞书 docx blocks",
"description": f"{inp.name} 转换的飞书 docx blocksv2",
"source": str(inp),
"image_paths": img_paths,
"children": final,

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
卡若AI 对话复盘总结 → 飞书群 webhook
每次对话完成后,将简洁复盘总结发到指定飞书群。
飞书 bot v2 hook 要求POST JSON含 msg_type如 text
用法:
# 直接传入简洁总结(建议 ≤500 字)
python3 send_review_to_feishu_webhook.py "【卡若AI复盘】2026-03-12 15:30\n🎯 完成记忆系统使用手册\n📌 已写开发文档/9、手册/卡若AI记忆系统使用手册.md\n▶ 无"
# 从文件读
python3 send_review_to_feishu_webhook.py --file /path/to/summary.txt
# 指定 webhook否则用默认
python3 send_review_to_feishu_webhook.py --webhook "https://open.feishu.cn/..." "总结内容"
环境变量(可选):
FEISHU_REVIEW_WEBHOOK — 默认 webhook URL
"""
import argparse
import json
import os
import sys
from pathlib import Path
import requests
# 默认 webhook卡若AI 复盘总结群);可被环境变量或 --webhook 覆盖
DEFAULT_WEBHOOK = os.environ.get(
"FEISHU_REVIEW_WEBHOOK",
"https://open.feishu.cn/open-apis/bot/v2/hook/8b7f996e-2892-4075-989f-aa5593ea4fbc",
)
def send_text(webhook_url: str, text: str) -> bool:
"""POST 文本到飞书 bot v2 webhook。"""
if not text or not webhook_url:
return False
payload = {"msg_type": "text", "content": {"text": text[:4000]}} # 飞书单条文本有长度限制
try:
r = requests.post(webhook_url, json=payload, timeout=10)
body = r.json()
if body.get("code") != 0:
print(f"飞书 webhook 返回错误: {body}", file=sys.stderr)
return False
return True
except Exception as e:
print(f"发送失败: {e}", file=sys.stderr)
return False
def main():
ap = argparse.ArgumentParser(description="卡若AI 复盘总结发飞书群")
ap.add_argument("text", nargs="?", default="", help="简洁复盘总结建议≤500字")
ap.add_argument("--file", "-f", help="从文件读取总结内容")
ap.add_argument("--webhook", "-w", default=DEFAULT_WEBHOOK, help="飞书群 webhook URL")
args = ap.parse_args()
if args.file:
path = Path(args.file)
if path.exists():
text = path.read_text(encoding="utf-8").strip()
else:
print(f"文件不存在: {args.file}", file=sys.stderr)
sys.exit(1)
else:
text = (args.text or sys.stdin.read()).strip()
if not text:
print("无内容可发送", file=sys.stderr)
sys.exit(1)
ok = send_text(args.webhook, text)
if ok:
print("已发送到飞书群")
else:
sys.exit(1)
if __name__ == "__main__":
main()