🔄 卡若AI 同步 2026-03-10 14:17 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个

This commit is contained in:
2026-03-10 14:17:56 +08:00
parent a8822b3a21
commit 12a32ce47e
9 changed files with 337 additions and 43 deletions

View File

@@ -1,13 +1,21 @@
#!/usr/bin/env python3
"""
多平台一键分发 - 将成片目录下的视频同时发布到 5 个平台
支持: 抖音、B站、视频号、小红书、快手
多平台一键分发 v2 — 全链路自动化
- 并行分发5 平台同时上传asyncio.gather
- 去重机制:已成功发布的视频自动跳过
- 失败重试:--retry 自动重跑历史失败任务
- Cookie 预警:过期/即将过期自动通知
- 智能标题:优先手动字典,否则文件名自动生成
- 结果持久化JSON Lines 日志 + 控制台汇总
用法:
python3 distribute_all.py # 分发到所有已登录平台
python3 distribute_all.py --platforms 抖音 B站 # 只分发到指定平台
python3 distribute_all.py --check # 检查 Cookie 状态
python3 distribute_all.py --video /path/to.mp4 # 分发单条视频
python3 distribute_all.py # 并行分发到所有平台
python3 distribute_all.py --platforms B站 快手 # 只指定平台
python3 distribute_all.py --check # 检查 Cookie
python3 distribute_all.py --retry # 重试失败任务
python3 distribute_all.py --video /path/to.mp4 # 发单条视频
python3 distribute_all.py --no-dedup # 跳过去重检查
python3 distribute_all.py --serial # 串行模式(调试用)
"""
import argparse
import asyncio
@@ -19,11 +27,13 @@ from pathlib import Path
SCRIPT_DIR = Path(__file__).parent
BASE_DIR = SCRIPT_DIR.parent.parent
VIDEO_DIR = Path("/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片")
DEFAULT_VIDEO_DIR = Path("/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片")
sys.path.insert(0, str(SCRIPT_DIR))
from cookie_manager import CookieManager, check_all_cookies
from publish_result import PublishResult, print_summary, save_results
from publish_result import (PublishResult, print_summary, save_results,
load_published_set, load_failed_tasks)
from title_generator import generate_title
PLATFORM_CONFIG = {
"抖音": {
@@ -58,13 +68,17 @@ PLATFORM_CONFIG = {
},
}
_module_cache = {}
def check_cookies():
def check_cookies_with_alert() -> tuple[list[str], list[str]]:
"""检查 Cookie 并返回 (可用平台, 告警消息)"""
print("=" * 60)
print(" 多平台 Cookie 状态")
print("=" * 60)
results = check_all_cookies(BASE_DIR)
available = []
alerts = []
for platform, info in results.items():
icons = {"ok": "", "warning": "", "expiring_soon": "",
"expired": "", "missing": "", "error": ""}
@@ -72,11 +86,45 @@ def check_cookies():
print(f" [{icon}] {platform}: {info['message']}")
if info["status"] in ("ok", "warning"):
available.append(platform)
if info["status"] == "expiring_soon":
alerts.append(f"{platform} Cookie 即将过期: {info['message']}")
elif info["status"] == "expired":
alerts.append(f"{platform} Cookie 已过期,需重新登录")
elif info["status"] == "warning":
hrs = info.get("remaining_hours", -1)
if 0 < hrs < 12:
alerts.append(f"{platform} Cookie 剩余 {hrs}h建议刷新")
print(f"\n 可用平台: {', '.join(available) if available else ''}")
return available
if alerts:
print(f"\n ⚠ Cookie 预警:")
for a in alerts:
print(f" {a}")
return available, alerts
def send_feishu_alert(alerts: list[str]):
"""通过飞书 Webhook 发送 Cookie 过期预警"""
import os
webhook = os.environ.get("FEISHU_WEBHOOK_URL", "")
if not webhook or not alerts:
return
try:
import requests
body = {
"msg_type": "text",
"content": {
"text": "【多平台分发 Cookie 预警】\n" + "\n".join(alerts)
}
}
requests.post(webhook, json=body, timeout=10)
print(" [i] 飞书预警已发送")
except Exception as e:
print(f" [⚠] 飞书通知失败: {e}")
def load_platform_module(name: str, config: dict):
if name in _module_cache:
return _module_cache[name]
script_path = config["script"]
if not script_path.exists():
return None
@@ -84,26 +132,33 @@ def load_platform_module(name: str, config: dict):
module = importlib.util.module_from_spec(spec)
sys.path.insert(0, str(script_path.parent))
spec.loader.exec_module(module)
_module_cache[name] = module
return module
async def distribute_to_platform(platform: str, config: dict, videos: list) -> list[PublishResult]:
async def distribute_to_platform(
platform: str, config: dict, videos: list[Path],
published_set: set, skip_dedup: bool = False,
) -> list[PublishResult]:
"""分发到单个平台(含去重)"""
print(f"\n{'#'*60}")
print(f" 开始分发到 [{platform}]")
print(f" [{platform}] 开始分发")
print(f"{'#'*60}")
cookie_path = config["cookie"]
if not cookie_path.exists():
print(f" [✗] {platform} 未登录,跳过")
print(f" [{platform}] ✗ 未登录,跳过")
return [PublishResult(platform=platform, video_path=str(v), title="",
success=False, status="error", message="未登录") for v in videos]
success=False, status="error",
message="未登录", error_code="NOT_LOGGED_IN") for v in videos]
try:
cm = CookieManager(cookie_path, config["domain"])
if not cm.is_valid():
print(f" [✗] {platform} Cookie 已过期,跳过")
print(f" [{platform}] ✗ Cookie 已过期,跳过")
return [PublishResult(platform=platform, video_path=str(v), title="",
success=False, status="error", message="Cookie过期") for v in videos]
success=False, status="error",
message="Cookie过期", error_code="COOKIE_EXPIRED") for v in videos]
except Exception as e:
return [PublishResult(platform=platform, video_path=str(v), title="",
success=False, status="error", message=str(e)) for v in videos]
@@ -113,10 +168,31 @@ async def distribute_to_platform(platform: str, config: dict, videos: list) -> l
return [PublishResult(platform=platform, video_path=str(v), title="",
success=False, status="error", message="脚本不存在") for v in videos]
titles_dict = getattr(module, "TITLES", {})
to_publish = []
skipped = []
for vp in videos:
key = (platform, vp.name)
if not skip_dedup and key in published_set:
skipped.append(vp)
else:
to_publish.append(vp)
if skipped:
print(f" [{platform}] 跳过 {len(skipped)} 条已发布视频(去重)")
results = []
total = len(videos)
for i, vp in enumerate(videos):
title = getattr(module, "TITLES", {}).get(vp.name, f"{vp.stem} #Soul派对")
for s in skipped:
results.append(PublishResult(
platform=platform, video_path=str(s),
title=generate_title(s.name, titles_dict),
success=True, status="skipped", message="去重跳过(已发布)",
))
total = len(to_publish)
for i, vp in enumerate(to_publish):
title = generate_title(vp.name, titles_dict)
try:
r = await module.publish_one(str(vp), title, i + 1, total)
if isinstance(r, PublishResult):
@@ -138,62 +214,181 @@ async def distribute_to_platform(platform: str, config: dict, videos: list) -> l
return results
async def run_parallel(targets: list[str], videos: list[Path],
published_set: set, skip_dedup: bool) -> list[PublishResult]:
"""多平台并行分发"""
tasks = []
for platform in targets:
config = PLATFORM_CONFIG[platform]
task = distribute_to_platform(platform, config, videos, published_set, skip_dedup)
tasks.append(task)
platform_results = await asyncio.gather(*tasks, return_exceptions=True)
all_results = []
for i, res in enumerate(platform_results):
if isinstance(res, Exception):
for v in videos:
all_results.append(PublishResult(
platform=targets[i], video_path=str(v), title="",
success=False, status="error", message=str(res)[:80],
))
else:
all_results.extend(res)
return all_results
async def run_serial(targets: list[str], videos: list[Path],
published_set: set, skip_dedup: bool) -> list[PublishResult]:
"""多平台串行分发(调试用)"""
all_results = []
for platform in targets:
config = PLATFORM_CONFIG[platform]
results = await distribute_to_platform(platform, config, videos, published_set, skip_dedup)
all_results.extend(results)
return all_results
async def retry_failed() -> list[PublishResult]:
"""重试历史失败任务"""
failed = load_failed_tasks()
if not failed:
print("[i] 无失败任务需要重试")
return []
print(f"\n{'='*60}")
print(f" 失败任务重试")
print(f"{'='*60}")
print(f" 待重试: {len(failed)}")
results = []
for task in failed:
platform = task.get("platform", "")
video_path = task.get("video_path", "")
title = task.get("title", "")
if platform not in PLATFORM_CONFIG:
continue
if not Path(video_path).exists():
print(f" [✗] 视频不存在: {video_path}")
continue
config = PLATFORM_CONFIG[platform]
module = load_platform_module(platform, config)
if not module:
continue
print(f"\n [{platform}] 重试: {Path(video_path).name}")
try:
r = await module.publish_one(video_path, title, 1, 1)
if isinstance(r, PublishResult):
results.append(r)
else:
results.append(PublishResult(
platform=platform, video_path=video_path, title=title,
success=bool(r), status="reviewing" if r else "failed",
))
except Exception as e:
results.append(PublishResult(
platform=platform, video_path=video_path, title=title,
success=False, status="error", message=str(e)[:80],
))
await asyncio.sleep(3)
return results
async def main():
parser = argparse.ArgumentParser(description="多平台一键视频分发")
parser.add_argument("--platforms", nargs="+", help="指定平台(默认全部已登录平台)")
parser.add_argument("--check", action="store_true", help="只检查 Cookie 状态")
parser = argparse.ArgumentParser(description="多平台一键视频分发 v2")
parser.add_argument("--platforms", nargs="+", help="指定平台")
parser.add_argument("--check", action="store_true", help="只检查 Cookie")
parser.add_argument("--retry", action="store_true", help="重试失败任务")
parser.add_argument("--video", help="分发单条视频")
parser.add_argument("--video-dir", help="自定义视频目录")
parser.add_argument("--no-dedup", action="store_true", help="跳过去重")
parser.add_argument("--serial", action="store_true", help="串行模式")
args = parser.parse_args()
available = check_cookies()
available, alerts = check_cookies_with_alert()
if alerts:
send_feishu_alert(alerts)
if args.check:
return 0
if args.retry:
results = await retry_failed()
if results:
print_summary(results)
save_results(results)
return 0
if not available:
print("\n[✗] 没有可用平台,请先登录各平台")
print("\n[✗] 没有可用平台,请先登录:")
for p, c in PLATFORM_CONFIG.items():
print(f" {p}: python3 {c['script']}")
login = str(c["script"]).replace("publish", "login").replace("pure_api", "login")
print(f" {p}: python3 {login}")
return 1
targets = args.platforms if args.platforms else available
targets = [t for t in targets if t in available]
if not targets:
print("\n[✗] 指定的平台均不可用")
return 1
video_dir = Path(args.video_dir) if args.video_dir else VIDEO_DIR
video_dir = Path(args.video_dir) if args.video_dir else DEFAULT_VIDEO_DIR
if args.video:
videos = [Path(args.video)]
else:
videos = sorted(video_dir.glob("*.mp4"))
if not videos:
print(f"\n[✗] 未找到视频: {video_dir}")
return 1
published_set = set() if args.no_dedup else load_published_set()
mode = "串行" if args.serial else "并行"
total_new = 0
for p in targets:
for v in videos:
if (p, v.name) not in published_set:
total_new += 1
print(f"\n{'='*60}")
print(f" 分发计划")
print(f" 分发计划 ({mode})")
print(f"{'='*60}")
print(f" 视频数: {len(videos)}")
print(f" 目标平台: {', '.join(targets)}")
print(f" 任务: {len(videos) * len(targets)}")
print(f" 任务: {total_new}")
if not args.no_dedup:
skipped = len(videos) * len(targets) - total_new
if skipped > 0:
print(f" 去重跳过: {skipped}")
print()
all_results: list[PublishResult] = []
for platform in targets:
config = PLATFORM_CONFIG[platform]
platform_results = await distribute_to_platform(platform, config, videos)
all_results.extend(platform_results)
if total_new == 0:
print("[i] 所有视频已发布到所有平台,无新任务")
return 0
print_summary(all_results)
save_results(all_results)
t0 = time.time()
if args.serial:
all_results = await run_serial(targets, videos, published_set, args.no_dedup)
else:
all_results = await run_parallel(targets, videos, published_set, args.no_dedup)
actual_results = [r for r in all_results if r.status != "skipped"]
print_summary(actual_results)
save_results(actual_results)
ok = sum(1 for r in actual_results if r.success)
total = len(actual_results)
elapsed = time.time() - t0
print(f" 总耗时: {elapsed:.1f}s | 日志: {SCRIPT_DIR / 'publish_log.json'}")
failed_count = total - ok
if failed_count > 0:
print(f"\n{failed_count} 条失败,可执行: python3 distribute_all.py --retry")
ok = sum(1 for r in all_results if r.success)
total = len(all_results)
print(f" 日志已保存: {SCRIPT_DIR / 'publish_log.json'}")
return 0 if ok == total else 1

View File

@@ -0,0 +1,7 @@
{"platform": "B站", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/Soul业务模型 派对+切片+小程序全链路.mp4", "title": "派对获客→AI切片→小程序变现全链路拆解 #商业模式 #一人公司", "success": true, "status": "reviewing", "message": "纯API投稿成功 (7.2s)", "elapsed_sec": 7.174537897109985, "timestamp": "2026-03-10 14:16:20"}
{"platform": "快手", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/Soul业务模型 派对+切片+小程序全链路.mp4", "title": "派对获客→AI切片→小程序变现全链路拆解 #商业模式 #一人公司", "success": true, "status": "published", "message": "发布成功", "screenshot": "/tmp/kuaishou_result.png", "elapsed_sec": 22.87360692024231, "timestamp": "2026-03-10 14:16:36"}
{"platform": "视频号", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/Soul业务模型 派对+切片+小程序全链路.mp4", "title": "派对获客→AI切片→小程序变现全链路拆解 #商业模式 #一人公司", "success": true, "status": "reviewing", "message": "已跳转到内容管理(发表成功)", "screenshot": "/tmp/channels_result.png", "elapsed_sec": 24.60638403892517, "timestamp": "2026-03-10 14:16:37"}
{"platform": "B站", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/早起不是为了开派对,是不吵老婆睡觉.mp4", "title": "每天6点起床不是因为自律是因为老婆还在睡 #Soul派对 #创业日记", "success": true, "status": "reviewing", "message": "纯API投稿成功 (7.0s)", "elapsed_sec": 6.964767932891846, "timestamp": "2026-03-10 14:17:08"}
{"platform": "小红书", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/早起不是为了开派对,是不吵老婆睡觉.mp4", "title": "每天6点起床不是因为自律是因为老婆还在睡 #Soul派对 #创业日记", "success": true, "status": "reviewing", "message": "已提交,请确认截图", "screenshot": "/tmp/xhs_result.png", "elapsed_sec": 33.28828287124634, "timestamp": "2026-03-10 14:17:35"}
{"platform": "快手", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/早起不是为了开派对,是不吵老婆睡觉.mp4", "title": "每天6点起床不是因为自律是因为老婆还在睡 #Soul派对 #创业日记", "success": true, "status": "published", "message": "发布成功", "screenshot": "/tmp/kuaishou_result.png", "elapsed_sec": 41.6892192363739, "timestamp": "2026-03-10 14:17:43"}
{"platform": "视频号", "video_path": "/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片/早起不是为了开派对,是不吵老婆睡觉.mp4", "title": "每天6点起床不是因为自律是因为老婆还在睡 #Soul派对 #创业日记", "success": true, "status": "reviewing", "message": "已跳转到内容管理(发表成功)", "screenshot": "/tmp/channels_result.png", "elapsed_sec": 25.486361026763916, "timestamp": "2026-03-10 14:17:27"}

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""
统一发布结果模块 — 所有平台的 publish_one 都返回此结构。
含:结果日志、去重检查、失败重试加载、飞书通知。
"""
import json
import time
@@ -41,6 +42,49 @@ def save_results(results: list[PublishResult]):
f.write(json.dumps(r.to_dict(), ensure_ascii=False) + "\n")
def load_published_set() -> set[tuple[str, str]]:
"""加载已成功发布的 (platform, video_filename) 集合,用于去重"""
published = set()
if not RESULT_LOG.exists():
return published
with open(RESULT_LOG, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
if rec.get("success"):
fname = Path(rec.get("video_path", "")).name
published.add((rec["platform"], fname))
except json.JSONDecodeError:
continue
return published
def load_failed_tasks() -> list[dict]:
"""加载失败任务列表(用于重试)"""
failed = []
if not RESULT_LOG.exists():
return failed
seen = {}
with open(RESULT_LOG, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
key = (rec.get("platform", ""), Path(rec.get("video_path", "")).name)
seen[key] = rec
except json.JSONDecodeError:
continue
for key, rec in seen.items():
if not rec.get("success"):
failed.append(rec)
return failed
def print_summary(results: list[PublishResult]):
"""控制台打印汇总表"""
if not results:

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""
智能标题生成 — 从视频文件名生成各平台标题
规则:
1. 优先使用各平台脚本的 TITLES 字典(手动优化过的标题)
2. 未在字典中的视频,基于文件名自动生成
3. 自动添加话题标签
"""
import re
from pathlib import Path
DEFAULT_TAGS = "#Soul派对 #创业日记"
def clean_filename(name: str) -> str:
"""将视频文件名转为可读标题"""
stem = Path(name).stem
stem = re.sub(r'^\d+[._\-\s]*', '', stem)
stem = stem.replace('_', ' ').replace(' ', ' ').strip()
return stem
def generate_title(filename: str, titles_dict: dict = None, max_len: int = 60) -> str:
"""生成发布标题:优先字典 → 否则从文件名生成"""
if titles_dict and filename in titles_dict:
return titles_dict[filename]
base = clean_filename(filename)
if not base:
base = Path(filename).stem
if "#" not in base:
remaining = max_len - len(DEFAULT_TAGS) - 1
if len(base) > remaining:
base = base[:remaining]
base = f"{base} {DEFAULT_TAGS}"
return base[:max_len]
def generate_title_xhs(filename: str, titles_dict: dict = None) -> tuple[str, str]:
"""小红书需要分标题(≤20字)和正文描述"""
full = generate_title(filename, titles_dict, max_len=80)
parts = re.split(r'[,!?\s]+', full)
title_part = parts[0][:20] if parts else full[:20]
return title_part, full

File diff suppressed because one or more lines are too long

View File

@@ -260,3 +260,4 @@
| 2026-03-10 12:30:08 | 🔄 卡若AI 同步 2026-03-10 12:30 | 更新:水桥平台对接、卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 12:54:57 | 🔄 卡若AI 同步 2026-03-10 12:54 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 13:34:41 | 🔄 卡若AI 同步 2026-03-10 13:34 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 13:48:50 | 🔄 卡若AI 同步 2026-03-10 13:48 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 |

View File

@@ -263,3 +263,4 @@
| 2026-03-10 12:30:08 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 12:30 | 更新:水桥平台对接、卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 12:54:57 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 12:54 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 13:34:41 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 13:34 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 13:48:50 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 13:48 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |