feat: 小程序超级个体/个人资料/CKB获客;VIP列表展示过滤;管理端与API联调

- 超级个体:去掉首位特例;列表仅展示有头像且非微信默认昵称(vip.go)
- 个人资料:居中头像、低调联系方式、点头像优先走存客宝 lead(ckbLeadToken)
- 阅读页分享朋友圈复制与 toast 去重
- soul-api: miniprogram users 带 ckbLeadToken;其它 handler 与路由调整
- 脚本:content_upload、miniprogram 上传辅助等

Made-with: Cursor
This commit is contained in:
卡若
2026-03-22 08:34:28 +08:00
parent 17ce20c8ee
commit 5724fba877
119 changed files with 8198 additions and 4369 deletions

View File

@@ -0,0 +1,48 @@
# Soul 运营全链路技能包(本机一键打包)
## 你要做的事(复制到另一台电脑前)
**本机终端** 执行(路径按你实际安装调整):
```bash
python3 "/Users/karuo/Documents/开发/3、自营项目/一场soul的创业实验-永平/scripts/pack_soul_operation_skills.py"
```
- 会在 **`~/Downloads/Soul运营全链路技能包_20260320.zip`** 生成压缩包(日期戳见脚本内 `STAMP`,可自行改)。
- 临时文件在永平项目下 **`.tmp_skill_bundle/`**,打包完成后可整目录删除。
若卡若AI不在默认路径请先编辑 `pack_soul_operation_skills.py` 里的:
```python
KARUO_AI = Path("/Users/karuo/Documents/个人/卡若AI")
```
## 压缩包里有什么(保证链路齐全)
| 内容 | 说明 |
|:---|:---|
| `.cursor/skills/soul-operation-report` | Cursor 入口:运营报表 |
| `.cursor/skills/soul-party-project` | Cursor 入口:水岸项目管理 |
| `卡若AI/02_卡人/水岸_项目管理/` | 水岸总纲 + 卡若创业派对 README |
| `卡若AI/.../水桥_平台对接/飞书管理/` | 运营报表、妙记相关脚本与 SKILL |
| `卡若AI/.../水桥_平台对接/智能纪要/` | 妙记下载、纪要 SKILL + 脚本 |
| `卡若AI/.../水桥_平台对接/Soul创业实验/` | 写作/上传/环境与 TOKEN 说明 |
| `卡若AI/03_卡木/木叶_视频内容/` 下 | `视频切片``多平台分发``抖音/B站/视频号/小红书/快手发布` |
| `卡若AI/运营中枢/工作台/00_账号与API索引.md` | 若本机存在则一并打入(凭证速查) |
| `解压后必读.md` | 在另一台电脑上的合并步骤与环境说明 |
## 「可直接运作」在另一台机上的含义
- **Skill 与脚本文件**会齐;但要真正跑通,仍需在新电脑上:
- 安装 **Python、依赖、FFmpeg、conda/mlx-whisper**(见各 SKILL
- 配置 **飞书/妙记/各平台 Cookie、小程序与永平项目 `.env`**(见 `Soul创业实验/上传/环境与TOKEN配置.md``00_账号与API索引.md`
- 把文档里原机的 **`/Users/karuo/...`** 改成新机器路径。
## 安全
压缩包可能含 **密钥与 Cookie 说明**,请勿上传公开网盘;用 U 盘或加密渠道传输。
## 未打入的内容(属正常)
- **`飞书管理/脚本/.browser_state/`**Playwright/Chrome 本地状态,常含断链或套接字文件,打包会失败且不宜迁移;**到新电脑需按各脚本说明重新登录/生成状态**。
- 体积约 **260MB+**(含脚本与分发相关文件);若需更小体积,可自行从包内删掉用不到的平台目录后再压缩。

114
scripts/content_download.py Normal file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从小程序/正式 API 下载单章正文,保存为书稿目录下的 md 文件。
用法:
SOUL_TEST_ENV=soulapi python3 scripts/content_download.py 128
SOUL_TEST_ENV=soulapi python3 scripts/content_download.py --id 10.27
python3 scripts/content_download.py 128 --out-dir /path/to/2026每日派对干货
2026 场次第102场起对应 id 10.01、10.02、…、10.27第128场
"""
import argparse
import os
import sys
from pathlib import Path
# 项目根
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "scripts" / "test"))
try:
from config import API_BASE, ENV_LABEL
except Exception:
API_BASE = os.environ.get("SOUL_API_BASE", "https://soulapi.quwanzhi.com").rstrip("/")
ENV_LABEL = "env"
# 书稿 2026 目录默认路径(与上传 README 一致)
DEFAULT_BOOK_2026 = Path(
os.environ.get(
"SOUL_BOOK_2026",
"/Users/karuo/Documents/个人/2、我写的书/《一场soul的创业实验》/2026每日派对干货",
)
)
def field_to_id(field: int) -> str:
"""第 N 场≥102→ 10.xx第101场及以前在第九章为 9.xx。API 上第128场可能为 9.28。"""
if field >= 102:
n = field - 102 + 1
return f"10.{n:02d}"
if field >= 1:
return f"9.{field:02d}" # 第9章 9.019.99,按场次
raise ValueError("场次请用 1999")
def main():
parser = argparse.ArgumentParser(description="从小程序 API 下载单章为 md")
parser.add_argument("field", nargs="?", type=int, help="场次号,如 128 表示第128场")
parser.add_argument("--id", type=str, help="章节 id如 10.27(与 field 二选一)")
parser.add_argument("--out-dir", type=Path, default=DEFAULT_BOOK_2026, help="输出目录,默认 2026每日派对干货")
parser.add_argument("--base", type=str, default=API_BASE, help="API 根地址")
args = parser.parse_args()
if args.id:
chapter_id = args.id
elif args.field is not None:
chapter_id = field_to_id(args.field)
else:
parser.error("请指定 field如 128或 --id如 10.27")
base = args.base.rstrip("/")
try:
import requests
except ImportError:
print("请安装: pip install requests", file=sys.stderr)
sys.exit(1)
url = f"{base}/api/miniprogram/book/chapter/by-id/{chapter_id}"
print(f"环境: {ENV_LABEL} | GET {url}")
r = requests.get(url, timeout=30)
# 2026 场次可能仍挂在第9章9.28=第128场404 时用 9.xx 再试
if r.status_code == 404 and args.field is not None and not args.id and args.field >= 1 and args.field <= 101:
fallback_id = f"9.{args.field:02d}"
url = f"{base}/api/miniprogram/book/chapter/by-id/{fallback_id}"
print(f"404尝试第9章 id: {fallback_id} | GET {url}")
r = requests.get(url, timeout=30)
if r.status_code == 200:
chapter_id = fallback_id
if r.status_code == 404 and args.field is not None and not args.id and args.field >= 102:
fallback_id = f"9.{(args.field - 100):02d}" # 128 → 9.28
url = f"{base}/api/miniprogram/book/chapter/by-id/{fallback_id}"
print(f"404尝试第9章 id: {fallback_id} | GET {url}")
r = requests.get(url, timeout=30)
if r.status_code == 200:
chapter_id = fallback_id
r.raise_for_status()
data = r.json()
if not data.get("success"):
print("API 返回失败:", data.get("error", data), file=sys.stderr)
sys.exit(2)
content = data.get("content") or (data.get("data") or {}).get("content") or ""
section_title = data.get("sectionTitle") or (data.get("data") or {}).get("sectionTitle") or f"{args.field or '?'}"
if not content:
print("未获取到正文 content", file=sys.stderr)
sys.exit(3)
# 若标题里没有「第X场」用场次补上sectionTitle 可能是「赚最多那个月…」)
if args.field and "" not in section_title and "" not in section_title:
display_title = f"{args.field}场|{section_title}"
else:
display_title = section_title
out_dir = args.out_dir
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f"{display_title}.md"
body = f"# {display_title}\n\n{content.strip()}\n"
out_file.write_text(body, encoding="utf-8")
print(f"已写入: {out_file}")
print(f"字数: {len(content)}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,13 @@
{
"item_list": [
{
"address": "pages/index/index",
"tag": "阅读 创业",
"first_class": "一级类目名称",
"second_class": "二级类目名称",
"first_id": 0,
"second_id": 0,
"title": "首页"
}
]
}

39
scripts/miniprogram_upload.sh Executable file
View File

@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Soul 小程序:通过微信开发者工具 CLI 上传代码包(需本机已登录开发者工具)。
set -euo pipefail
ROOT="$(cd "$(dirname "$0")/.." && pwd)"
MINIPROGRAM_DIR="${MINIPROGRAM_DIR:-$ROOT/miniprogram}"
CLI="${WECHAT_DEVTOOLS_CLI:-/Applications/wechatwebdevtools.app/Contents/MacOS/cli}"
LANG_OPT="${WECHAT_CLI_LANG:-zh}"
if [[ ! -x "$CLI" ]]; then
echo "未找到微信开发者工具 CLI: $CLI" >&2
echo "可设置 WECHAT_DEVTOOLS_CLI 指向 cli 可执行文件。" >&2
exit 1
fi
# 未传参时默认 1.7.1(避免手滑打成 1.17 等与展示不一致)
DEFAULT_VER="${MINIPROGRAM_DEFAULT_VERSION:-1.7.1}"
VERSION="${1:-$DEFAULT_VER}"
DESC="${2:-版本 v$VERSION}"
CLI_EXTRA=()
if [[ -n "${WECHAT_CLI_PORT:-}" ]]; then
CLI_EXTRA+=(--port "$WECHAT_CLI_PORT")
fi
if ((${#CLI_EXTRA[@]} > 0)); then
exec "$CLI" upload \
--project "$MINIPROGRAM_DIR" \
--version "$VERSION" \
--desc "$DESC" \
--lang "$LANG_OPT" \
"${CLI_EXTRA[@]}"
else
exec "$CLI" upload \
--project "$MINIPROGRAM_DIR" \
--version "$VERSION" \
--desc "$DESC" \
--lang "$LANG_OPT"
fi

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Soul 运营全链路技能包:将 SKILL + 脚本 + Cursor 入口打成 zip默认输出到用户「下载」文件夹。
用法:
python3 scripts/pack_soul_operation_skills.py
"""
from __future__ import annotations
import shutil
import sys
import zipfile
from pathlib import Path
STAMP = "20260320"
BUNDLE_TOP = f"Soul运营全链路技能包_{STAMP}"
# 卡若AI 根目录(按你本机实际修改)
KARUO_AI = Path("/Users/karuo/Documents/个人/卡若AI")
CURSOR_SKILLS = Path.home() / ".cursor" / "skills"
DOWNLOADS = Path.home() / "Downloads"
# 在永平项目下临时组装(本仓库内,便于工具写入)
REPO_ROOT = Path(__file__).resolve().parents[1]
STAGING_PARENT = REPO_ROOT / ".tmp_skill_bundle"
STAGING = STAGING_PARENT / BUNDLE_TOP
def ignore_copy(dirpath: str, names: list[str]) -> list[str]:
"""排除缓存、浏览器运行时目录(含断链/套接字,会导致 copytree 失败)。"""
skip_dirs = {"__pycache__", ".browser_state", "chromium_data"}
skip_files = {".DS_Store"}
ignored: list[str] = []
for n in names:
if n in skip_dirs or n in skip_files or n.endswith(".pyc"):
ignored.append(n)
return ignored
def copytree(src: Path, dst: Path) -> None:
if not src.exists():
print(f"SKIP 不存在: {src}", file=sys.stderr)
return
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src, dst, dirs_exist_ok=True, ignore=ignore_copy)
def main() -> int:
if not KARUO_AI.is_dir():
print(f"ERROR: 未找到卡若AI目录: {KARUO_AI}", file=sys.stderr)
return 1
if STAGING.exists():
shutil.rmtree(STAGING)
STAGING.mkdir(parents=True)
# Cursor 入口
csk = STAGING / ".cursor" / "skills"
csk.mkdir(parents=True, exist_ok=True)
for name in ("soul-operation-report", "soul-party-project"):
p = CURSOR_SKILLS / name
if p.is_dir():
copytree(p, csk / name)
kai = STAGING / "卡若AI"
copytree(
KARUO_AI / "02_卡人" / "水岸_项目管理",
kai / "02_卡人" / "水岸_项目管理",
)
bridge = KARUO_AI / "02_卡人" / "水桥_平台对接"
for sub in ("飞书管理", "智能纪要", "Soul创业实验"):
copytree(bridge / sub, kai / "02_卡人" / "水桥_平台对接" / sub)
wood = KARUO_AI / "03_卡木" / "木叶_视频内容"
wdst = kai / "03_卡木" / "木叶_视频内容"
for sub in (
"视频切片",
"多平台分发",
"抖音发布",
"B站发布",
"视频号发布",
"小红书发布",
"快手发布",
):
copytree(wood / sub, wdst / sub)
idx = KARUO_AI / "运营中枢" / "工作台" / "00_账号与API索引.md"
if idx.is_file():
(kai / "运营中枢" / "工作台").mkdir(parents=True, exist_ok=True)
shutil.copy2(idx, kai / "运营中枢" / "工作台" / idx.name)
readme = STAGING / "解压后必读.md"
readme.write_text(
"""# Soul 运营全链路技能包
## 包含内容
- `.cursor/skills/``soul-operation-report`、`soul-party-project`
- `卡若AI/02_卡人/水岸_项目管理/`
- `卡若AI/02_卡人/水桥_平台对接/飞书管理/`、`智能纪要/`、`Soul创业实验/`
- `卡若AI/03_卡木/木叶_视频内容/`:视频切片、多平台分发、各平台发布
- `卡若AI/运营中枢/工作台/00_账号与API索引.md`(若源机存在)
## 另一台电脑怎么用
1. 解压后,将 `卡若AI/` **合并**到你本机卡若AI根目录先备份
2. 将 `.cursor/skills/` 下两个目录复制到 `~/.cursor/skills/`。
3. 安装 Python/FFmpeg/conda 等依赖,按各 SKILL 与 `Soul创业实验/上传/环境与TOKEN配置.md` 配置 Token、Cookie、永平项目 `.env`。
4. 文档或脚本里的 `/Users/karuo/...` 需改成本机路径。
**安全**:包内可能有凭证说明,勿上传公开网盘。
""",
encoding="utf-8",
)
for p in list(STAGING.rglob("__pycache__")):
if p.is_dir():
shutil.rmtree(p, ignore_errors=True)
for p in STAGING.rglob("*.pyc"):
try:
p.unlink()
except OSError:
pass
DOWNLOADS.mkdir(parents=True, exist_ok=True)
zip_path = DOWNLOADS / f"{BUNDLE_TOP}.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
for f in STAGING.rglob("*"):
if f.is_file():
arcname = Path(BUNDLE_TOP) / f.relative_to(STAGING)
zf.write(f, arcname.as_posix())
mb = zip_path.stat().st_size / (1024 * 1024)
print(f"完成: {zip_path} ({mb:.2f} MB)")
print(f"临时目录(可删): {STAGING}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从书稿 Markdown 中仅解析「图片」引用并上传到现网,再生成 HTML 写入 chapters.content。
- 只处理:`![](相对/绝对路径.png)` 等常见图片后缀;**不解析、不上传视频/附件**。
- 已是 `http(s)://` 的地址:不重复上传,原样写入 `<img src="...">`。
- 非图片后缀的 `![]()`:当作普通正文一行输出(不尝试上传)。
用法:
cd 一场soul的创业实验-永平
python3 scripts/sync_chapter_images_from_md.py --id 10.22 \\
--md "/path/to/第130场….md"
依赖: pip install pymysql requests
环境变量: SOUL_API_BASE 默认 https://soulapi.quwanzhi.com
"""
from __future__ import annotations
import argparse
import html
import importlib.util
import os
import re
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".webp"})
# 整行仅有一张图:![](...)
LINE_IMAGE_ONLY = re.compile(r"^\s*!\[([^\]]*)\]\(([^)]+)\)\s*$")
try:
import pymysql
import requests
except ImportError as e:
print("需要: pip install pymysql requests", e, file=sys.stderr)
sys.exit(1)
def load_db_config() -> dict:
mig = ROOT / "scripts" / "migrate_2026_sections.py"
spec = importlib.util.spec_from_file_location("_mig_db", mig)
mod = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(mod)
cfg = getattr(mod, "DB_CONFIG", None)
if not isinstance(cfg, dict):
sys.exit("migrate_2026_sections.py 中无有效 DB_CONFIG")
return cfg
def resolve_local_path(md_path: Path, ref: str) -> Path | None:
ref = ref.strip()
if not ref or ref.startswith(("http://", "https://")):
return None
p = (md_path.parent / ref).expanduser().resolve()
if p.is_file():
return p
return None
def guess_mime(path: Path) -> str:
ext = path.suffix.lower()
return {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
}.get(ext, "application/octet-stream")
def upload_image(local: Path, api_base: str) -> str:
url = f"{api_base.rstrip('/')}/api/upload"
mime = guess_mime(local)
with local.open("rb") as f:
files = {"file": (local.name, f, mime)}
data = {"folder": "book-images"}
r = requests.post(url, files=files, data=data, timeout=120)
r.raise_for_status()
j = r.json()
if not j.get("success"):
raise RuntimeError(j.get("error") or j.get("message") or str(j))
out = j.get("url") or (j.get("data") or {}).get("url")
if not out:
raise RuntimeError("响应无 url: " + str(j)[:500])
return str(out)
def md_to_html(md_path: Path, api_base: str) -> str:
raw = md_path.read_text(encoding="utf-8")
lines = raw.splitlines()
if lines and lines[0].lstrip().startswith("#"):
lines = lines[1:]
chunks: list[str] = []
upload_cache: dict[str, str] = {}
for line in lines:
if line.strip() == "---":
chunks.append("")
continue
m = LINE_IMAGE_ONLY.match(line)
if m:
alt, ref = m.group(1), m.group(2).strip()
if ref.startswith(("http://", "https://")):
chunks.append(
f'<p><img src="{html.escape(ref)}" alt="{html.escape(alt)}"/></p>'
)
continue
loc = resolve_local_path(md_path, ref)
if not loc:
chunks.append(f"<p>(图片路径无效:{html.escape(ref)}</p>")
continue
ext = loc.suffix.lower()
if ext not in IMAGE_EXT:
# 非图片(如视频):不解析、不上传,整行当正文
chunks.append(f"<p>{html.escape(line.strip())}</p>")
continue
key = str(loc)
if key not in upload_cache:
print(f"上传图片: {loc.name}", flush=True)
upload_cache[key] = upload_image(loc, api_base)
src = upload_cache[key]
chunks.append(
f'<p><img src="{html.escape(src)}" alt="{html.escape(alt)}"/></p>'
)
continue
if line.strip():
chunks.append(f"<p>{html.escape(line.strip())}</p>")
else:
chunks.append("")
# 合并连续空串为单个换行,避免多余空 <p>
html_parts: list[str] = []
for c in chunks:
if c == "":
if html_parts and html_parts[-1] != "":
html_parts.append("")
else:
html_parts.append(c)
return "\n".join(html_parts).strip() + "\n"
def main() -> None:
p = argparse.ArgumentParser(description="MD 内图片上传并写回 chapters仅图片")
p.add_argument("--id", required=True, help="章节 id如 10.22")
p.add_argument("--md", type=Path, required=True, help="文章 .md 路径")
p.add_argument(
"--api-base",
default=os.environ.get("SOUL_API_BASE", "https://soulapi.quwanzhi.com"),
help="API 根地址",
)
p.add_argument("--dry-run", action="store_true", help="只打印 HTML 前 800 字,不写库")
args = p.parse_args()
md_path = args.md.expanduser().resolve()
if not md_path.is_file():
sys.exit(f"文件不存在: {md_path}")
body = md_to_html(md_path, args.api_base)
word_count = len(body)
if args.dry_run:
print(body[:800])
print("… dry-runword_count=", word_count)
return
cfg = load_db_config()
conn = pymysql.connect(**cfg)
cur = conn.cursor()
cur.execute(
"UPDATE chapters SET content = %s, word_count = %s, updated_at = NOW() WHERE id = %s",
(body, word_count, args.id),
)
if cur.rowcount != 1:
conn.rollback()
sys.exit(f"更新失败id={args.id} rowcount={cur.rowcount}")
conn.commit()
conn.close()
print(f"已更新 {args.id} | word_count={word_count}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,368 @@
#!/usr/bin/env python3
"""
微信小程序发布辅助:上传(调开发者工具 CLI+ 审核状态查询 + 尝试 API 提审。
重要说明(微信官方限制):
- 代码上传可用本机「微信开发者工具」CLI 或 miniprogram-ci。
- submit_audit开放平台文档标明主要为「第三方平台代调用」自有主体使用小程序 appid+secret
换取的 access_token 调用时,常见返回 errcode=86000仅允许第三方代调用此时必须在
mp 后台手动点「提交审核」。
- 「自动过审」不可能由开发者脚本保证:是否通过由微信审核决定。
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# 登录 mp 后进入「版本管理 / 开发版本」列表(选体验版、提交审核均在此页操作;具体路由以微信后台为准)。
MP_VERSION_MANAGE_URL = (
"https://mp.weixin.qq.com/wxopen/wacodepage?action=getcodepage&lang=zh_CN"
)
def _repo_root() -> Path:
return Path(__file__).resolve().parent.parent
def _load_partial_env_file(path: Path, keys: tuple[str, ...]) -> None:
if not path.is_file():
return
for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
k, v = k.strip(), v.strip().strip('"').strip("'")
if k in keys and not os.environ.get(k):
os.environ[k] = v
def ensure_wechat_env_from_soul_api() -> None:
"""若未 export 凭证,则从 soul-api/.env.production 或 .env 读取 WECHAT_APPID/SECRET不写日志"""
if os.environ.get("WECHAT_APPID") and os.environ.get("WECHAT_APPSECRET"):
return
root = _repo_root()
for name in (".env.production", ".env"):
_load_partial_env_file(
root / "soul-api" / name, ("WECHAT_APPID", "WECHAT_APPSECRET")
)
if os.environ.get("WECHAT_APPID") and os.environ.get("WECHAT_APPSECRET"):
return
def _get(url: str) -> dict:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=60) as resp:
return json.loads(resp.read().decode())
def _post_json(url: str, body: dict) -> dict:
data = json.dumps(body, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
method="POST",
headers={"Content-Type": "application/json; charset=utf-8"},
)
with urllib.request.urlopen(req, timeout=60) as resp:
return json.loads(resp.read().decode())
def get_access_token(appid: str, secret: str) -> str:
q = urllib.parse.urlencode(
{"grant_type": "client_credential", "appid": appid, "secret": secret}
)
url = f"https://api.weixin.qq.com/cgi-bin/token?{q}"
try:
data = _get(url)
except urllib.error.HTTPError as e:
raise SystemExit(f"获取 access_token HTTP 错误: {e}") from e
if data.get("errcode"):
raise SystemExit(f"获取 access_token 失败: {data}")
token = data.get("access_token")
if not token:
raise SystemExit(f"获取 access_token 无 access_token 字段: {data}")
return token
def cmd_audit_status(appid: str, secret: str) -> None:
token = get_access_token(appid, secret)
url = f"https://api.weixin.qq.com/wxa/get_latest_auditstatus?access_token={urllib.parse.quote(token)}"
try:
data = _get(url)
except urllib.error.HTTPError as e:
raise SystemExit(f"get_latest_auditstatus HTTP 错误: {e}") from e
print(json.dumps(data, ensure_ascii=False, indent=2))
# 常见 status: 0 审核成功 1 审核被拒绝 2 审核中 3 已撤回 4 审核延后
def cmd_get_category(appid: str, secret: str) -> None:
token = get_access_token(appid, secret)
url = f"https://api.weixin.qq.com/wxa/get_category?access_token={urllib.parse.quote(token)}"
try:
data = _get(url)
except urllib.error.HTTPError as e:
raise SystemExit(f"get_category HTTP 错误: {e}") from e
print(json.dumps(data, ensure_ascii=False, indent=2))
def _first_item_from_category(data: dict) -> dict | None:
lst = data.get("category_list")
if not lst or not isinstance(lst, list):
return None
c = lst[0]
if not isinstance(c, dict):
return None
first_class = c.get("first_class") or ""
second_class = c.get("second_class") or ""
first_id = c.get("first_id")
second_id = c.get("second_id")
if first_id is None or second_id is None:
return None
item: dict = {
"address": "pages/index/index",
"tag": "阅读 创业",
"first_class": first_class,
"second_class": second_class,
"first_id": int(first_id),
"second_id": int(second_id),
"title": "首页",
}
third_class = c.get("third_class")
third_id = c.get("third_id")
if third_class and third_id:
item["third_class"] = third_class
item["third_id"] = int(third_id)
return item
def cmd_submit_audit(
appid: str,
secret: str,
version_desc: str,
item_json: Path | None,
privacy_api_not_use: bool | None,
) -> dict:
token = get_access_token(appid, secret)
if item_json and item_json.is_file():
payload = json.loads(item_json.read_text(encoding="utf-8"))
item_list = payload.get("item_list")
if not item_list:
raise SystemExit("item_json 中缺少 item_list")
else:
cat_url = f"https://api.weixin.qq.com/wxa/get_category?access_token={urllib.parse.quote(token)}"
cat = _get(cat_url)
if cat.get("errcode"):
raise SystemExit(f"get_category 失败: {cat}")
one = _first_item_from_category(cat)
if not one:
raise SystemExit(
"无法从 get_category 构造审核项;请在公众平台配置服务类目,"
"或使用 --item-json 指定完整 item_list见 scripts/miniprogram_audit_item.example.json"
)
item_list = [one]
body: dict = {
"item_list": item_list,
"version_desc": version_desc[:400] if version_desc else "版本更新",
}
if privacy_api_not_use is True:
body["privacy_api_not_use"] = True
elif privacy_api_not_use is False:
body["privacy_api_not_use"] = False
submit_url = f"https://api.weixin.qq.com/wxa/submit_audit?access_token={urllib.parse.quote(token)}"
try:
data = _post_json(submit_url, body)
except urllib.error.HTTPError as e:
raise SystemExit(f"submit_audit HTTP 错误: {e}") from e
print(json.dumps(data, ensure_ascii=False, indent=2))
if data.get("errcode") == 86000:
print(
"\n说明 errcode=86000该接口仅支持「第三方平台」代小程序调用。"
"自有主体请在浏览器打开公众平台 → 管理 → 版本管理 → 提交审核。\n"
"可先运行: python3 scripts/wechat_miniprogram_release.py open-mp",
file=sys.stderr,
)
elif data.get("errcode") == 61039:
print(
"\n说明 errcode=61039上传后隐私/代码检测任务未完成,请等待数分钟后再提交审核。",
file=sys.stderr,
)
return data
def cmd_upload(version: str, desc: str) -> None:
root = Path(__file__).resolve().parent.parent
sh = root / "scripts" / "miniprogram_upload.sh"
if not sh.is_file():
raise SystemExit(f"未找到 {sh}")
r = subprocess.run([str(sh), version, desc], check=False)
if r.returncode != 0:
raise SystemExit(r.returncode)
def cmd_open_mp() -> None:
url = "https://mp.weixin.qq.com/"
try:
subprocess.run(["open", url], check=False)
except FileNotFoundError:
print(url)
def cmd_open_mp_version() -> None:
"""打开版本管理(开发版本列表,可设体验版、提交审核)。"""
try:
subprocess.run(["open", MP_VERSION_MANAGE_URL], check=False)
except FileNotFoundError:
print(MP_VERSION_MANAGE_URL)
def cmd_upload_open(version: str, desc: str) -> None:
cmd_upload(version, desc)
print(
"\n下一步微信未开放「一键设为体验版」API需在网页上点两次\n"
"1在「开发版本」列表找到刚上传的版本号\n"
"2点击「选为体验版」\n"
"3同一列表对该版本点击「提交审核」或先体验再提审\n",
file=sys.stderr,
)
cmd_open_mp_version()
def main() -> None:
p = argparse.ArgumentParser(description="微信小程序上传与审核辅助")
sub = p.add_subparsers(dest="cmd", required=True)
p_up = sub.add_parser("upload", help="调用微信开发者工具 CLI 上传(需本机已登录)")
p_up.add_argument(
"--version",
"-v",
default=os.environ.get("MINIPROGRAM_DEFAULT_VERSION", "1.7.1"),
help="版本号,默认 1.7.1 或环境变量 MINIPROGRAM_DEFAULT_VERSION",
)
p_up.add_argument(
"--desc",
"-d",
default="",
help="版本说明,默认同「版本 v<版本号>」",
)
p_st = sub.add_parser("audit-status", help="查询最近一次审核状态(需 WECHAT_APPID/SECRET")
p_cat = sub.add_parser("get-category", help="拉取已配置服务类目 JSON构造提审项用")
p_sa = sub.add_parser(
"submit-audit",
help="尝试调用 submit_audit自有主体常会 86000需改 mp 后台手动提审)",
)
p_sa.add_argument("--version-desc", default="版本更新", help="审核说明 version_desc")
p_sa.add_argument(
"--item-json",
type=Path,
help="自定义 item_list 的 JSON 文件(含 item_list 数组)",
)
p_sa.add_argument(
"--privacy-api-not-use",
action=argparse.BooleanOptionalAction,
default=None,
help="是否声明未使用检测到的隐私接口(与微信报错 61040 相关时再用)",
)
sub.add_parser("open-mp", help="在浏览器打开 mp 首页")
sub.add_parser(
"open-version",
help="打开版本管理页(开发版本:选体验版、提交审核;需已登录 mp",
)
p_uo = sub.add_parser(
"upload-open",
help="上传代码并打开版本管理页(设体验版、提审在浏览器完成)",
)
p_uo.add_argument(
"--version",
"-v",
default=os.environ.get("MINIPROGRAM_DEFAULT_VERSION", "1.7.1"),
)
p_uo.add_argument("--desc", "-d", default="", help="默认:版本 v<版本号>")
p_rel = sub.add_parser("release", help="先 upload 再 submit-audit提审失败仍可到后台操作")
p_rel.add_argument(
"--version",
"-v",
default=os.environ.get("MINIPROGRAM_DEFAULT_VERSION", "1.7.1"),
)
p_rel.add_argument("--desc", "-d", default="", help="上传说明,默认:版本 v<版本号>")
p_rel.add_argument("--version-desc", default="", help="提交审核说明,默认同上传说明")
p_rel.add_argument("--item-json", type=Path)
p_rel.add_argument(
"--privacy-api-not-use",
action=argparse.BooleanOptionalAction,
default=None,
)
args = p.parse_args()
if args.cmd in (
"audit-status",
"get-category",
"submit-audit",
"release",
):
ensure_wechat_env_from_soul_api()
appid = os.environ.get("WECHAT_APPID", "").strip()
secret = os.environ.get("WECHAT_APPSECRET", "").strip()
if args.cmd == "upload":
desc = args.desc.strip() or f"版本 v{args.version}"
cmd_upload(args.version, desc)
return
if args.cmd == "open-mp":
cmd_open_mp()
return
if args.cmd == "open-version":
cmd_open_mp_version()
return
if args.cmd == "upload-open":
d = args.desc.strip() or f"版本 v{args.version}"
cmd_upload_open(args.version, d)
return
if args.cmd == "release":
d = args.desc.strip() or f"版本 v{args.version}"
cmd_upload(args.version, d)
if not appid or not secret:
print(
"未设置 WECHAT_APPID / WECHAT_APPSECRET跳过 submit-audit。",
file=sys.stderr,
)
cmd_open_mp_version()
return
vd = (args.version_desc or "").strip() or d
res = cmd_submit_audit(
appid, secret, vd, args.item_json, args.privacy_api_not_use
)
if res.get("errcode") == 86000:
cmd_open_mp_version()
return
if not appid or not secret:
raise SystemExit("请设置环境变量 WECHAT_APPID、WECHAT_APPSECRET与 soul-api 一致即可)")
if args.cmd == "audit-status":
cmd_audit_status(appid, secret)
elif args.cmd == "get-category":
cmd_get_category(appid, secret)
elif args.cmd == "submit-audit":
cmd_submit_audit(
appid, secret, args.version_desc, args.item_json, args.privacy_api_not_use
)
if __name__ == "__main__":
main()