🔄 卡若AI 同步 2026-03-10 15:16 | 更新:水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 11 个

This commit is contained in:
2026-03-10 15:16:50 +08:00
parent 0cd6d779da
commit 7e318684a4
15 changed files with 420 additions and 71 deletions

View File

@@ -1,6 +1,6 @@
{
"updated": "2026-03-01 08:00:22",
"date": "2026-03-01",
"updated": "2026-03-10 15:13:06",
"date": "2026-03-10",
"scan_total": 0,
"copied_new": 0,
"skipped_idempotent": 0,

View File

@@ -71,8 +71,8 @@ def _load_credential():
)
async def _api_publish(video_path: str, title: str) -> PublishResult:
"""方案一bilibili-api-python 纯 API"""
async def _api_publish(video_path: str, title: str, scheduled_time=None) -> PublishResult:
"""方案一bilibili-api-python 纯 API(支持定时发布)"""
from bilibili_api import video_uploader
from video_utils import extract_cover
@@ -100,6 +100,11 @@ async def _api_publish(video_path: str, title: str) -> PublishResult:
"up_close_reply": False,
}
if scheduled_time:
dtime = int(scheduled_time.timestamp())
meta["dtime"] = dtime
print(f" [API] 定时发布: {scheduled_time.strftime('%Y-%m-%d %H:%M')}", flush=True)
page = video_uploader.VideoUploaderPage(
path=video_path,
title=title[:80],
@@ -264,11 +269,12 @@ async def _playwright_publish(video_path: str, title: str) -> PublishResult:
)
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False) -> PublishResult:
"""API 优先 → Playwright 兜底"""
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False, scheduled_time=None) -> PublishResult:
"""API 优先 → Playwright 兜底(支持定时发布)"""
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB)", flush=True)
time_hint = f" → 定时 {scheduled_time.strftime('%H:%M')}" if scheduled_time else ""
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB){time_hint}", flush=True)
print(f" 标题: {title[:60]}", flush=True)
if not skip_dedup and is_published("B站", video_path):
@@ -285,7 +291,7 @@ async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1,
# 方案一:纯 API
print(" [方案一] bilibili-api-python 纯 API...", flush=True)
try:
result = await _api_publish(video_path, title)
result = await _api_publish(video_path, title, scheduled_time)
print(f" {result.log_line()}", flush=True)
return result
except Exception as e:

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""
多平台一键分发 v2 — 全链路自动化
多平台一键分发 v3 — 全链路自动化 + 定时排期
- 定时排期30-120 分钟随机间隔,超 24h 自动压缩
- 并行分发5 平台同时上传asyncio.gather
- 去重机制:已成功发布的视频自动跳过
- 失败重试:--retry 自动重跑历史失败任务
@@ -9,13 +10,15 @@
- 结果持久化JSON Lines 日志 + 控制台汇总
用法:
python3 distribute_all.py # 并行分发到所有平台
python3 distribute_all.py # 定时排期并行分发
python3 distribute_all.py --now # 立即发布(不排期)
python3 distribute_all.py --platforms B站 快手 # 只发指定平台
python3 distribute_all.py --check # 检查 Cookie
python3 distribute_all.py --retry # 重试失败任务
python3 distribute_all.py --video /path/to.mp4 # 发单条视频
python3 distribute_all.py --no-dedup # 跳过去重检查
python3 distribute_all.py --serial # 串行模式(调试用)
python3 distribute_all.py --min-gap 30 --max-gap 120 # 自定义间隔
"""
import argparse
import asyncio
@@ -34,6 +37,7 @@ from cookie_manager import CookieManager, check_all_cookies
from publish_result import (PublishResult, print_summary, save_results,
load_published_set, load_failed_tasks)
from title_generator import generate_title
from schedule_generator import generate_schedule, format_schedule
PLATFORM_CONFIG = {
"抖音": {
@@ -139,8 +143,9 @@ def load_platform_module(name: str, config: dict):
async def distribute_to_platform(
platform: str, config: dict, videos: list[Path],
published_set: set, skip_dedup: bool = False,
schedule_times: list = None,
) -> list[PublishResult]:
"""分发到单个平台(含去重)"""
"""分发到单个平台(含去重 + 定时排期"""
print(f"\n{'#'*60}")
print(f" [{platform}] 开始分发")
print(f"{'#'*60}")
@@ -190,11 +195,19 @@ async def distribute_to_platform(
success=True, status="skipped", message="去重跳过(已发布)",
))
publish_schedule = None
if schedule_times and len(to_publish) > 0:
if len(schedule_times) >= len(to_publish):
publish_schedule = schedule_times[:len(to_publish)]
else:
publish_schedule = generate_schedule(len(to_publish))
total = len(to_publish)
for i, vp in enumerate(to_publish):
title = generate_title(vp.name, titles_dict)
stime = publish_schedule[i] if publish_schedule else None
try:
r = await module.publish_one(str(vp), title, i + 1, total)
r = await module.publish_one(str(vp), title, i + 1, total, scheduled_time=stime)
if isinstance(r, PublishResult):
results.append(r)
else:
@@ -215,12 +228,13 @@ async def distribute_to_platform(
async def run_parallel(targets: list[str], videos: list[Path],
published_set: set, skip_dedup: bool) -> list[PublishResult]:
"""多平台并行分发"""
published_set: set, skip_dedup: bool,
schedule_times: list = None) -> list[PublishResult]:
"""多平台并行分发(共享排期)"""
tasks = []
for platform in targets:
config = PLATFORM_CONFIG[platform]
task = distribute_to_platform(platform, config, videos, published_set, skip_dedup)
task = distribute_to_platform(platform, config, videos, published_set, skip_dedup, schedule_times)
tasks.append(task)
platform_results = await asyncio.gather(*tasks, return_exceptions=True)
@@ -239,12 +253,13 @@ async def run_parallel(targets: list[str], videos: list[Path],
async def run_serial(targets: list[str], videos: list[Path],
published_set: set, skip_dedup: bool) -> list[PublishResult]:
published_set: set, skip_dedup: bool,
schedule_times: list = None) -> list[PublishResult]:
"""多平台串行分发(调试用)"""
all_results = []
for platform in targets:
config = PLATFORM_CONFIG[platform]
results = await distribute_to_platform(platform, config, videos, published_set, skip_dedup)
results = await distribute_to_platform(platform, config, videos, published_set, skip_dedup, schedule_times)
all_results.extend(results)
return all_results
@@ -299,7 +314,7 @@ async def retry_failed() -> list[PublishResult]:
async def main():
parser = argparse.ArgumentParser(description="多平台一键视频分发 v2")
parser = argparse.ArgumentParser(description="多平台一键视频分发 v3定时排期")
parser.add_argument("--platforms", nargs="+", help="指定平台")
parser.add_argument("--check", action="store_true", help="只检查 Cookie")
parser.add_argument("--retry", action="store_true", help="重试失败任务")
@@ -307,6 +322,10 @@ async def main():
parser.add_argument("--video-dir", help="自定义视频目录")
parser.add_argument("--no-dedup", action="store_true", help="跳过去重")
parser.add_argument("--serial", action="store_true", help="串行模式")
parser.add_argument("--now", action="store_true", help="立即发布(不排期)")
parser.add_argument("--min-gap", type=int, default=30, help="最小间隔(分钟)")
parser.add_argument("--max-gap", type=int, default=120, help="最大间隔(分钟)")
parser.add_argument("--max-hours", type=float, default=24.0, help="最大排期跨度(小时)")
args = parser.parse_args()
available, alerts = check_cookies_with_alert()
@@ -354,16 +373,31 @@ async def main():
if (p, v.name) not in published_set:
total_new += 1
# 生成排期
schedule_times = None
if not args.now and total_new > 1:
schedule_times = generate_schedule(
len(videos),
min_gap=args.min_gap,
max_gap=args.max_gap,
max_hours=args.max_hours,
)
print(f"\n{'='*60}")
print(f" 分发计划 ({mode})")
print(f"{'='*60}")
print(f" 视频数: {len(videos)}")
print(f" 目标平台: {', '.join(targets)}")
print(f" 新任务: {total_new}")
print(f" 发布方式: {'立即发布' if args.now or not schedule_times else '定时排期'}")
if not args.no_dedup:
skipped = len(videos) * len(targets) - total_new
if skipped > 0:
print(f" 去重跳过: {skipped}")
if schedule_times:
print(f"\n 排期表:")
print(format_schedule([v.name for v in videos], schedule_times))
print()
if total_new == 0:
@@ -372,9 +406,9 @@ async def main():
t0 = time.time()
if args.serial:
all_results = await run_serial(targets, videos, published_set, args.no_dedup)
all_results = await run_serial(targets, videos, published_set, args.no_dedup, schedule_times)
else:
all_results = await run_parallel(targets, videos, published_set, args.no_dedup)
all_results = await run_parallel(targets, videos, published_set, args.no_dedup, schedule_times)
actual_results = [r for r in all_results if r.status != "skipped"]
print_summary(actual_results)

View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
定时排期生成器 — 为 N 条视频生成发布时间表
规则:
1. 相邻视频间隔 30-120 分钟(随机)
2. 若总时长 > max_hours按比例压缩至 max_hours 内
3. 第一条视频在 first_delay 分钟后发布
"""
import random
from datetime import datetime, timedelta
def generate_schedule(
n: int,
min_gap: int = 30,
max_gap: int = 120,
max_hours: float = 24.0,
first_delay: int = 5,
start_time: datetime = None,
) -> list[datetime]:
"""
返回 n 个 datetime每个对应一条视频的定时发布时间。
"""
if n <= 0:
return []
base = start_time or datetime.now()
if n == 1:
return [base + timedelta(minutes=first_delay)]
gaps = [random.randint(min_gap, max_gap) for _ in range(n - 1)]
total_min = first_delay + sum(gaps)
max_min = max_hours * 60
if total_min > max_min:
ratio = max_min / total_min
first_delay = int(first_delay * ratio)
gaps = [max(1, int(g * ratio)) for g in gaps]
times = []
cur = base + timedelta(minutes=first_delay)
times.append(cur)
for g in gaps:
cur = cur + timedelta(minutes=g)
times.append(cur)
return times
def format_schedule(videos: list[str], times: list[datetime]) -> str:
"""格式化排期表用于打印"""
lines = [" 序号 | 发布时间 | 间隔 | 视频"]
lines.append(" " + "-" * 70)
for i, (v, t) in enumerate(zip(videos, times)):
gap = ""
if i > 0:
delta = (t - times[i - 1]).total_seconds() / 60
gap = f"{delta:.0f}min"
name = v[:40] if len(v) > 40 else v
lines.append(f" {i+1:>4} | {t.strftime('%Y-%m-%d %H:%M')} | {gap:>7} | {name}")
total = (times[-1] - times[0]).total_seconds() / 3600 if len(times) > 1 else 0
lines.append(" " + "-" * 70)
lines.append(f" 总跨度: {total:.1f}h | 首条: {times[0].strftime('%H:%M')} | 末条: {times[-1].strftime('%H:%M')}")
return "\n".join(lines)
if __name__ == "__main__":
schedule = generate_schedule(15)
names = [f"视频_{i+1}.mp4" for i in range(15)]
print(format_schedule(names, schedule))

View File

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""
Playwright 平台定时发布辅助 — 通用日期时间填写逻辑
各平台调用 set_scheduled_time(page, dt, platform_hint) 即可
"""
import asyncio
from datetime import datetime
async def set_scheduled_time(page, scheduled_time: datetime, platform: str = "") -> bool:
"""
在 Playwright 页面上设置定时发布时间。
返回 True 表示成功设置False 表示失败(降级为立即发布)。
"""
if not scheduled_time:
return False
date_str = scheduled_time.strftime("%Y-%m-%d")
time_str = scheduled_time.strftime("%H:%M")
print(f" [定时] 设置定时发布: {date_str} {time_str}", flush=True)
try:
found = await _click_schedule_toggle(page)
if not found:
print(f" [定时] 未找到定时发布选项,降级为立即发布", flush=True)
return False
await asyncio.sleep(1)
ok = await _fill_datetime(page, scheduled_time, date_str, time_str)
if ok:
print(f" [定时] 已设置定时: {date_str} {time_str}", flush=True)
else:
print(f" [定时] 日期时间填写失败,降级为立即发布", flush=True)
return ok
except Exception as e:
print(f" [定时] 异常: {str(e)[:60]},降级为立即发布", flush=True)
return False
async def _click_schedule_toggle(page) -> bool:
"""找到并点击定时发布开关/单选按钮"""
selectors = [
'label:has-text("定时发布")',
'span:has-text("定时发布")',
'div:has-text("定时发布"):not(:has(div:has-text("定时发布")))',
'input[value="schedule"] + label',
'input[value="schedule"]',
'text=定时发布',
'[class*="schedule"]',
'[class*="timing"]',
]
for sel in selectors:
loc = page.locator(sel).first
try:
if await loc.count() > 0:
await loc.scroll_into_view_if_needed()
await asyncio.sleep(0.3)
await loc.click(force=True)
return True
except Exception:
continue
clicked = await page.evaluate("""() => {
const texts = ['定时发布', '定时', '预约发布', '选择时间发布'];
const all = document.querySelectorAll('label, span, div, li, a, button, input[type="radio"]');
for (const el of all) {
const t = el.textContent?.trim();
if (t && texts.some(k => t === k || t.startsWith(k)) && el.offsetParent !== null) {
el.click();
return true;
}
}
return false;
}""")
return clicked
async def _fill_datetime(page, dt: datetime, date_str: str, time_str: str) -> bool:
"""填写日期和时间(处理各种 datepicker 形式)"""
date_filled = await _try_fill_date(page, dt, date_str)
time_filled = await _try_fill_time(page, dt, time_str)
if not date_filled and not time_filled:
return await _try_fill_combined(page, dt, date_str, time_str)
return date_filled or time_filled
async def _try_fill_date(page, dt: datetime, date_str: str) -> bool:
"""尝试填写日期"""
date_selectors = [
'input[type="date"]',
'input[placeholder*="日期"]',
'input[placeholder*=""]',
'input[class*="date"]',
]
for sel in date_selectors:
loc = page.locator(sel).first
try:
if await loc.count() > 0:
await loc.click(force=True)
await loc.fill(date_str)
await asyncio.sleep(0.3)
return True
except Exception:
continue
filled = await page.evaluate("""(dateStr) => {
const inputs = document.querySelectorAll('input');
for (const inp of inputs) {
const ph = (inp.placeholder || '').toLowerCase();
const cls = (inp.className || '').toLowerCase();
if ((ph.includes('日期') || ph.includes('date') || cls.includes('date'))
&& inp.offsetParent !== null) {
const nativeSet = Object.getOwnPropertyDescriptor(HTMLInputElement.prototype, 'value').set;
nativeSet.call(inp, dateStr);
inp.dispatchEvent(new Event('input', {bubbles: true}));
inp.dispatchEvent(new Event('change', {bubbles: true}));
return true;
}
}
return false;
}""", date_str)
return filled
async def _try_fill_time(page, dt: datetime, time_str: str) -> bool:
"""尝试填写时间"""
time_selectors = [
'input[type="time"]',
'input[placeholder*="时间"]',
'input[placeholder*=""]',
'input[class*="time"]:not([class*="timestamp"])',
]
for sel in time_selectors:
loc = page.locator(sel).first
try:
if await loc.count() > 0:
await loc.click(force=True)
await loc.fill(time_str)
await asyncio.sleep(0.3)
return True
except Exception:
continue
filled = await page.evaluate("""(timeStr) => {
const inputs = document.querySelectorAll('input');
for (const inp of inputs) {
const ph = (inp.placeholder || '').toLowerCase();
const cls = (inp.className || '').toLowerCase();
if ((ph.includes('时间') || ph.includes('time') || cls.includes('time'))
&& !cls.includes('timestamp') && inp.offsetParent !== null) {
const nativeSet = Object.getOwnPropertyDescriptor(HTMLInputElement.prototype, 'value').set;
nativeSet.call(inp, timeStr);
inp.dispatchEvent(new Event('input', {bubbles: true}));
inp.dispatchEvent(new Event('change', {bubbles: true}));
return true;
}
}
return false;
}""", time_str)
return filled
async def _try_fill_combined(page, dt: datetime, date_str: str, time_str: str) -> bool:
"""尝试一体式 datetime 输入"""
combined = f"{date_str} {time_str}"
loc = page.locator('input[type="datetime-local"]').first
try:
if await loc.count() > 0:
await loc.fill(f"{date_str}T{time_str}")
return True
except Exception:
pass
filled = await page.evaluate("""(combined) => {
const inputs = document.querySelectorAll('input');
for (const inp of inputs) {
const ph = (inp.placeholder || '');
if ((ph.includes('发布时间') || ph.includes('选择时间'))
&& inp.offsetParent !== null) {
const nativeSet = Object.getOwnPropertyDescriptor(HTMLInputElement.prototype, 'value').set;
nativeSet.call(inp, combined);
inp.dispatchEvent(new Event('input', {bubbles: true}));
inp.dispatchEvent(new Event('change', {bubbles: true}));
return true;
}
}
return false;
}""", combined)
return filled

View File

@@ -54,14 +54,15 @@ TITLES = {
}
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False) -> PublishResult:
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False, scheduled_time=None) -> PublishResult:
from playwright.async_api import async_playwright
from publish_result import is_published
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
t0 = time.time()
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB)", flush=True)
time_hint = f" → 定时 {scheduled_time.strftime('%H:%M')}" if scheduled_time else ""
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB){time_hint}", flush=True)
print(f" 标题: {title[:60]}", flush=True)
if not skip_dedup and is_published("小红书", video_path):
@@ -151,6 +152,13 @@ async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1,
await asyncio.sleep(1)
# 定时发布
if scheduled_time:
from schedule_helper import set_scheduled_time
scheduled_ok = await set_scheduled_time(page, scheduled_time, "小红书")
if scheduled_ok:
print(f" [定时] 小红书定时发布已设置", flush=True)
await asyncio.sleep(1)
print(" [4] 等待发布按钮启用...", flush=True)
pub = page.locator('button:has-text("发布")').first

View File

@@ -54,14 +54,15 @@ TITLES = {
}
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False) -> PublishResult:
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False, scheduled_time=None) -> PublishResult:
from playwright.async_api import async_playwright
from publish_result import is_published
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
t0 = time.time()
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB)", flush=True)
time_hint = f" → 定时 {scheduled_time.strftime('%H:%M')}" if scheduled_time else ""
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB){time_hint}", flush=True)
print(f" 标题: {title[:60]}", flush=True)
if not skip_dedup and is_published("快手", video_path):
@@ -157,6 +158,13 @@ async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1,
# 清除可能的 tooltip
await page.evaluate("""document.querySelectorAll('[data-tippy-root],[class*="tooltip"],[class*="popover"]').forEach(e => e.remove())""")
# 定时发布
if scheduled_time:
from schedule_helper import set_scheduled_time
scheduled_ok = await set_scheduled_time(page, scheduled_time, "快手")
if scheduled_ok:
print(f" [定时] 快手定时发布已设置", flush=True)
print(" [4] 发布...", flush=True)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(1)

File diff suppressed because one or more lines are too long

View File

@@ -541,14 +541,20 @@ async def create_v2(
async def publish_one(
video_path: str,
title: str,
timing_ts: int = 0,
idx: int = 1,
total: int = 1,
) -> bool:
skip_dedup: bool = False,
scheduled_time=None,
) -> "PublishResult":
global USER_ID
sys.path.insert(0, str(SCRIPT_DIR.parent.parent / "多平台分发" / "脚本"))
from publish_result import PublishResult, is_published
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
timing_str = datetime.datetime.fromtimestamp(timing_ts).strftime("%m-%d %H:%M") if timing_ts > 0 else "立即"
timing_ts = int(scheduled_time.timestamp()) if scheduled_time else 0
timing_str = scheduled_time.strftime("%m-%d %H:%M") if scheduled_time else "立即"
t0 = time.time()
print(f"\n{'='*60}")
print(f" [{idx}/{total}] {fname}")
@@ -556,6 +562,11 @@ async def publish_one(
print(f" 标题: {title[:60]}")
print(f"{'='*60}")
if not skip_dedup and is_published("抖音", video_path):
print(f" [跳过] 该视频已发布到抖音", flush=True)
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=True, status="skipped", message="去重跳过(已发布)")
try:
keys = SecurityKeys(COOKIE_FILE)
async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
@@ -564,32 +575,45 @@ async def publish_one(
)
uid_data = resp.json()
if uid_data.get("status_code") != 0:
print(f" [✗] Cookie 已过期,请重新运行 douyin_login.py")
return False
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=False, status="error", message="Cookie 已过期",
elapsed_sec=time.time()-t0)
user = uid_data.get("user") or uid_data.get("user_info") or {}
USER_ID = str(user.get("uid", "") or user.get("user_id", ""))
auth = await get_upload_auth(client, keys)
info = await apply_upload(client, auth, fsize)
if not await upload_chunks(client, info, video_path):
print(" [✗] 上传失败")
return False
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=False, status="failed", message="上传失败",
elapsed_sec=time.time()-t0)
video_id = await commit_upload(client, auth, info["session_key"])
if not video_id:
print(" [✗] 未获取到 video_id")
return False
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=False, status="failed", message="未获取到 video_id",
elapsed_sec=time.time()-t0)
result = await create_v2(client, keys, video_id, title, timing_ts)
elapsed = time.time() - t0
if result.get("status_code") == 0:
item_id = result.get("item_id", "")
print(f" [✓] 发布成功 item_id={item_id}")
return True
msg = f"发布成功 item_id={item_id}"
if timing_ts > 0:
msg += f" (定时 {timing_str})"
print(f" [✓] {msg}")
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=True, status="reviewing", message=msg,
elapsed_sec=elapsed)
else:
print(f" [✗] 发布失败: {result}")
return False
msg = f"发布失败: {str(result)[:80]}"
print(f" [✗] {msg}")
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=False, status="failed", message=msg,
elapsed_sec=elapsed)
except Exception as e:
print(f" [✗] 异常: {e}")
return False
return PublishResult(platform="抖音", video_path=video_path, title=title,
success=False, status="error", message=f"异常: {str(e)[:80]}",
elapsed_sec=time.time()-t0)
# ═══════════════════════════════════════════════════════════
@@ -626,36 +650,27 @@ async def main():
return 1
print(f"[i] 共 {len(videos)} 条视频")
now_ts = int(time.time())
base_ts = ((now_ts + 3600) // 3600 + 1) * 3600
sys.path.insert(0, str(SCRIPT_DIR.parent.parent / "多平台分发" / "脚本"))
from publish_result import print_summary, save_results
from schedule_generator import generate_schedule, format_schedule
schedule = []
for i, vp in enumerate(videos):
ts = base_ts + i * 3600
title = TITLES.get(vp.name, f"{vp.stem} #Soul派对 #创业日记")
schedule.append((vp, title, ts))
dt_str = datetime.datetime.fromtimestamp(ts).strftime("%m-%d %H:%M")
print(f" {i+1:2d}. {dt_str} | {vp.name[:50]}")
schedule_times = generate_schedule(len(videos))
print(f"\n排期:")
print(format_schedule([v.name for v in videos], schedule_times))
results = []
for i, (vp, title, ts) in enumerate(schedule):
ok = await publish_one(str(vp), title, ts, i + 1, len(schedule))
results.append((vp.name, ok, ts))
if i < len(schedule) - 1:
wait = 3 if ok else 1
print(f" 等待 {wait}s...")
await asyncio.sleep(wait)
for i, (vp, stime) in enumerate(zip(videos, schedule_times)):
title = TITLES.get(vp.name, f"{vp.stem} #Soul派对 #创业日记")
r = await publish_one(str(vp), title, i + 1, len(videos), scheduled_time=stime)
results.append(r)
if i < len(videos) - 1:
await asyncio.sleep(3)
print(f"\n{'='*60}")
print(" 发布汇总")
print(f"{'='*60}")
for name, ok, ts in results:
s = "" if ok else ""
t = datetime.datetime.fromtimestamp(ts).strftime("%m-%d %H:%M")
print(f" [{s}] {t} | {name}")
success = sum(1 for _, ok, _ in results if ok)
print(f"\n 成功: {success}/{len(results)}")
return 0 if success == len(results) else 1
actual = [r for r in results if r.status != "skipped"]
print_summary(actual)
save_results(actual)
ok = sum(1 for r in actual if r.success)
return 0 if ok == len(actual) else 1
if __name__ == "__main__":

View File

@@ -54,14 +54,15 @@ TITLES = {
}
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False) -> PublishResult:
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1, skip_dedup: bool = False, scheduled_time=None) -> PublishResult:
from playwright.async_api import async_playwright
from publish_result import is_published
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
t0 = time.time()
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB)", flush=True)
time_hint = f" → 定时 {scheduled_time.strftime('%H:%M')}" if scheduled_time else ""
print(f"\n[{idx}/{total}] {fname} ({fsize/1024/1024:.1f}MB){time_hint}", flush=True)
print(f" 标题: {title[:60]}", flush=True)
if not skip_dedup and is_published("视频号", video_path):
@@ -145,6 +146,13 @@ async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1,
}""", title)
await asyncio.sleep(0.5)
# 定时发布
if scheduled_time:
from schedule_helper import set_scheduled_time
scheduled_ok = await set_scheduled_time(page, scheduled_time, "视频号")
if scheduled_ok:
print(f" [定时] 视频号定时发布已设置", flush=True)
# 滚动到底部找发表按钮
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(1)

View File

@@ -262,3 +262,4 @@
| 2026-03-10 13:34:41 | 🔄 卡若AI 同步 2026-03-10 13:34 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 13:48:50 | 🔄 卡若AI 同步 2026-03-10 13:48 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 14:18:01 | 🔄 卡若AI 同步 2026-03-10 14:17 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 15:02:53 | 🔄 卡若AI 同步 2026-03-10 15:02 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 |

View File

@@ -265,3 +265,4 @@
| 2026-03-10 13:34:41 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 13:34 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 13:48:50 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 13:48 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 14:18:01 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 14:17 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 15:02:53 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 15:02 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |