Files
soul/content_upload.py

276 lines
9.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Soul 内容上传接口
可从 Cursor Skill / 命令行直接调用将新内容写入数据库
用法:
python3 content_upload.py --title "标题" --price 1.0 --content "正文" \
--part part-1 --chapter chapter-1 --format markdown
python3 content_upload.py --json '{
"title": "标题",
"price": 1.0,
"content": "正文内容...",
"part_id": "part-1",
"chapter_id": "chapter-1",
"format": "markdown",
"images": ["https://xxx.com/img1.png"]
}'
python3 content_upload.py --list-structure # 查看篇章结构
环境依赖: pip install pymysql
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime
try:
import pymysql
except ImportError:
print("需要安装 pymysql: pip3 install pymysql")
sys.exit(1)
# Keyword arguments passed verbatim to pymysql.connect() by get_connection().
# Every credential can be overridden with a SOUL_DB_* environment variable so
# that secrets do not have to live in source control; the literals below are
# only fallbacks that keep existing invocations working unchanged.
# NOTE(review): the fallback password is committed in plain text - rotate it
# and drop the default once every caller supplies SOUL_DB_PASSWORD.
DB_CONFIG = {
    "host": os.environ.get("SOUL_DB_HOST", "56b4c23f6853c.gz.cdb.myqcloud.com"),
    "port": int(os.environ.get("SOUL_DB_PORT", "14413")),
    "user": os.environ.get("SOUL_DB_USER", "cdb_outerroot"),
    "password": os.environ.get("SOUL_DB_PASSWORD", "Zhiqun1984"),
    "database": os.environ.get("SOUL_DB_NAME", "soul_miniprogram"),
    "charset": "utf8mb4",
}
# Display title for each known part_id.
# upload_content() falls back to the raw part_id when a key is missing here.
PART_MAP = {
    # Numbered parts of the book.
    "part-1": "第一篇|真实的人",
    "part-2": "第二篇|真实的行业",
    "part-3": "第三篇|真实的错误",
    "part-4": "第四篇|真实的赚钱",
    "part-5": "第五篇|真实的社会",
    # Front and back matter.
    "appendix": "附录",
    "intro": "序言",
    "outro": "尾声",
}
# Display title for each known chapter_id.
# upload_content() falls back to the raw chapter_id when a key is missing here.
CHAPTER_MAP = {
    # Numbered chapters.
    "chapter-1": "第1章人与人之间的底层逻辑",
    "chapter-2": "第2章人性困境案例",
    "chapter-3": "第3章电商篇",
    "chapter-4": "第4章内容商业篇",
    "chapter-5": "第5章传统行业篇",
    "chapter-6": "第6章我人生错过的4件大钱",
    "chapter-7": "第7章别人犯的错误",
    "chapter-8": "第8章底层结构",
    "chapter-9": "第9章我在Soul上亲访的赚钱案例",
    "chapter-10": "第10章未来职业的变化趋势",
    "chapter-11": "第11章中国社会商业生态的未来",
    # Front and back matter.
    "appendix": "附录",
    "preface": "序言",
    "epilogue": "尾声",
}
def get_connection():
    """Open and return a new pymysql connection built from DB_CONFIG.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = pymysql.connect(**DB_CONFIG)
    return connection
def list_structure():
    """Print the part/chapter structure stored in the ``chapters`` table.

    One line is printed per (part, chapter) group together with the number of
    rows in that group, followed by the total row count. Output goes to
    stdout; nothing is returned.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT part_id, part_title, chapter_id, chapter_title, COUNT(*) as sections
            FROM chapters
            GROUP BY part_id, part_title, chapter_id, chapter_title
            ORDER BY part_id, chapter_id
        """)
        rows = cur.fetchall()
        print("篇章结构:")
        for part_id, part_title, ch_id, ch_title, cnt in rows:
            print(f" {part_id} ({part_title}) / {ch_id} ({ch_title}) - {cnt}")
        cur.execute("SELECT COUNT(*) FROM chapters")
        total = cur.fetchone()[0]
        print(f"\n总计: {total}")
    finally:
        # Release the connection even when a query or the printing fails.
        conn.close()
def generate_section_id(cur, chapter_id):
    """Return the next section id for ``chapter_id``.

    Args:
        cur: An open DB cursor exposing ``execute`` and ``fetchone``.
        chapter_id: Chapter identifier such as ``"chapter-3"``. The first run
            of digits in it becomes the id prefix. Non-string values are
            converted with ``str()`` first.

    Returns:
        ``"<prefix>.<n + 1>"`` where ``n`` is the highest numeric suffix among
        existing ids that start with ``"<prefix>."``, or ``"<prefix>.1"`` when
        there is none. When ``chapter_id`` contains no digits, the result is
        the largest existing id (with dots removed, read as an integer) plus
        one, as a string.
    """
    chapter_number = re.search(r"\d+", str(chapter_id))
    if chapter_number is None:
        cur.execute("SELECT MAX(CAST(REPLACE(id, '.', '') AS UNSIGNED)) FROM chapters")
        row = cur.fetchone()
        # MAX() yields NULL on an empty table; treat that as 0.
        max_id = (row[0] if row else 0) or 0
        return str(int(max_id) + 1)

    prefix = chapter_number.group()
    cur.execute(
        "SELECT id FROM chapters WHERE id LIKE %s ORDER BY CAST(SUBSTRING_INDEX(id, '.', -1) AS UNSIGNED) DESC LIMIT 1",
        (f"{prefix}.%",),
    )
    row = cur.fetchone()
    if not row:
        return f"{prefix}.1"

    # The query orders by CAST(... AS UNSIGNED), which reads only the leading
    # digits of the suffix. Parse the suffix the same way so an id such as
    # "2.9b" does not raise ValueError here.
    suffix = str(row[0]).rsplit(".", 1)[-1]
    leading_digits = re.match(r"\d+", suffix)
    last_num = int(leading_digits.group()) if leading_digits else 0
    return f"{prefix}.{last_num + 1}"
def _embed_images(content, images, fmt):
    """Replace each ``{{image_N}}`` placeholder in ``content`` with the N-th image URL."""
    for index, img_url in enumerate(images, start=1):
        placeholder = f"{{{{image_{index}}}}}"
        if placeholder not in content:
            continue
        if fmt == "markdown":
            content = content.replace(placeholder, f"![图片{index}]({img_url})")
        else:
            content = content.replace(placeholder, img_url)
    return content


def upload_content(data):
    """Create or update one row of the ``chapters`` table from ``data``.

    Args:
        data: Mapping with the keys read below.
            title (required): Section title; must be non-empty after stripping.
            content (required): Body text; must be non-empty after stripping.
            price: Number, default 1.0. A price of 0 marks the row as free.
            part_id: Default "part-1"; title looked up in PART_MAP.
            chapter_id: Default "chapter-1"; title looked up in CHAPTER_MAP.
            format: Default "markdown"; controls how image placeholders are
                rendered.
            images: List of URLs substituted for ``{{image_N}}`` placeholders.
            id: Section id. When empty, one is generated with
                generate_section_id().

    Returns:
        True when the row was written and committed, False on a validation or
        database error. A JSON summary (or a JSON error) is printed to stdout.

    Raises:
        Whatever get_connection() raises when the connection cannot be opened.
    """
    # ``or ""`` tolerates an explicit JSON null instead of crashing on .strip().
    title = str(data.get("title") or "").strip()
    if not title:
        print("错误: 标题不能为空")
        return False
    content = str(data.get("content") or "").strip()
    if not content:
        print("错误: 内容不能为空")
        return False

    try:
        price = float(data.get("price", 1.0))
    except (TypeError, ValueError):
        price = None
    # The range test also rejects NaN, for which every comparison is False.
    if price is None or not 0 <= price < float("inf"):
        print("错误: 价格必须是非负数字")
        return False
    is_free = 1 if price == 0 else 0

    part_id = data.get("part_id", "part-1")
    chapter_id = data.get("chapter_id", "chapter-1")
    fmt = data.get("format", "markdown")
    images = data.get("images") or []
    if isinstance(images, str):
        # A single URL given as a bare string would otherwise be iterated
        # character by character.
        images = [images]
    section_id = data.get("id", "")

    content = _embed_images(content, images, fmt)
    word_count = len(re.sub(r"\s+", "", content))
    part_title = PART_MAP.get(part_id, part_id)
    chapter_title = CHAPTER_MAP.get(chapter_id, chapter_id)

    conn = get_connection()
    try:
        cur = conn.cursor()
        if not section_id:
            section_id = generate_section_id(cur, chapter_id)
        cur.execute("SELECT mid FROM chapters WHERE id = %s", (section_id,))
        existing = cur.fetchone()
        if existing:
            cur.execute("""
                UPDATE chapters SET
                section_title = %s, content = %s, word_count = %s,
                is_free = %s, price = %s, part_id = %s, part_title = %s,
                chapter_id = %s, chapter_title = %s, status = 'published'
                WHERE id = %s
            """, (title, content, word_count, is_free, price, part_id, part_title,
                  chapter_id, chapter_title, section_id))
            action = "更新"
        else:
            cur.execute("SELECT COALESCE(MAX(sort_order), 0) + 1 FROM chapters")
            next_order = cur.fetchone()[0]
            cur.execute("""
                INSERT INTO chapters (id, part_id, part_title, chapter_id, chapter_title,
                section_title, content, word_count, is_free, price, sort_order, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'published')
            """, (section_id, part_id, part_title, chapter_id, chapter_title,
                  title, content, word_count, is_free, price, next_order))
            action = "创建"
        conn.commit()
    except pymysql.err.IntegrityError as e:
        conn.rollback()
        print(json.dumps({"success": False, "error": f"ID冲突: {e}"}, ensure_ascii=False))
        return False
    except Exception as e:
        conn.rollback()
        print(json.dumps({"success": False, "error": str(e)}, ensure_ascii=False))
        return False
    finally:
        conn.close()

    # Reported only after a successful commit, outside the try block, so a
    # formatting problem can never be reported as a failed (rolled back) write.
    result = {
        "success": True,
        "action": action,
        "data": {
            "id": section_id,
            "title": title,
            "part": f"{part_id} ({part_title})",
            "chapter": f"{chapter_id} ({chapter_title})",
            "price": price,
            "is_free": bool(is_free),
            "word_count": word_count,
            "format": fmt,
            "images_count": len(images),
        },
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return True
def main():
    """Command-line entry point.

    Modes, checked in this order:
        --list-structure: print the part/chapter structure and return.
        --list-chapters: print every section ordered by sort_order and return.
        --json: upload the section described by a JSON object.
        otherwise: upload a section built from --title plus --content or
            --content-file (the file wins when both are given).

    Exits with status 1 when the arguments are unusable (missing title or
    content, malformed or non-object JSON, unreadable content file) or when
    upload_content() reports failure.
    """
    parser = argparse.ArgumentParser(description="Soul 内容上传接口")
    parser.add_argument("--json", help="JSON格式的完整数据")
    parser.add_argument("--title", help="标题")
    parser.add_argument("--price", type=float, default=1.0, help="定价(0=免费)")
    parser.add_argument("--content", help="内容正文")
    parser.add_argument("--content-file", help="从文件读取内容")
    parser.add_argument("--format", default="markdown", choices=["markdown", "text", "html"])
    parser.add_argument("--part", default="part-1", help="所属篇 (part-1 ~ part-5)")
    parser.add_argument("--chapter", default="chapter-1", help="所属章 (chapter-1 ~ chapter-11)")
    parser.add_argument("--id", help="指定 section ID (如 1.6),不指定则自动生成")
    parser.add_argument("--images", nargs="*", help="图片URL列表")
    parser.add_argument("--list-structure", action="store_true", help="查看篇章结构")
    parser.add_argument("--list-chapters", action="store_true", help="列出所有章节")
    args = parser.parse_args()

    if args.list_structure:
        list_structure()
        return

    if args.list_chapters:
        conn = get_connection()
        try:
            cur = conn.cursor()
            cur.execute("SELECT id, section_title, is_free, price FROM chapters ORDER BY sort_order")
            for section_id, section_title, is_free, price in cur.fetchall():
                # The price tag previously printed as "<price>]" with no
                # opening bracket; it now mirrors the "[免费]" tag.
                free_tag = "[免费]" if is_free else f"[{price}]"
                print(f" {section_id} {section_title} {free_tag}")
        finally:
            conn.close()
        return

    if args.json:
        try:
            data = json.loads(args.json)
        except json.JSONDecodeError as e:
            print(f"错误: --json 不是合法的 JSON: {e}")
            sys.exit(1)
        if not isinstance(data, dict):
            # upload_content() reads its input with .get(), so only an object works.
            print("错误: --json 必须是一个 JSON 对象")
            sys.exit(1)
    else:
        if not args.title or (not args.content and not args.content_file):
            parser.print_help()
            print("\n错误: 需要 --title 和 --content (或 --content-file)")
            sys.exit(1)
        content = args.content
        if args.content_file:
            try:
                with open(args.content_file, "r", encoding="utf-8") as f:
                    content = f.read()
            except OSError as e:
                print(f"错误: 无法读取内容文件: {e}")
                sys.exit(1)
        data = {
            "title": args.title,
            "price": args.price,
            "content": content,
            "format": args.format,
            "part_id": args.part,
            "chapter_id": args.chapter,
            "images": args.images or [],
        }
        if args.id:
            data["id"] = args.id

    # Propagate failure to the shell so calling scripts can detect it.
    if not upload_content(data):
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()