#!/usr/bin/env python3
r"""Soul content upload tool.

Callable from a Cursor Skill or directly from the command line; writes new
content (or updates existing content) in the ``chapters`` database table.

Usage:
    python3 content_upload.py --title "Title" --price 1.0 --content "Body" \
        --part part-1 --chapter chapter-1 --format markdown

    python3 content_upload.py --json '{
        "title": "Title",
        "price": 1.0,
        "content": "Body text...",
        "part_id": "part-1",
        "chapter_id": "chapter-1",
        "format": "markdown",
        "images": ["https://xxx.com/img1.png"]
    }'

    python3 content_upload.py --list-structure   # show the part/chapter tree

Requirements:
    pip install pymysql

Database settings may be overridden with the environment variables
SOUL_DB_HOST, SOUL_DB_PORT, SOUL_DB_USER, SOUL_DB_PASSWORD and SOUL_DB_NAME.
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime

try:
    import pymysql
except ImportError:
    # Deferred: get_connection() reports the missing dependency, so that
    # importing this module and running --help still work without PyMySQL.
    pymysql = None

# NOTE(security): the literal values below are kept only as backward-compatible
# fallbacks. Credentials should not live in source control; prefer supplying
# them through the SOUL_DB_* environment variables.
DB_CONFIG = {
    "host": os.environ.get("SOUL_DB_HOST", "56b4c23f6853c.gz.cdb.myqcloud.com"),
    "port": int(os.environ.get("SOUL_DB_PORT", "14413")),
    "user": os.environ.get("SOUL_DB_USER", "cdb_outerroot"),
    "password": os.environ.get("SOUL_DB_PASSWORD", "Zhiqun1984"),
    "database": os.environ.get("SOUL_DB_NAME", "soul_miniprogram"),
    "charset": "utf8mb4",
}

# part_id -> display title stored in the part_title column.
PART_MAP = {
    "part-1": "第一篇|真实的人",
    "part-2": "第二篇|真实的行业",
    "part-3": "第三篇|真实的错误",
    "part-4": "第四篇|真实的赚钱",
    "part-5": "第五篇|真实的社会",
    "appendix": "附录",
    "intro": "序言",
    "outro": "尾声",
}

# chapter_id -> display title stored in the chapter_title column.
CHAPTER_MAP = {
    "chapter-1": "第1章|人与人之间的底层逻辑",
    "chapter-2": "第2章|人性困境案例",
    "chapter-3": "第3章|电商篇",
    "chapter-4": "第4章|内容商业篇",
    "chapter-5": "第5章|传统行业篇",
    "chapter-6": "第6章|我人生错过的4件大钱",
    "chapter-7": "第7章|别人犯的错误",
    "chapter-8": "第8章|底层结构",
    "chapter-9": "第9章|我在Soul上亲访的赚钱案例",
    "chapter-10": "第10章|未来职业的变化趋势",
    "chapter-11": "第11章|中国社会商业生态的未来",
    "appendix": "附录",
    "preface": "序言",
    "epilogue": "尾声",
}


def get_connection():
    """Open and return a new PyMySQL connection built from ``DB_CONFIG``.

    Exits the process with status 1 if PyMySQL is not installed. Connection
    errors raised by ``pymysql.connect`` propagate to the caller.
    """
    if pymysql is None:
        print("需要安装 pymysql: pip3 install pymysql")
        sys.exit(1)
    return pymysql.connect(**DB_CONFIG)


def list_structure():
    """Print every part/chapter pair with its section count, then the total."""
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT part_id, part_title, chapter_id, chapter_title,
                       COUNT(*) as sections
                FROM chapters
                GROUP BY part_id, part_title, chapter_id, chapter_title
                ORDER BY part_id, chapter_id
                """
            )
            rows = cur.fetchall()
            print("篇章结构:")
            for part_id, part_title, ch_id, ch_title, cnt in rows:
                print(f" {part_id} ({part_title}) / {ch_id} ({ch_title}) - {cnt}节")
            cur.execute("SELECT COUNT(*) FROM chapters")
            total = cur.fetchone()[0]
            print(f"\n总计: {total} 节")
    finally:
        # Always release the connection, even if a query fails.
        conn.close()


def _list_chapters():
    """Print every section (id, title, price tag) ordered by ``sort_order``."""
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, section_title, is_free, price FROM chapters ORDER BY sort_order"
            )
            for row in cur.fetchall():
                free_tag = "[免费]" if row[2] else f"[¥{row[3]}]"
                print(f" {row[0]} {row[1]} {free_tag}")
    finally:
        conn.close()


def generate_section_id(cur, chapter_id):
    """Return the next section id for *chapter_id*.

    If *chapter_id* contains a number N, the result is ``"N.<k+1>"`` where k is
    the largest numeric suffix among existing ids matching ``N.%`` (or
    ``"N.1"`` when none exist). If it contains no number, the result is the
    largest existing id with dots removed, read as an integer, plus one.

    Args:
        cur: An open database cursor used to query the ``chapters`` table.
        chapter_id: Chapter identifier such as ``"chapter-3"`` or ``"appendix"``.
    """
    match = re.search(r"\d+", str(chapter_id))
    if not match:
        cur.execute("SELECT MAX(CAST(REPLACE(id, '.', '') AS UNSIGNED)) FROM chapters")
        max_id = cur.fetchone()[0] or 0
        return str(int(max_id) + 1)

    prefix = match.group()
    cur.execute(
        "SELECT id FROM chapters WHERE id LIKE %s "
        "ORDER BY CAST(SUBSTRING_INDEX(id, '.', -1) AS UNSIGNED) DESC LIMIT 1",
        (f"{prefix}.%",),
    )
    row = cur.fetchone()
    if row:
        last_num = int(row[0].split(".")[-1])
        return f"{prefix}.{last_num + 1}"
    return f"{prefix}.1"


def _embed_images(content, images, fmt):
    """Replace ``{{image_N}}`` placeholders (1-based) in *content* with image URLs.

    For markdown the placeholder becomes an image link; for any other format
    it becomes the bare URL. Placeholders without a matching image are kept.
    """
    for index, url in enumerate(images, start=1):
        placeholder = f"{{{{image_{index}}}}}"
        if fmt == "markdown":
            replacement = f"![图片{index}]({url})"
        else:
            replacement = url
        content = content.replace(placeholder, replacement)
    return content


def upload_content(data: dict) -> bool:
    """Validate *data* and insert or update one row of the ``chapters`` table.

    Recognised keys: ``title`` and ``content`` (required, non-blank),
    ``price`` (default 1.0; 0 marks the section free), ``part_id``,
    ``chapter_id``, ``format``, ``images`` (list of URLs substituted for
    ``{{image_N}}`` placeholders) and ``id``. When ``id`` is absent a new one
    is generated; when a row with that id already exists it is updated,
    otherwise a new row is inserted at the end of the sort order.

    A JSON result (or an error message) is printed to stdout.

    Returns:
        True when the row was written and committed, False on a validation or
        database error.
    """
    # ``or ""`` also covers an explicit JSON null, which .get() would return as-is.
    title = str(data.get("title") or "").strip()
    if not title:
        print("错误: 标题不能为空")
        return False

    content = str(data.get("content") or "").strip()
    if not content:
        print("错误: 内容不能为空")
        return False

    raw_price = data.get("price", 1.0)
    try:
        price = float(raw_price)
    except (TypeError, ValueError):
        print(f"错误: 价格无效: {raw_price!r}")
        return False
    if price < 0:
        print("错误: 价格不能为负数")
        return False
    is_free = 1 if price == 0 else 0

    part_id = data.get("part_id", "part-1")
    chapter_id = data.get("chapter_id", "chapter-1")
    fmt = data.get("format", "markdown")
    images = data.get("images") or []
    section_id = data.get("id") or ""

    content = _embed_images(content, images, fmt)
    # Word count ignores all whitespace and is taken after image substitution.
    word_count = len(re.sub(r"\s+", "", content))
    part_title = PART_MAP.get(part_id, part_id)
    chapter_title = CHAPTER_MAP.get(chapter_id, chapter_id)

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            if not section_id:
                section_id = generate_section_id(cur, chapter_id)

            cur.execute("SELECT mid FROM chapters WHERE id = %s", (section_id,))
            if cur.fetchone():
                cur.execute(
                    """
                    UPDATE chapters
                    SET section_title = %s, content = %s, word_count = %s,
                        is_free = %s, price = %s, part_id = %s, part_title = %s,
                        chapter_id = %s, chapter_title = %s, status = 'published'
                    WHERE id = %s
                    """,
                    (title, content, word_count, is_free, price, part_id,
                     part_title, chapter_id, chapter_title, section_id),
                )
                action = "更新"
            else:
                cur.execute("SELECT COALESCE(MAX(sort_order), 0) + 1 FROM chapters")
                next_order = cur.fetchone()[0]
                cur.execute(
                    """
                    INSERT INTO chapters
                        (id, part_id, part_title, chapter_id, chapter_title,
                         section_title, content, word_count, is_free, price,
                         sort_order, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'published')
                    """,
                    (section_id, part_id, part_title, chapter_id, chapter_title,
                     title, content, word_count, is_free, price, next_order),
                )
                action = "创建"
        conn.commit()
    except pymysql.err.IntegrityError as e:
        conn.rollback()
        print(json.dumps({"success": False, "error": f"ID冲突: {e}"}, ensure_ascii=False))
        return False
    except Exception as e:
        # Top-level boundary for the CLI: report any database error as JSON.
        conn.rollback()
        print(json.dumps({"success": False, "error": str(e)}, ensure_ascii=False))
        return False
    else:
        result = {
            "success": True,
            "action": action,
            "data": {
                "id": section_id,
                "title": title,
                "part": f"{part_id} ({part_title})",
                "chapter": f"{chapter_id} ({chapter_title})",
                "price": price,
                "is_free": bool(is_free),
                "word_count": word_count,
                "format": fmt,
                "images_count": len(images),
            },
        }
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return True
    finally:
        conn.close()


def main():
    """Parse command-line arguments and run the requested action.

    Exits with status 1 on invalid arguments, unreadable input, or a failed
    upload.
    """
    parser = argparse.ArgumentParser(description="Soul 内容上传接口")
    parser.add_argument("--json", help="JSON格式的完整数据")
    parser.add_argument("--title", help="标题")
    parser.add_argument("--price", type=float, default=1.0, help="定价(0=免费)")
    parser.add_argument("--content", help="内容正文")
    parser.add_argument("--content-file", help="从文件读取内容")
    parser.add_argument("--format", default="markdown", choices=["markdown", "text", "html"])
    parser.add_argument("--part", default="part-1", help="所属篇 (part-1 ~ part-5)")
    parser.add_argument("--chapter", default="chapter-1", help="所属章 (chapter-1 ~ chapter-11)")
    parser.add_argument("--id", help="指定 section ID (如 1.6),不指定则自动生成")
    parser.add_argument("--images", nargs="*", help="图片URL列表")
    parser.add_argument("--list-structure", action="store_true", help="查看篇章结构")
    parser.add_argument("--list-chapters", action="store_true", help="列出所有章节")
    args = parser.parse_args()

    if args.list_structure:
        list_structure()
        return

    if args.list_chapters:
        _list_chapters()
        return

    if args.json:
        try:
            data = json.loads(args.json)
        except json.JSONDecodeError as e:
            print(f"错误: JSON 解析失败: {e}")
            sys.exit(1)
        if not isinstance(data, dict):
            print("错误: JSON 数据必须是对象")
            sys.exit(1)
    else:
        if not args.title or (not args.content and not args.content_file):
            parser.print_help()
            print("\n错误: 需要 --title 和 --content (或 --content-file)")
            sys.exit(1)

        content = args.content
        if args.content_file:
            # A content file takes precedence over --content.
            try:
                with open(args.content_file, "r", encoding="utf-8") as f:
                    content = f.read()
            except OSError as e:
                print(f"错误: 无法读取文件: {e}")
                sys.exit(1)

        data = {
            "title": args.title,
            "price": args.price,
            "content": content,
            "format": args.format,
            "part_id": args.part,
            "chapter_id": args.chapter,
            "images": args.images or [],
        }
        if args.id:
            data["id"] = args.id

    # Propagate failure to the shell so callers can detect it.
    if not upload_content(data):
        sys.exit(1)


if __name__ == "__main__":
    main()