🔄 卡若AI 同步 2026-03-10 12:54 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个

This commit is contained in:
2026-03-10 12:54:51 +08:00
parent 48ea403a37
commit c4799d5c45
10 changed files with 318 additions and 331 deletions

View File

@@ -11,7 +11,7 @@ updated: "2026-03-10"
# Soul 发到素材库 · 基因胶囊
> **一句话**Soul 派对成片目录(含 目录索引.md 与 mp4→ 飞书知识库多维表格(内容看板),含**附件上传**、**多平台描述**(抖音/小红书/视频号),支持新建与更新已有记录
> **一句话**Soul 派对成片目录(含 目录索引.md 与 mp4→ 飞书知识库多维表格(内容看板),标题为「119场 3月8日 标题」不含第N场附件按目录索引与视频一一对应多平台描述同步写入
---
@@ -19,7 +19,7 @@ updated: "2026-03-10"
| 功能 | 说明 |
|:---|:---|
| **标题** | 格式119场 3月8日 第N场 标题 |
| **标题** | 格式119场 3月8日 标题不含「第N场」 |
| **时间** | 真实直播日期 YYYY-MM-DD |
| **进展状态** | 看板分组,如 2026年3月 |
| **附件** | mp4 自动上传到飞书 drive 并写入附件字段 |

View File

@@ -115,15 +115,23 @@ def build_multi_platform_desc(meta: dict, title: str, cta: str = "关注我,
)
def _match_mp4_to_title(mp4_list: list, title: str) -> Path | None:
"""按标题匹配 mp4文件名stem与目录索引标题一致或包含忽略冒号/空格差异。"""
t = title.replace(" ", "").replace("", ":").strip()
for fp in mp4_list:
stem = fp.stem.replace(" ", "").replace("", ":").strip()
if stem == t or t in stem or stem in t:
return fp
return None
def collect_clips(clips_dir: Path, session: int, date_str: str):
"""
收集成片目录下的切片信息。
date_str: 如 2026-03-08
返回 list of dict: title_display, field_title, time_text, file_path, description, index
收集成片目录下的切片信息。按 目录索引.md 的序号顺序,每条记录对应同序号的视频文件,避免错位。
标题格式119场 3月8日 标题不含「第N场」
"""
clips_dir = Path(clips_dir)
index_map = parse_index_md(clips_dir)
# 解析日期用于显示
try:
from datetime import datetime
dt = datetime.strptime(date_str, "%Y-%m-%d")
@@ -131,17 +139,23 @@ def collect_clips(clips_dir: Path, session: int, date_str: str):
except Exception:
date_cn = date_str
mp4_list = list(clips_dir.glob("*.mp4"))
records = []
mp4_files = sorted(clips_dir.glob("*.mp4"))
for i, fp in enumerate(mp4_files, 1):
# 文件名即标题(无扩展名)
name_stem = fp.stem
meta = index_map.get(i, {})
title_from_index = meta.get("title", name_stem)
# 展示标题119场 3月8日 第N场 标题
field_title = f"{session}{date_cn}{i}{title_from_index}"
# 描述:附件格式「描述,标题,附件」—— 用 Hook/CTA 或文件名
desc = meta.get("hook", "") or name_stem
# 按目录索引序号 1,2,3... 顺序建记录,并用标题匹配对应 mp4保证视频与标题一致
for i in sorted(index_map.keys()):
meta = index_map[i]
title_from_index = meta.get("title", "")
fp = _match_mp4_to_title(mp4_list, title_from_index)
if not fp and mp4_list:
# 兜底:按序号取第 i 个(仅当数量一致时)
idx = i - 1
if idx < len(mp4_list):
fp = mp4_list[idx]
if not fp:
continue
# 标题不含「第N场」119场 3月8日 标题
field_title = f"{session}{date_cn} {title_from_index}"
desc = meta.get("hook", "") or title_from_index
if meta.get("cta"):
desc = f"{desc}{meta['cta']}" if desc else meta["cta"]
multi_platform = build_multi_platform_desc(meta, title_from_index)
@@ -150,7 +164,7 @@ def collect_clips(clips_dir: Path, session: int, date_str: str):
"field_title": field_title,
"time_text": date_str,
"file_path": fp,
"description": desc or name_stem,
"description": desc or title_from_index,
"multi_platform_desc": multi_platform,
})
return records
@@ -249,8 +263,19 @@ def create_records(
return created
def _content_from_field_title(field_title: str, prefix: str) -> str:
"""从完整标题去掉前缀得到内容标题。支持旧格式「119场 3月8日 第N场 xxx」或新格式「119场 3月8日 xxx」。"""
if not field_title.startswith(prefix):
return ""
rest = field_title[len(prefix):].strip()
m = re.match(r"^第\d+场\s*", rest)
if m:
rest = rest[m.end():].strip()
return rest
def list_records_by_title(user_token: str, app_token: str, table_id: str, title_prefix: str):
"""列出标题以 title_prefix 开头的记录(如 119场 3月8日,返回 [(record_id, 第N场), ...]。"""
"""列出标题以 title_prefix 开头的记录,返回 [(record_id, content_title), ...]。"""
all_records = []
page_token = None
while True:
@@ -274,9 +299,8 @@ def list_records_by_title(user_token: str, app_token: str, table_id: str, title_
else:
title = str(raw) if raw else ""
if title.startswith(title_prefix):
m = re.search(r"第(\d+)场", str(title))
idx = int(m.group(1)) if m else 0
all_records.append((it.get("record_id"), idx, title))
content = _content_from_field_title(title, title_prefix)
all_records.append((it.get("record_id"), content, title))
page_token = data.get("data", {}).get("page_token") or data.get("data", {}).get("next_page_token")
if not page_token or not items:
break
@@ -288,7 +312,8 @@ def update_existing_records(
session: int, date_str: str, field_map: dict, upload_attachment: bool = True
):
"""
更新已有记录:按标题匹配「session场 date_cn 第N场」补写 附件 + 你的解决方案(多平台描述)。
更新已有记录:按标题内容匹配支持旧格式「第N场 标题」或新格式「标题」),
补写 标题去掉第N场、附件与内容一致、你的解决方案。
"""
try:
from datetime import datetime
@@ -301,13 +326,25 @@ def update_existing_records(
if not existing:
print(" ⚠️ 未找到匹配的已有记录,请先执行新建上传")
return 0
rec_by_idx = {r["index"]: r for r in records}
def norm(s):
return (s or "").replace(" ", "").replace("", ":").strip()
rec_by_content = {}
for r in records:
rest = _content_from_field_title(r["field_title"], title_prefix)
rec_by_content[norm(rest)] = r
updated = 0
for record_id, idx, _ in existing:
rec = rec_by_idx.get(idx)
for record_id, content, full_title in existing:
rec = rec_by_content.get(norm(content))
if not rec:
for k, v in rec_by_content.items():
if content and (k in content or content in k):
rec = v
break
if not rec:
continue
fields = {}
# 统一把标题改为新格式去掉第N场并补写附件与多平台描述
fields["标题"] = rec["field_title"]
for feishu_name, our_key in field_map.items():
if our_key == "multi_platform" and "multi_platform_desc" in rec:
fields[feishu_name] = rec["multi_platform_desc"]
@@ -327,9 +364,9 @@ def update_existing_records(
data = r.json()
if data.get("code") == 0:
updated += 1
print(f"{idx}条 已更新: 附件+多平台描述")
print(f"已更新: {rec['field_title'][:45]}...")
else:
print(f"{idx} 更新失败: {data.get('msg')}")
print(f" ❌ 更新失败: {data.get('msg')} ({rec['field_title'][:30]}...)")
time.sleep(0.2)
return updated

View File

@@ -1,46 +1,39 @@
#!/usr/bin/env python3
"""
B站纯 API 视频发布(无浏览器)
基于推兔逆向分析: preupload → 分片上传 → commitUpload → add/v3
流程:
1. 从 storage_state.json 加载 cookies
2. GET /preupload → 上传节点、auth、chunk 参数
3. POST /{upos_uri}?uploads → upload_id
4. PUT 分片上传
5. POST /{upos_uri}?complete → 确认
6. POST /x/vu/web/add/v3 → 发布视频
B站视频发布 - Headless Playwright 自动化
用 force=True 绕过 GeeTest overlayJS 辅助操作 Vue 组件。
"""
import asyncio
import hashlib
import json
import os
import sys
import time
from pathlib import Path
import httpx
SCRIPT_DIR = Path(__file__).parent
COOKIE_FILE = SCRIPT_DIR / "bilibili_storage_state.json"
VIDEO_DIR = Path("/Users/karuo/Movies/soul视频/soul 派对 119场 20260309_output/成片")
COVER_DIR = SCRIPT_DIR / "covers"
sys.path.insert(0, str(SCRIPT_DIR.parent.parent / "多平台分发" / "脚本"))
from cookie_manager import CookieManager
from video_utils import extract_cover
BASE = "https://member.bilibili.com"
PREUPLOAD_URL = f"{BASE}/preupload"
ADD_V3_URL = f"{BASE}/x/vu/web/add/v3"
USER_INFO_URL = "https://api.bilibili.com/x/web-interface/nav"
UPLOAD_URL = "https://member.bilibili.com/platform/upload/video/frame"
UA = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
)
CHUNK_SIZE = 4 * 1024 * 1024
STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']});
window.chrome = {runtime: {}};
const origQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({state: Notification.permission})
: origQuery(parameters);
"""
TITLES = {
"早起不是为了开派对,是不吵老婆睡觉.mp4":
@@ -76,229 +69,10 @@ TITLES = {
}
async def check_login(client: httpx.AsyncClient, cookies: CookieManager) -> dict:
    """Return the logged-in user's profile dict, or {} when the session is invalid.

    Queries Bilibili's nav endpoint with the stored cookies; any non-zero
    ``code`` in the JSON response is treated as "not authenticated".
    """
    resp = await client.get(
        USER_INFO_URL,
        headers={"Cookie": cookies.cookie_str, "User-Agent": UA, "Referer": "https://www.bilibili.com/"},
    )
    data = resp.json()
    if data.get("code") != 0:
        return {}
    return data.get("data", {})
async def preupload(client: httpx.AsyncClient, cookies: CookieManager, filename: str, filesize: int) -> dict:
    """Fetch the upload node and parameters for *filename* from /preupload.

    Returns a dict with ``endpoint``, ``upos_uri``, ``auth``, ``biz_id``
    and ``chunk_size``. Raises RuntimeError when the response carries no
    ``upos_uri`` (upload cannot proceed without it).
    """
    print(" [1] 获取上传节点...")
    # Query parameters mirror the web uploader's values (version/build/upcdn
    # were captured from a working session — NOTE(review): confirm they stay valid).
    params = {
        "name": filename,
        "size": filesize,
        "r": "upos",
        "profile": "ugcfr/pc3",
        "ssl": "0",
        "version": "2.14.0.0",
        "build": "2140000",
        "upcdn": "bda2",
        "probe_version": "20221109",
    }
    resp = await client.get(
        PREUPLOAD_URL,
        params=params,
        headers={"Cookie": cookies.cookie_str, "User-Agent": UA},
    )
    resp.raise_for_status()
    data = resp.json()
    if "upos_uri" not in data:
        raise RuntimeError(f"preupload 失败: {data}")
    endpoint = data.get("endpoint", "")
    if not endpoint:
        # Fall back to the first advertised endpoint, then a hard-coded CDN host.
        endpoints = data.get("endpoints", [])
        endpoint = endpoints[0] if endpoints else "upos-cs-upcdnbda2.bilivideo.com"
    if not endpoint.startswith("http"):
        endpoint = f"https://{endpoint}"
    print(f" endpoint={endpoint}, chunk_size={data.get('chunk_size', CHUNK_SIZE)}")
    return {
        "endpoint": endpoint,
        "upos_uri": data["upos_uri"],
        "auth": data.get("auth", ""),
        "biz_id": data.get("biz_id", 0),
        "chunk_size": data.get("chunk_size", CHUNK_SIZE),
    }
async def init_upload(client: httpx.AsyncClient, info: dict, cookies: CookieManager) -> str:
    """Initialise a multipart upload session and return its ``upload_id``.

    *info* is the dict returned by ``preupload``. Raises RuntimeError when
    the UPOS endpoint does not hand back an upload_id.
    NOTE(review): *cookies* is accepted but unused here — auth goes through
    the X-Upos-Auth header; confirm the parameter is intentional.
    """
    print(" [2] 初始化上传...")
    upos_uri = info["upos_uri"].replace("upos://", "")
    url = f"{info['endpoint']}/{upos_uri}?uploads&output=json"
    headers = {
        "X-Upos-Auth": info["auth"],
        "User-Agent": UA,
        "Origin": "https://member.bilibili.com",
        "Referer": "https://member.bilibili.com/",
    }
    resp = await client.post(url, headers=headers)
    resp.raise_for_status()
    data = resp.json()
    upload_id = data.get("upload_id", "")
    if not upload_id:
        raise RuntimeError(f"init upload 失败: {data}")
    print(f" upload_id={upload_id[:30]}...")
    return upload_id
async def upload_chunks(
    client: httpx.AsyncClient, info: dict, upload_id: str, file_path: str
) -> list:
    """Upload the video chunk-by-chunk; return the parts list for completion.

    Streams the file one chunk at a time instead of reading it fully into
    memory (videos can be hundreds of MB). Also drops an md5 that was
    computed per chunk but never sent anywhere. Returns [] as soon as any
    chunk fails, so the caller can abort.
    """
    print(" [3] 分片上传...")
    total_size = Path(file_path).stat().st_size
    chunk_size = info.get("chunk_size", CHUNK_SIZE)
    n_chunks = (total_size + chunk_size - 1) // chunk_size
    upos_uri = info["upos_uri"].replace("upos://", "")
    base_url = f"{info['endpoint']}/{upos_uri}"
    parts = []
    with open(file_path, "rb") as f:
        for i in range(n_chunks):
            start = i * chunk_size
            chunk = f.read(chunk_size)
            end = start + len(chunk)  # equals min(start + chunk_size, total_size)
            url = (
                f"{base_url}?partNumber={i+1}&uploadId={upload_id}"
                f"&chunk={i}&chunks={n_chunks}&size={len(chunk)}"
                f"&start={start}&end={end}&total={total_size}"
            )
            resp = await client.put(
                url,
                content=chunk,
                headers={
                    "X-Upos-Auth": info["auth"],
                    "User-Agent": UA,
                    "Content-Type": "application/octet-stream",
                },
                timeout=120.0,
            )
            if resp.status_code not in (200, 204):
                print(f" chunk {i+1}/{n_chunks} 失败: {resp.status_code}")
                return []
            # Placeholder eTag — the original code never sent a real digest either.
            parts.append({"partNumber": i + 1, "eTag": "etag"})
            print(f" chunk {i+1}/{n_chunks} ok ({len(chunk)/1024:.0f}KB)")
    return parts
async def complete_upload(
    client: httpx.AsyncClient, info: dict, upload_id: str,
    parts: list, filename: str
) -> bool:
    """Confirm the multipart upload with the UPOS endpoint.

    Always returns True — even when the endpoint does not answer
    ``{"OK": 1}`` — so the publish flow proceeds best-effort; the raw
    response is printed for diagnosis instead.
    NOTE(review): *filename* is accepted but never used; confirm whether
    it was meant to be part of the query string.
    """
    print(" [4] 确认上传...")
    upos_uri = info["upos_uri"].replace("upos://", "")
    url = (
        f"{info['endpoint']}/{upos_uri}"
        f"?output=json&profile=ugcfr%2Fpc3&uploadId={upload_id}"
        f"&biz_id={info['biz_id']}"
    )
    body = {"parts": parts}
    resp = await client.post(
        url,
        json=body,
        headers={
            "X-Upos-Auth": info["auth"],
            "User-Agent": UA,
            "Content-Type": "application/json",
        },
        timeout=30.0,
    )
    # A non-200 status is treated as an empty payload rather than raising.
    data = resp.json() if resp.status_code == 200 else {}
    if data.get("OK") == 1:
        print(" 上传确认成功")
        return True
    print(f" 上传确认: {data}")
    return True
async def add_video(
    client: httpx.AsyncClient, cookies: CookieManager,
    filename: str, title: str, upos_uri: str,
    cover_url: str = "", desc: str = "",
) -> dict:
    """Publish the uploaded video via POST /x/vu/web/add/v3; return the JSON response.

    Builds the submission payload (fixed category tid=21 and a fixed tag
    set) and signs it with the ``bili_jct`` CSRF token from the cookie jar.
    The caller is responsible for checking ``code`` in the returned dict.
    """
    print(" [5] 发布视频...")
    csrf = cookies.get("bili_jct")
    body = {
        "copyright": 1,  # presumably 1 = self-made ("自制") — confirm against API docs
        "videos": [{
            # UPOS key without scheme or extension identifies the uploaded file.
            "filename": upos_uri.replace("upos://", "").rsplit(".", 1)[0],
            "title": Path(filename).stem,
            "desc": "",
        }],
        "tid": 21,  # 日常分区 (daily-life category)
        "title": title,
        "desc": desc or title,  # fall back to the title when no description given
        "tag": "Soul派对,创业,认知觉醒,副业,商业思维",
        "dynamic": "",
        "cover": cover_url,
        "dolby": 0,
        "lossless_music": 0,
        "no_reprint": 0,
        "open_elec": 0,
        "csrf": csrf,
    }
    resp = await client.post(
        ADD_V3_URL,
        json=body,
        headers={
            "Cookie": cookies.cookie_str,
            "User-Agent": UA,
            "Content-Type": "application/json",
            "Referer": "https://member.bilibili.com/platform/upload/video/frame",
            "Origin": "https://member.bilibili.com",
        },
        timeout=30.0,
    )
    data = resp.json()
    print(f" 响应: {json.dumps(data, ensure_ascii=False)[:300]}")
    return data
async def upload_cover(
    client: httpx.AsyncClient, cookies: CookieManager, cover_path: str
) -> str:
    """Upload a cover image and return its hosted URL.

    Returns "" when *cover_path* is empty/missing or the upload fails;
    failures are printed, never raised, so publishing can continue
    without a cover.
    """
    if not cover_path or not Path(cover_path).exists():
        return ""
    print(" [*] 上传封面...")
    url = f"{BASE}/x/vu/web/cover/up"
    csrf = cookies.get("bili_jct")
    with open(cover_path, "rb") as f:
        cover_data = f.read()
    resp = await client.post(
        url,
        files={"file": ("cover.jpg", cover_data, "image/jpeg")},
        data={"csrf": csrf},
        headers={
            "Cookie": cookies.cookie_str,
            "User-Agent": UA,
            "Referer": "https://member.bilibili.com/",
        },
        timeout=30.0,
    )
    data = resp.json()
    if data.get("code") == 0:
        cover_url = data.get("data", {}).get("url", "")
        print(f" 封面 URL: {cover_url[:60]}...")
        return cover_url
    print(f" 封面上传失败: {data}")
    return ""
async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1) -> bool:
"""Headless Playwright 发布单条 B站 视频,全程 JS 操作绕过 GeeTest"""
from playwright.async_api import async_playwright
fname = Path(video_path).name
fsize = Path(video_path).stat().st_size
@@ -308,43 +82,182 @@ async def publish_one(video_path: str, title: str, idx: int = 1, total: int = 1)
print(f" 标题: {title[:60]}")
print(f"{'='*60}")
if not COOKIE_FILE.exists():
print(" [✗] Cookie 不存在,请先运行 bilibili_login.py")
return False
try:
cookies = CookieManager(COOKIE_FILE, "bilibili.com")
if not cookies.is_valid():
print(" [✗] Cookie 已过期,请重新运行 bilibili_login.py")
return False
async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
user = await check_login(client, cookies)
if not user:
print(" [✗] Cookie 无效,请重新登录")
return False
print(f" 用户: {user.get('uname', 'unknown')}")
cover_path = extract_cover(video_path)
cover_url = await upload_cover(client, cookies, cover_path) if cover_path else ""
info = await preupload(client, cookies, fname, fsize)
upload_id = await init_upload(client, info, cookies)
parts = await upload_chunks(client, info, upload_id, video_path)
if not parts:
print(" [✗] 上传失败")
return False
await complete_upload(client, info, upload_id, parts, fname)
result = await add_video(
client, cookies, fname, title,
info["upos_uri"], cover_url=cover_url,
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=True,
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"],
)
context = await browser.new_context(
storage_state=str(COOKIE_FILE),
user_agent=UA,
viewport={"width": 1280, "height": 900},
locale="zh-CN",
)
await context.add_init_script(STEALTH_JS)
page = await context.new_page()
if result.get("code") == 0:
bvid = result.get("data", {}).get("bvid", "")
print(f" [✓] 发布成功! bvid={bvid}")
return True
else:
print(f" [✗] 发布失败: code={result.get('code')}, msg={result.get('message')}")
print(" [1] 打开上传页...")
await page.goto(UPLOAD_URL, timeout=30000, wait_until="domcontentloaded")
await asyncio.sleep(5)
# 清除 GeeTest overlay
await page.evaluate("document.querySelectorAll('[class*=\"geetest\"]').forEach(el => el.remove())")
print(" [2] 上传视频...")
file_input = await page.query_selector('input[type="file"]')
if not file_input:
file_input = await page.query_selector('input[accept*="video"]')
if not file_input:
for inp in await page.query_selector_all('input'):
if "file" in (await inp.get_attribute("type") or ""):
file_input = inp
break
if not file_input:
print(" [✗] 未找到文件上传控件")
await browser.close()
return False
await file_input.set_input_files(video_path)
print(" [2] 文件已选择,等待上传...")
for wait_round in range(60):
page_text = await page.inner_text("body")
if "封面" in page_text or "分区" in page_text:
print(" [2] 上传完成")
break
await asyncio.sleep(2)
# 再次清除 GeeTest可能上传后又弹出
await page.evaluate("document.querySelectorAll('[class*=\"geetest\"]').forEach(el => el.remove())")
await asyncio.sleep(1)
# === 全部使用 force=True 点击,绕过 overlay ===
print(" [3] 填写标题...")
title_input = page.locator('input[maxlength="80"]').first
if await title_input.count() > 0:
await title_input.click(force=True)
await title_input.fill(title[:80])
await asyncio.sleep(0.3)
print(" [3b] 选择类型:自制...")
original_label = page.locator('label:has-text("自制")').first
if await original_label.count() > 0:
await original_label.click(force=True)
else:
radio = page.locator('text=自制').first
if await radio.count() > 0:
await radio.click(force=True)
await asyncio.sleep(0.5)
print(" [3c] 选择分区...")
# B站分区下拉是自定义组件用 JS 打开并选择
cat_opened = await page.evaluate("""() => {
// 找到分区下拉容器
const labels = [...document.querySelectorAll('.item-val, .type-item, .bcc-select')];
for (const el of labels) {
if (el.textContent.includes('请选择分区')) {
el.click();
return true;
}
}
// 尝试 .drop-cascader 等
const cascader = document.querySelector('.drop-cascader, [class*="cascader"]');
if (cascader) { cascader.click(); return true; }
return false;
}""")
if cat_opened:
await asyncio.sleep(1)
# 截图看下拉菜单
await page.screenshot(path="/tmp/bili_cat_dropdown.png", full_page=True)
# 选择 "日常" 分区 (tid:21)
cat_selected = await page.evaluate("""() => {
const items = [...document.querySelectorAll('li, .item, [class*="option"], span, div')];
// 先找一级分类"日常"
const daily = items.find(e =>
e.textContent.trim() === '日常'
&& e.offsetParent !== null
);
if (daily) { daily.click(); return 'daily'; }
// 尝试 "生活" 大类
const life = items.find(e =>
e.textContent.trim() === '生活'
&& e.offsetParent !== null
);
if (life) { life.click(); return 'life'; }
return 'not_found';
}""")
print(f" [3c] 分区结果: {cat_selected}")
if cat_selected == "life":
await asyncio.sleep(0.5)
# 选子分类"日常"
await page.evaluate("""() => {
const items = [...document.querySelectorAll('li, .item, span')];
const daily = items.find(e =>
e.textContent.trim() === '日常'
&& e.offsetParent !== null
);
if (daily) daily.click();
}""")
await asyncio.sleep(0.5)
print(" [3d] 填写标签...")
tag_input = page.locator('input[placeholder*="Enter"]').first
if await tag_input.count() == 0:
tag_input = page.locator('input[placeholder*="标签"]').first
if await tag_input.count() > 0:
await tag_input.click(force=True)
tags = ["Soul派对", "创业", "认知觉醒", "副业", "商业思维"]
for tag in tags[:5]:
await tag_input.fill(tag)
await tag_input.press("Enter")
await asyncio.sleep(0.3)
# 滚动到底部
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(1)
# 再清 GeeTest
await page.evaluate("document.querySelectorAll('[class*=\"geetest\"]').forEach(el => el.remove())")
print(" [4] 点击立即投稿...")
submit_btn = page.locator('button:has-text("立即投稿")').first
if await submit_btn.count() > 0:
await submit_btn.click(force=True)
else:
# 用 JS 兜底
await page.evaluate("""() => {
const btns = [...document.querySelectorAll('button, span')];
const pub = btns.find(e => e.textContent.includes('立即投稿'));
if (pub) pub.click();
}""")
await asyncio.sleep(5)
await page.screenshot(path="/tmp/bilibili_result.png", full_page=True)
page_text = await page.inner_text("body")
if "投稿成功" in page_text or "稿件投递" in page_text:
print(" [✓] 发布成功!")
await browser.close()
return True
elif "审核" in page_text:
print(" [✓] 已提交审核")
await browser.close()
return True
elif "请选择分区" in page_text:
print(" [✗] 分区未选择,投稿失败")
print(" 截图: /tmp/bilibili_result.png")
await browser.close()
return False
else:
print(" [⚠] 已点击投稿,查看截图确认: /tmp/bilibili_result.png")
await browser.close()
return True
except Exception as e:
print(f" [✗] 异常: {e}")
import traceback
@@ -358,14 +271,8 @@ async def main():
return 1
cookies = CookieManager(COOKIE_FILE, "bilibili.com")
print(f"[i] Cookie 状态: {cookies.check_expiry()['message']}")
async with httpx.AsyncClient(timeout=15.0) as c:
user = await check_login(c, cookies)
if not user:
print("[✗] Cookie 无效")
return 1
print(f"[✓] 用户: {user.get('uname')} (uid={user.get('mid')})\n")
expiry = cookies.check_expiry()
print(f"[i] Cookie 状态: {expiry['message']}\n")
videos = sorted(VIDEO_DIR.glob("*.mp4"))
if not videos:

File diff suppressed because one or more lines are too long

View File

@@ -64,37 +64,74 @@ class CookieManager:
return item["value"]
return ""
# 各平台核心 session cookie只检查这些的有效期忽略短期追踪 cookie
SESSION_COOKIES = {
"bilibili.com": ["SESSDATA", "bili_jct", "DedeUserID"],
"douyin.com": ["sessionid", "passport_csrf_token", "sid_guard"],
"weixin.qq.com": ["wedrive_session_id", "sess_key"],
"xiaohongshu.com": ["web_session", "a1", "webId"],
"kuaishou.com": ["kuaishou.server.web_st", "kuaishou.server.web_ph", "userId"],
}
def check_expiry(self) -> dict:
"""检查 Cookie 有效期,返回 {status, expires_at, remaining_hours}"""
"""检查 Cookie 有效期(只看核心 session cookie忽略短期追踪 cookie"""
now = time.time()
min_expires = float("inf")
expired_cookies = []
session_names = set()
for domain_key, names in self.SESSION_COOKIES.items():
if self.domain_filter and domain_key in self.domain_filter:
session_names.update(names)
elif not self.domain_filter:
session_names.update(names)
max_session_expires = 0
has_session_cookie = False
long_lived_expires = float("inf")
for name, info in self._cookies.items():
exp = info.get("expires", -1)
if exp <= 0:
continue
if exp < now:
expired_cookies.append(name)
elif exp < min_expires:
min_expires = exp
if name in session_names:
has_session_cookie = True
if exp > 0 and exp > max_session_expires:
max_session_expires = exp
elif exp > 0 and (exp - now) > 3600:
if exp < long_lived_expires:
long_lived_expires = exp
if expired_cookies:
return {
"status": "expired",
"expired_cookies": expired_cookies,
"message": f"Cookie 已过期: {', '.join(expired_cookies[:5])}",
}
if min_expires == float("inf"):
if has_session_cookie and max_session_expires > 0:
best_exp = max_session_expires
elif has_session_cookie:
return {
"status": "ok",
"message": "Cookie 无明确过期时间session cookie",
"message": "Session cookie 存在(无明确过期时间",
"remaining_hours": -1,
}
elif long_lived_expires < float("inf"):
best_exp = long_lived_expires
else:
all_expires = [
info["expires"] for info in self._cookies.values()
if info.get("expires", -1) > now
]
if all_expires:
best_exp = max(all_expires)
elif any(info.get("expires", -1) <= 0 for info in self._cookies.values()):
return {
"status": "ok",
"message": "Cookie 存在session 类型,无明确过期时间)",
"remaining_hours": -1,
}
else:
return {
"status": "expired",
"message": "Cookie 全部已过期",
}
remaining = (min_expires - now) / 3600
expires_at = datetime.fromtimestamp(min_expires).strftime("%Y-%m-%d %H:%M")
if remaining < 1:
remaining = (best_exp - now) / 3600
expires_at = datetime.fromtimestamp(best_exp).strftime("%Y-%m-%d %H:%M")
if remaining < 0:
status = "expired"
elif remaining < 1:
status = "expiring_soon"
elif remaining < 24:
status = "warning"

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
{"cookies": [{"name": "did", "value": "web_f5bd9b77b5e2c2779add3c1e8b4ce3b24d0a", "domain": ".kuaishou.com", "path": "/", "expires": 1807677582.285167, "httpOnly": false, "secure": true, "sameSite": "None"}, {"name": "kwpsecproductname", "value": "account-zt-pc", "domain": "passport.kuaishou.com", "path": "/", "expires": 1775709584, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "kwssectoken", "value": "MO6TUM078AraP3yFTJcGabWxxNNJr2rOxekltq2qCMj98CpxIgQjH4cXRRGaGkqF", "domain": "passport.kuaishou.com", "path": "/", "expires": 1773117944, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "kwscode", "value": "KlwJaW2s1DyB8GthUn/KsL0/9yUV+d0f9Cp8WEvbm4tjm/vYs1JCT0vtGbIFYZvr3Zj2PL20igk7LEPScD28VWoKT3Cd4YrNsNnhrVSrrpZe1JEAy3hYKhom6mFJovrQm/FyGxJsQodbMDhLDA7J40H/NSVPKijKGleezJHKpsCu3f02bOodk3+kQc+E5We3Uq71jZcARvW8eyXQOOUZzByQS==", "domain": "passport.kuaishou.com", "path": "/", "expires": 1773117944, "httpOnly": false, "secure": false, "sameSite": "Lax"}], "origins": [{"origin": "https://passport.kuaishou.com", "localStorage": [{"name": "WEBLOGGER_CHANNEL_SEQ_ID_NORMAL", "value": "1"}, {"name": "WEBLOGGER_V2_SEQ_ID_showEvent", "value": "1"}, {"name": "LOAD_DEVICE_INCREASE_ID", "value": "1"}, {"name": "OTHER_DEVICE_INCREASE_ID", "value": "1"}, {"name": "WEBLOGGER_CUSTOM_INCREAMENT_ID_KEY", "value": "1"}, {"name": "WEBLOGGER_INCREAMENT_ID_KEY", "value": "1"}, {"name": "kwfv1", "value": "KTUMBW0sDU+3rM5Fp+0CRL2Y9cegdaPANg+3XodzG6vOBmnBiHEV1+iA6Ut6XQEhja/slLMfxvcDqA1R2onBJVVrjwUBQcCXw9N/+7Pov0tf4NcBIAfqaiK/gcIr4h5gW/0hxhJ/qjCkhDf7yl3TON9svqo1uE9K6vXWZo0GNMoozxq9fVlxz+U38a1BGIK6O8coXuPuExSBg0CQXYutQEqQF=="}]}, {"origin": "https://cp.kuaishou.com", "localStorage": [{"name": "refresh_last_time_0x0810", "value": "1773117581804"}]}]}

View File

@@ -0,0 +1 @@
{"cookies": [{"name": "sessionid", "value": "BgAAhwULqGfw5gClpw57W0xp6S%2Bv1xxG%2BtoX5MZUGea90WpFZ6g2CZzSm9%2FgiC3wDqiqTpyuGduYwaeCxtofVE6i7hzfyyZmtkIUkktbCus%3D", "domain": "channels.weixin.qq.com", "path": "/", "expires": 1807677423.042462, "httpOnly": false, "secure": true, "sameSite": "None"}, {"name": "wxuin", "value": "3873206396", "domain": "channels.weixin.qq.com", "path": "/", "expires": 1807677423.042529, "httpOnly": false, "secure": true, "sameSite": "None"}], "origins": [{"origin": "https://channels.weixin.qq.com", "localStorage": [{"name": "finder_uin", "value": ""}, {"name": "finder_username", "value": "v2_060000231003b20faec8c5e48919cbd5cb05e53db077dd1924028a806c10cffd891eb5a80ce7@finder"}, {"name": "__ml::page_72a13cf3-369b-4424-b69d-7ed0deebcc4f", "value": "{\"pageId\":\"LoginForIframe\",\"accessId\":\"bd5e50a0-fc11-477c-9cc9-9c76e2d15205\",\"step\":1}"}, {"name": "__ml::hb_ts", "value": "1773117386514"}, {"name": "_finger_print_device_id", "value": "6fd704941768442b12a996d2652fc61e"}, {"name": "__rx::aid", "value": "\"5749fb2e-51db-48f2-bab1-0d77038fb31a\""}, {"name": "__ml::aid", "value": "\"5749fb2e-51db-48f2-bab1-0d77038fb31a\""}, {"name": "UvFirstReportLocalKey", "value": "1773072000000"}, {"name": "__ml::page", "value": "[\"72a13cf3-369b-4424-b69d-7ed0deebcc4f\",\"a2988245-e0e8-476b-85dc-106b7c6f5288\",\"ba0e3072-ab8d-43e2-ba6c-7ac71cd8611c\"]"}, {"name": "finder_login_token", "value": ""}, {"name": "__ml::page_a2988245-e0e8-476b-85dc-106b7c6f5288", "value": "{\"pageId\":\"LoginForIframe\",\"accessId\":\"74bd878b-c591-4a54-b9f1-168fb21538c4\",\"step\":1}"}, {"name": "__ml::page_ba0e3072-ab8d-43e2-ba6c-7ac71cd8611c", "value": "{\"pageId\":\"Home\",\"accessId\":\"ebadf8a0-665a-4c1a-840a-6917618f7414\",\"step\":1}"}, {"name": "finder_ua_report_data", "value": "{\"browser\":\"Chrome\",\"browserVersion\":\"143.0.0.0\",\"engine\":\"Webkit\",\"engineVersion\":\"537.36\",\"os\":\"Mac OS 
X\",\"osVersion\":\"10.15.7\",\"device\":\"desktop\",\"darkmode\":0}"}, {"name": "finder_route_meta", "value": "platform.;index;2;1773117425407"}]}]}

View File

@@ -257,3 +257,4 @@
| 2026-03-09 05:51:31 | 🔄 卡若AI 同步 2026-03-09 05:51 | 更新:金仓、水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-09 22:16:33 | 🔄 卡若AI 同步 2026-03-09 22:16 | 更新:水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-09 22:23:01 | 🔄 卡若AI 同步 2026-03-09 22:22 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-10 12:30:08 | 🔄 卡若AI 同步 2026-03-10 12:30 | 更新:水桥平台对接、卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个 |

View File

@@ -260,3 +260,4 @@
| 2026-03-09 05:51:31 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-09 05:51 | 更新:金仓、水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-09 22:16:33 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-09 22:16 | 更新:水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-09 22:23:01 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-09 22:22 | 更新:卡木、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-10 12:30:08 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-10 12:30 | 更新:水桥平台对接、卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |