#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parse only the *image* references in a manuscript Markdown file, upload them to
the production API, then generate HTML and write it to chapters.content.

- Only handles whole-line `![](relative-or-absolute-path.png)` references with
  common image suffixes; videos/attachments are neither parsed nor uploaded.
- References that are already `http(s)://` URLs are not uploaded again; they are
  written through unchanged as the `<img>` source.
- `![]()` with a non-image suffix is emitted as an ordinary text line (no upload).

Usage:
    cd 一场soul的创业实验-永平
    python3 scripts/sync_chapter_images_from_md.py --id 10.22 \\
        --md "/path/to/第130场|….md"

Dependencies:
    pip install pymysql requests

Environment variables:
    SOUL_API_BASE   defaults to https://soulapi.quwanzhi.com
"""
from __future__ import annotations

import argparse
import html
import importlib.util
import os
import re
import sys
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent

IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".webp"})

# A line that contains nothing but a single image reference: ![alt](target)
LINE_IMAGE_ONLY = re.compile(r"^\s*!\[([^\]]*)\]\(([^)]+)\)\s*$")

# Suffix -> MIME type sent with the multipart upload.
_MIME_BY_EXT = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
}

try:
    import pymysql
    import requests
except ImportError as exc:
    # Do not kill the interpreter at import time: the pure helpers below stay
    # importable. main() / upload_image() report the problem and exit instead.
    pymysql = None
    requests = None
    _IMPORT_ERROR = exc
else:
    _IMPORT_ERROR = None


def _require_third_party() -> None:
    """Exit with status 1 if pymysql/requests could not be imported."""
    if _IMPORT_ERROR is not None:
        print("需要: pip install pymysql requests", _IMPORT_ERROR, file=sys.stderr)
        sys.exit(1)


def load_db_config() -> dict:
    """Load ``DB_CONFIG`` from ``scripts/migrate_2026_sections.py`` under ROOT.

    The sibling script is executed as a module (so any top-level code in it
    runs) and its ``DB_CONFIG`` attribute is returned.

    Returns:
        The ``DB_CONFIG`` dict; main() passes it as keyword arguments to
        ``pymysql.connect``.

    Exits:
        Via ``sys.exit`` when the script cannot be loaded or does not define a
        dict named ``DB_CONFIG``.
    """
    mig = ROOT / "scripts" / "migrate_2026_sections.py"
    spec = importlib.util.spec_from_file_location("_mig_db", mig)
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if spec is None or spec.loader is None or not mig.is_file():
        sys.exit(f"无法加载 {mig}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    cfg = getattr(mod, "DB_CONFIG", None)
    if not isinstance(cfg, dict):
        sys.exit("migrate_2026_sections.py 中无有效 DB_CONFIG")
    return cfg


def resolve_local_path(md_path: Path, ref: str) -> Path | None:
    """Resolve an image reference from the Markdown file to an existing file.

    Args:
        md_path: Path of the Markdown file; relative refs are resolved against
            its parent directory.
        ref: Target of the ``![](...)`` reference.

    Returns:
        The resolved path when it points at an existing regular file, otherwise
        ``None`` (also for empty refs and ``http(s)://`` URLs).
    """
    ref = ref.strip()
    if not ref or ref.startswith(("http://", "https://")):
        return None

    ref_path = Path(ref)
    candidates = [(md_path.parent / ref_path).expanduser()]
    if ref.startswith("~"):
        # expanduser() only acts on a *leading* "~", so it must be applied
        # before the ref is joined onto the Markdown directory. The joined form
        # is kept as a fallback for a directory literally named "~".
        try:
            candidates.insert(0, ref_path.expanduser())
        except RuntimeError:
            pass  # home directory cannot be determined; use the fallback only

    for candidate in candidates:
        resolved = candidate.resolve()
        if resolved.is_file():
            return resolved
    return None


def guess_mime(path: Path) -> str:
    """Return the MIME type for *path* based on its (case-insensitive) suffix.

    Unknown suffixes map to ``application/octet-stream``.
    """
    return _MIME_BY_EXT.get(path.suffix.lower(), "application/octet-stream")


def upload_image(local: Path, api_base: str) -> str:
    """Upload one local file to ``{api_base}/api/upload`` and return its URL.

    The file is posted as multipart field ``file`` together with the form field
    ``folder=book-images``.

    Args:
        local: Existing local file to upload.
        api_base: API root; a trailing slash is tolerated.

    Returns:
        The URL taken from ``url`` or ``data.url`` of the JSON response.

    Raises:
        requests.HTTPError: On a non-2xx response (``raise_for_status``).
        RuntimeError: When the response reports failure or carries no URL.
    """
    _require_third_party()
    url = f"{api_base.rstrip('/')}/api/upload"
    mime = guess_mime(local)
    with local.open("rb") as f:
        files = {"file": (local.name, f, mime)}
        data = {"folder": "book-images"}
        r = requests.post(url, files=files, data=data, timeout=120)
    r.raise_for_status()
    j = r.json()
    if not j.get("success"):
        raise RuntimeError(j.get("error") or j.get("message") or str(j))
    out = j.get("url") or (j.get("data") or {}).get("url")
    if not out:
        raise RuntimeError("响应无 url: " + str(j)[:500])
    return str(out)


def _paragraph(text: str) -> str:
    """Wrap *text* (HTML-escaped here) in a paragraph element."""
    # NOTE(review): the <p> wrapper is reconstructed; the markup inside the
    # original string literals was lost. Confirm against the deployed script.
    return f"<p>{html.escape(text)}</p>"


def _image(src: str, alt: str) -> str:
    """Build the HTML for one image; both attribute values are escaped."""
    # NOTE(review): tag shape is reconstructed (see _paragraph); confirm that the
    # reader front-end expects <p><img .../></p>.
    return f'<p><img src="{html.escape(src)}" alt="{html.escape(alt)}" /></p>'


def md_to_html(md_path: Path, api_base: str) -> str:
    """Convert a chapter Markdown file to HTML, uploading local images.

    Processing is line based:

    * a first line starting with ``#`` (the title) is dropped;
    * ``---`` lines and blank lines become paragraph breaks;
    * a line that is only ``![alt](target)`` becomes an image: http(s) targets
      are used as-is, local image files are uploaded (once per resolved path);
    * image-only lines whose target has no image suffix are emitted as text;
    * every other non-blank line becomes an escaped paragraph.

    Args:
        md_path: Markdown file to read (UTF-8).
        api_base: API root handed to :func:`upload_image`.

    Returns:
        The HTML body, terminated by a single newline.
    """
    lines = md_path.read_text(encoding="utf-8").splitlines()
    if lines and lines[0].lstrip().startswith("#"):
        lines = lines[1:]

    chunks: list[str] = []
    upload_cache: dict[str, str] = {}  # resolved local path -> uploaded URL

    for line in lines:
        stripped = line.strip()
        if stripped == "---":
            # NOTE(review): a horizontal rule is treated like a blank line here;
            # confirm no <hr> is wanted.
            chunks.append("")
            continue

        m = LINE_IMAGE_ONLY.match(line)
        if not m:
            chunks.append(_paragraph(stripped) if stripped else "")
            continue

        alt, ref = m.group(1), m.group(2).strip()
        if ref.startswith(("http://", "https://")):
            chunks.append(_image(ref, alt))
            continue

        loc = resolve_local_path(md_path, ref)
        if loc is None:
            if Path(ref).suffix.lower() in IMAGE_EXT:
                chunks.append(f"<p>(图片路径无效:{html.escape(ref)})</p>")
            else:
                # Not an image reference (e.g. a video): plain text, even when
                # the file is missing, as promised by the module docstring.
                chunks.append(_paragraph(stripped))
            continue

        if loc.suffix.lower() not in IMAGE_EXT:
            # Non-image (e.g. video): not parsed, not uploaded, emitted as text.
            chunks.append(_paragraph(stripped))
            continue

        key = str(loc)
        if key not in upload_cache:
            print(f"上传图片: {loc.name} …", flush=True)
            upload_cache[key] = upload_image(loc, api_base)
        chunks.append(_image(upload_cache[key], alt))

    # Collapse runs of empty chunks into a single blank line and drop leading
    # ones, so the output has no superfluous empty paragraphs.
    html_parts: list[str] = []
    for c in chunks:
        if c:
            html_parts.append(c)
        elif html_parts and html_parts[-1] != "":
            html_parts.append("")
    return "\n".join(html_parts).strip() + "\n"


def main() -> None:
    """CLI entry point: build the HTML and store it on one ``chapters`` row.

    With ``--dry-run`` the first 800 characters are printed and nothing is
    written. Otherwise ``content``, ``word_count`` and ``updated_at`` of the row
    with the given id are updated; the change is rolled back and the process
    exits unless exactly one row was affected.
    """
    _require_third_party()

    p = argparse.ArgumentParser(description="MD 内图片上传并写回 chapters(仅图片)")
    p.add_argument("--id", required=True, help="章节 id,如 10.22")
    p.add_argument("--md", type=Path, required=True, help="文章 .md 路径")
    p.add_argument(
        "--api-base",
        default=os.environ.get("SOUL_API_BASE", "https://soulapi.quwanzhi.com"),
        help="API 根地址",
    )
    p.add_argument("--dry-run", action="store_true", help="只打印 HTML 前 800 字,不写库")
    args = p.parse_args()

    md_path = args.md.expanduser().resolve()
    if not md_path.is_file():
        sys.exit(f"文件不存在: {md_path}")

    body = md_to_html(md_path, args.api_base)
    # NOTE(review): this is the length of the generated HTML (tags included),
    # not a count of words in the text — confirm that is what the column expects.
    word_count = len(body)

    if args.dry_run:
        print(body[:800])
        print("… dry-run,word_count=", word_count)
        return

    cfg = load_db_config()
    conn = pymysql.connect(**cfg)
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE chapters SET content = %s, word_count = %s, updated_at = NOW() WHERE id = %s",
                (body, word_count, args.id),
            )
            affected = cur.rowcount
        if affected != 1:
            conn.rollback()
            sys.exit(f"更新失败:id={args.id} rowcount={affected}")
        conn.commit()
    finally:
        # Always release the connection, including on the failure exit above.
        conn.close()
    print(f"已更新 {args.id} | word_count={word_count}")


if __name__ == "__main__":
    main()