🔄 卡若AI 同步 2026-02-22 10:22 | 更新:金仓、水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个

This commit is contained in:
2026-02-22 10:22:18 +08:00
parent 1bc7435c11
commit adc8caa68d
16 changed files with 1117 additions and 38 deletions

View File

@@ -9,7 +9,7 @@ services:
ports: ports:
- "8080:80" - "8080:80"
volumes: volumes:
- ./www:/var/www/html:ro - ./www:/var/www/html
restart: unless-stopped restart: unless-stopped
environment: environment:
- TZ=Asia/Shanghai - TZ=Asia/Shanghai

View File

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
存客宝 www.lytiao.com Docker 一键确保运行
1. TAT 全量部署 + 验证
2. 安全组放行 8080
3. 获取并打印服务器执行结果
"""
import base64
import json
import os
import re
import sys
import time
CKB_INSTANCE_ID = "ins-ciyv2mxa"
REGION = "ap-guangzhou"
def _find_root():
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(6):
if os.path.basename(d) == "卡若AI" or (os.path.isdir(os.path.join(d, "运营中枢")) and os.path.isdir(os.path.join(d, "01_卡资"))):
return d
d = os.path.dirname(d)
return None
def _read_creds():
root = _find_root()
if not root:
return None, None
path = os.path.join(root, "运营中枢", "工作台", "00_账号与API索引.md")
if not os.path.isfile(path):
return None, None
with open(path, "r", encoding="utf-8") as f:
text = f.read()
sid = skey = None
in_t = False
for line in text.splitlines():
if "### 腾讯云" in line:
in_t = True
continue
if in_t and line.strip().startswith("###"):
break
if not in_t:
continue
m = re.search(r"\|\s*[^|]*(?:SecretId|密钥)[^|]*\|\s*`([^`]+)`", line, re.I)
if m and m.group(1).strip().startswith("AKID"):
sid = m.group(1).strip()
m = re.search(r"\|\s*SecretKey\s*\|\s*`([^`]+)`", line, re.I)
if m:
skey = m.group(1).strip()
return sid or os.environ.get("TENCENTCLOUD_SECRET_ID"), skey or os.environ.get("TENCENTCLOUD_SECRET_KEY")
# 完整部署:创建配置、复制网站、构建、启动、验证
CMD = """set -e
SRC="/www/wwwroot/www.lytiao.com"
DIR="/opt/lytiao_docker"
mkdir -p "$DIR"
echo ">>> 1. 创建 Docker 配置"
cat > "$DIR/Dockerfile" << 'DFEND'
FROM php:7.1-apache
RUN a2enmod rewrite
RUN apt-get update && apt-get install -y libpng-dev libjpeg-dev libzip-dev zip unzip \\
&& docker-php-ext-configure gd --with-png-dir=/usr --with-jpeg-dir=/usr \\
&& docker-php-ext-install -j$(nproc) gd mysqli pdo pdo_mysql zip \\
&& apt-get clean && rm -rf /var/lib/apt/lists/*
WORKDIR /var/www/html
EXPOSE 80
DFEND
cat > "$DIR/docker-compose.yml" << 'DCEND'
version: "3.8"
services:
lytiao-web:
build: .
container_name: lytiao-www
ports:
- "8090:80"
volumes:
- ./www:/var/www/html:ro
restart: unless-stopped
environment:
- TZ=Asia/Shanghai
DCEND
echo ">>> 2. 复制网站文件"
rm -rf "$DIR/www"
cp -a "$SRC" "$DIR/www"
echo ">>> 3. 构建并启动"
cd "$DIR"
docker compose down 2>/dev/null || true
docker compose up -d --build
sleep 5
echo ">>> 4. 验证容器状态"
docker ps -a --filter "name=lytiao"
echo ""
echo ">>> 5. 本机访问测试"
curl -sI -o /dev/null -w "HTTP状态: %{http_code}\\n" --connect-timeout 5 http://127.0.0.1:8090/ || echo "curl失败"
echo ""
echo "DONE"
"""
def main():
secret_id, secret_key = _read_creds()
if not secret_id or not secret_key:
print("❌ 未配置腾讯云凭证")
return 1
try:
from tencentcloud.common import credential
from tencentcloud.tat.v20201028 import tat_client, models
except ImportError:
print("pip install tencentcloud-sdk-python-common tencentcloud-sdk-python-tat")
return 1
# 1. 安全组放行 8080
print("========== 1. 安全组放行 8090 ==========")
try:
from tencentcloud.common import credential
from tencentcloud.cvm.v20170312 import cvm_client, models as cvm_models
from tencentcloud.vpc.v20170312 import vpc_client, models as vpc_models
cred = credential.Credential(secret_id, secret_key)
sg_ids, region = [], None
for r in ["ap-guangzhou", "ap-beijing", "ap-shanghai"]:
try:
c = cvm_client.CvmClient(cred, r)
req = cvm_models.DescribeInstancesRequest()
req.Limit = 100
resp = c.DescribeInstances(req)
for ins in (getattr(resp, "InstanceSet", None) or []):
if "42.194.245.239" in list(getattr(ins, "PublicIpAddresses", None) or []):
sg_ids = list(getattr(ins, "SecurityGroupIds", None) or [])
region = r
break
except Exception:
continue
if sg_ids:
break
if sg_ids and region:
vc = vpc_client.VpcClient(cred, region)
for sg_id in sg_ids:
try:
req = vpc_models.CreateSecurityGroupPoliciesRequest()
req.SecurityGroupId = sg_id
ps = vpc_models.SecurityGroupPolicySet()
ing = vpc_models.SecurityGroupPolicy()
ing.Protocol, ing.Port, ing.CidrBlock = "TCP", "8090", "0.0.0.0/0"
ing.Action, ing.PolicyDescription = "ACCEPT", "lytiao-Docker"
ps.Ingress = [ing]
req.SecurityGroupPolicySet = ps
vc.CreateSecurityGroupPolicies(req)
print("%s 已添加 8080/TCP" % sg_id)
except Exception as e:
if "RuleAlreadyExists" in str(e) or "已存在" in str(e):
print(" ⏭ 8080 规则已存在")
else:
print("", e)
else:
print(" 未找到存客宝实例,跳过放行")
except ImportError as e:
print(" 跳过(缺 vpc 包): pip install tencentcloud-sdk-python-vpc")
except Exception as e:
print(" 放行异常:", e)
# 2. TAT 部署
print("\n========== 2. TAT 部署 lytiao 容器 ==========")
cred = credential.Credential(secret_id, secret_key)
client = tat_client.TatClient(cred, REGION)
req = models.RunCommandRequest()
req.Content = base64.b64encode(CMD.encode()).decode()
req.InstanceIds = [CKB_INSTANCE_ID]
req.CommandType = "SHELL"
req.Timeout = 600
req.CommandName = "CKB_lytiao_Ensure"
resp = client.RunCommand(req)
inv_id = resp.InvocationId
print(" 已下发 InvocationId:", inv_id)
print(" 等待 90s 获取执行结果...")
time.sleep(90)
# 3. 获取输出
print("\n========== 3. 服务器执行结果 ==========")
try:
req2 = models.DescribeInvocationTasksRequest()
f = models.Filter()
f.Name = "invocation-id"
f.Values = [inv_id]
req2.Filters = [f]
resp2 = client.DescribeInvocationTasks(req2)
for t in (resp2.InvocationTaskSet or []):
status = getattr(t, "TaskStatus", "N/A")
exit_code = getattr(t, "ExitCode", None)
print(" 任务状态:", status, "退出码:", exit_code)
tr = getattr(t, "TaskResult", None)
if tr:
try:
if hasattr(tr, "get"):
j = tr
else:
j = json.loads(tr) if isinstance(tr, str) else vars(tr)
out = j.get("Output", "") if isinstance(j, dict) else getattr(tr, "Output", "")
if out:
try:
out = base64.b64decode(out).decode("utf-8", errors="replace")
except Exception:
pass
print(out[-4000:] if len(out) > 4000 else out)
err = j.get("Error", "") if isinstance(j, dict) else getattr(tr, "Error", "")
if err:
try:
err = base64.b64decode(err).decode("utf-8", errors="replace")
except Exception:
pass
print("--- 错误 ---\n", err[:3000])
# 打印完整 TaskResult 用于调试
if status == "FAILED" and not out and not err:
print("--- 原始 TaskResult ---\n", str(tr)[:1500])
except Exception as e:
print(" TaskResult:", type(tr), str(tr)[:500])
except Exception as e:
print(" 查询异常:", e)
print("\n========== 完成 ==========")
print(" 访问: http://42.194.245.239:8090 8080 被 frps 占用,已改用 8090")
print(" 宝塔 Docker 总览点击「刷新容器列表」可见 lytiao-www")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""仅启动 lytiao 容器(不重新构建),并返回详细输出"""
import base64, json, os, re, sys, time
CKB_ID, REGION = "ins-ciyv2mxa", "ap-guangzhou"
def _cred():
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(6):
r = os.path.join(d, "运营中枢", "工作台", "00_账号与API索引.md")
if os.path.isfile(r):
t = open(r, encoding="utf-8").read()
sid = skey = None
in_t = False
for L in t.splitlines():
if "### 腾讯云" in L: in_t = True
elif in_t and L.strip().startswith("###"): break
elif in_t:
m = re.search(r"SecretId[^|]*\|\s*`([^`]+)`", L, re.I)
if m and m.group(1).strip().startswith("AKID"): sid = m.group(1).strip()
m = re.search(r"SecretKey[^|]*\|\s*`([^`]+)`", L, re.I)
if m: skey = m.group(1).strip()
return sid or os.environ.get("TENCENTCLOUD_SECRET_ID"), skey or os.environ.get("TENCENTCLOUD_SECRET_KEY")
d = os.path.dirname(d)
return None, None
# 只启动,不构建;若镜像不存在会报错,便于定位
CMD = """cd /opt/lytiao_docker
sed -i 's/8080:80/8090:80/g' docker-compose.yml
docker compose down 2>/dev/null
docker compose up -d 2>&1
echo "---"
docker ps -a --filter name=lytiao
echo "---"
curl -sI -o /dev/null -w 'HTTP: %{http_code}' http://127.0.0.1:8090/ 2>/dev/null || echo "curl fail"
"""
def main():
sid, skey = _cred()
if not sid or not skey: print("❌ 无凭证"); return 1
from tencentcloud.common import credential
from tencentcloud.tat.v20201028 import tat_client, models
cred = credential.Credential(sid, skey)
client = tat_client.TatClient(cred, REGION)
req = models.RunCommandRequest()
req.Content = base64.b64encode(CMD.encode()).decode()
req.InstanceIds = [CKB_ID]
req.CommandType, req.Timeout = "SHELL", 120
req.CommandName = "CKB_lytiao_Start"
r = client.RunCommand(req)
print("已下发,等待 90s...")
time.sleep(90)
req2 = models.DescribeInvocationTasksRequest()
f = models.Filter(); f.Name, f.Values = "invocation-id", [r.InvocationId]
req2.Filters = [f]
r2 = client.DescribeInvocationTasks(req2)
for t in (r2.InvocationTaskSet or []):
print("状态:", getattr(t, "TaskStatus", ""))
tr = getattr(t, "TaskResult", None)
if tr:
try:
j = json.loads(tr) if isinstance(tr, str) else (tr if hasattr(tr, "get") else vars(tr))
o = j.get("Output", "") if isinstance(j, dict) else ""
e = j.get("Error", "") if isinstance(j, dict) else ""
if o:
try: o = base64.b64decode(o).decode("utf-8", errors="replace")
except: pass
print("\n--- 输出 ---\n", o)
if e:
try: e = base64.b64decode(e).decode("utf-8", errors="replace")
except: pass
print("\n--- 错误 ---\n", e)
except Exception as ex:
print("解析异常:", ex, "| raw:", str(tr)[:600])
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""快速检查 lytiao 容器状态,获取 TAT 输出"""
import base64, json, os, re, sys, time
CKB_ID, REGION = "ins-ciyv2mxa", "ap-guangzhou"
def _cred():
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(6):
r = os.path.join(d, "运营中枢", "工作台", "00_账号与API索引.md")
if os.path.isfile(r):
t = open(r, encoding="utf-8").read()
sid = skey = None
in_t = False
for L in t.splitlines():
if "### 腾讯云" in L: in_t = True
elif in_t and L.strip().startswith("###"): break
elif in_t:
m = re.search(r"SecretId[^|]*\|\s*`([^`]+)`", L, re.I)
if m and m.group(1).strip().startswith("AKID"): sid = m.group(1).strip()
m = re.search(r"SecretKey[^|]*\|\s*`([^`]+)`", L, re.I)
if m: skey = m.group(1).strip()
return sid or os.environ.get("TENCENTCLOUD_SECRET_ID"), skey or os.environ.get("TENCENTCLOUD_SECRET_KEY")
d = os.path.dirname(d)
return None, None
CMD = """cd /opt/lytiao_docker 2>/dev/null && docker compose ps -a
echo "---"
docker ps -a --filter name=lytiao
echo "---"
ss -tlnp | grep -E '8080|8090' || true
echo "---"
curl -sI -o /dev/null -w '8090: %{http_code}' http://127.0.0.1:8090/ 2>/dev/null || echo "8090 curl fail"
"""
def main():
sid, skey = _cred()
if not sid or not skey: print("❌ 无凭证"); return 1
from tencentcloud.common import credential
from tencentcloud.tat.v20201028 import tat_client, models
cred = credential.Credential(sid, skey)
client = tat_client.TatClient(cred, REGION)
req = models.RunCommandRequest()
req.Content = base64.b64encode(CMD.encode()).decode()
req.InstanceIds = [CKB_ID]
req.CommandType, req.Timeout = "SHELL", 30
req.CommandName = "CKB_lytiao_Check"
r = client.RunCommand(req)
inv = r.InvocationId
print("⏳ 等待 25s...")
time.sleep(25)
req2 = models.DescribeInvocationTasksRequest()
f = models.Filter(); f.Name, f.Values = "invocation-id", [inv]
req2.Filters = [f]
r2 = client.DescribeInvocationTasks(req2)
for t in (r2.InvocationTaskSet or []):
print("状态:", getattr(t, "TaskStatus", ""))
tr = getattr(t, "TaskResult", None)
if tr:
try:
j = tr if hasattr(tr, "get") else (json.loads(tr) if isinstance(tr, str) else vars(tr) if hasattr(tr, "__dict__") else {})
o = (j.get("Output", "") if isinstance(j, dict) else "") or getattr(tr, "Output", "")
if o:
try:
o = base64.b64decode(o).decode("utf-8", errors="replace")
except Exception:
pass
print("\n--- 服务器输出 ---\n", o)
except Exception as e:
print("解析:", e, "raw:", str(tr)[:300])
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
存客宝 lytiao 快速部署 - 使用精简 Dockerfile不编译扩展2 分钟内完成)
"""
import base64, json, os, re, sys, time
CKB_ID, REGION = "ins-ciyv2mxa", "ap-guangzhou"
def _cred():
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(6):
r = os.path.join(d, "运营中枢", "工作台", "00_账号与API索引.md")
if os.path.isfile(r):
t = open(r, encoding="utf-8").read()
sid = skey = None
in_t = False
for L in t.splitlines():
if "### 腾讯云" in L: in_t = True
elif in_t and L.strip().startswith("###"): break
elif in_t:
m = re.search(r"SecretId[^|]*\|\s*`([^`]+)`", L, re.I)
if m and m.group(1).strip().startswith("AKID"): sid = m.group(1).strip()
m = re.search(r"SecretKey[^|]*\|\s*`([^`]+)`", L, re.I)
if m: skey = m.group(1).strip()
return sid or os.environ.get("TENCENTCLOUD_SECRET_ID"), skey or os.environ.get("TENCENTCLOUD_SECRET_KEY")
d = os.path.dirname(d)
return None, None
# 精简 Dockerfile不编译扩展拉取即用
CMD = """set -e
DIR="/opt/lytiao_docker"
SRC="/www/wwwroot/www.lytiao.com"
mkdir -p "$DIR"
cat > "$DIR/Dockerfile" << 'EOF'
FROM php:7.1-apache
RUN a2enmod rewrite
WORKDIR /var/www/html
EXPOSE 80
EOF
cat > "$DIR/docker-compose.yml" << 'EOF'
services:
lytiao-web:
build: .
container_name: lytiao-www
ports:
- "8090:80"
volumes:
- ./www:/var/www/html
restart: unless-stopped
EOF
rm -rf "$DIR/www"
cp -a "$SRC" "$DIR/www"
cd "$DIR"
docker compose down 2>/dev/null || true
docker compose up -d --build 2>&1
sleep 3
docker ps -a --filter name=lytiao
curl -sI -o /dev/null -w 'HTTP: %{http_code}' http://127.0.0.1:8090/ 2>/dev/null || echo "fail"
"""
def main():
sid, skey = _cred()
if not sid or not skey: print("❌ 无凭证"); return 1
from tencentcloud.common import credential
from tencentcloud.tat.v20201028 import tat_client, models
cred = credential.Credential(sid, skey)
client = tat_client.TatClient(cred, REGION)
req = models.RunCommandRequest()
req.Content = base64.b64encode(CMD.encode()).decode()
req.InstanceIds = [CKB_ID]
req.CommandType, req.Timeout = "SHELL", 180
req.CommandName = "CKB_lytiao_Quick"
r = client.RunCommand(req)
print("已下发(精简构建,约 2 分钟InvocationId:", r.InvocationId)
print("等待 150s 获取结果...")
time.sleep(150)
req2 = models.DescribeInvocationTasksRequest()
f = models.Filter(); f.Name, f.Values = "invocation-id", [r.InvocationId]
req2.Filters = [f]
r2 = client.DescribeInvocationTasks(req2)
for t in (r2.InvocationTaskSet or []):
print("状态:", getattr(t, "TaskStatus", ""))
err_info = getattr(t, "ErrorInfo", None)
if err_info:
print("ErrorInfo:", err_info)
tr = getattr(t, "TaskResult", None)
print("TaskResult type:", type(tr))
if tr:
try:
j = json.loads(tr) if isinstance(tr, str) else (tr if hasattr(tr, "get") else {})
o = j.get("Output", "") if isinstance(j, dict) else ""
e = j.get("Error", "") if isinstance(j, dict) else ""
if o:
try: o = base64.b64decode(o).decode("utf-8", errors="replace")
except: pass
print("\n--- 输出 ---\n", o)
if e:
try: e = base64.b64decode(e).decode("utf-8", errors="replace")
except: pass
print("\n--- 错误 ---\n", e)
except Exception as ex:
print("解析异常:", ex)
print("TaskResult repr:", repr(tr)[:1200])
print("\n访问: http://42.194.245.239:8090")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
腾讯云 API 为存客宝放行 8090lytiao Docker 容器端口)
"""
import sys
import os
# 复用 443 脚本逻辑,仅端口改为 8090
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_CKB_IP = "42.194.245.239"
_REGIONS = ["ap-guangzhou", "ap-beijing", "ap-shanghai"]
def _find_root():
d = _SCRIPT_DIR
for _ in range(6):
if os.path.basename(d) == "卡若AI" or (os.path.isdir(os.path.join(d, "运营中枢")) and os.path.isdir(os.path.join(d, "01_卡资"))):
return d
d = os.path.dirname(d)
return None
def _read_creds():
root = _find_root()
if not root:
return None, None
path = os.path.join(root, "运营中枢", "工作台", "00_账号与API索引.md")
if not os.path.isfile(path):
return None, None
import re
with open(path, "r", encoding="utf-8") as f:
text = f.read()
sid = skey = None
in_t = False
for line in text.splitlines():
if "### 腾讯云" in line:
in_t = True
continue
if in_t and line.strip().startswith("###"):
break
if not in_t:
continue
m = re.search(r"\|\s*[^|]*(?:SecretId|密钥)[^|]*\|\s*`([^`]+)`", line, re.I)
if m and m.group(1).strip().startswith("AKID"):
sid = m.group(1).strip()
m = re.search(r"\|\s*SecretKey\s*\|\s*`([^`]+)`", line, re.I)
if m:
skey = m.group(1).strip()
return sid or os.environ.get("TENCENTCLOUD_SECRET_ID"), skey or os.environ.get("TENCENTCLOUD_SECRET_KEY")
def main():
secret_id, secret_key = _read_creds()
if not secret_id or not secret_key:
print("❌ 未配置腾讯云 SecretId/SecretKey")
return 1
from tencentcloud.common import credential
from tencentcloud.cvm.v20170312 import cvm_client, models as cvm_models
from tencentcloud.vpc.v20170312 import vpc_client, models as vpc_models
cred = credential.Credential(secret_id, secret_key)
sg_ids, region = [], None
for r in _REGIONS:
try:
c = cvm_client.CvmClient(cred, r)
req = cvm_models.DescribeInstancesRequest()
req.Limit = 100
resp = c.DescribeInstances(req)
for ins in (getattr(resp, "InstanceSet", None) or []):
if _CKB_IP in list(getattr(ins, "PublicIpAddresses", None) or []):
sg_ids = list(getattr(ins, "SecurityGroupIds", None) or [])
region = r
break
except Exception:
continue
if sg_ids:
break
if not sg_ids or not region:
print("❌ 存客宝 %s 未找到" % _CKB_IP)
return 1
print(" 存客宝安全组放行 8090lytiao Docker")
vc = vpc_client.VpcClient(cred, region)
added = 0
for sg_id in sg_ids:
try:
req = vpc_models.CreateSecurityGroupPoliciesRequest()
req.SecurityGroupId = sg_id
policy_set = vpc_models.SecurityGroupPolicySet()
ing = vpc_models.SecurityGroupPolicy()
ing.Protocol = "TCP"
ing.Port = "8090"
ing.CidrBlock = "0.0.0.0/0"
ing.Action = "ACCEPT"
ing.PolicyDescription = "lytiao-Docker"
policy_set.Ingress = [ing]
req.SecurityGroupPolicySet = policy_set
vc.CreateSecurityGroupPolicies(req)
print("%s 已添加 8090/TCP" % sg_id)
added += 1
except Exception as e:
err = str(e)
if "RuleAlreadyExists" in err or "已存在" in err or "duplicate" in err.lower():
print("%s 8090 规则已存在" % sg_id)
added += 1
else:
print("%s: %s" % (sg_id, e))
if added > 0:
print(" 放行完成,可访问 http://42.194.245.239:8090")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
飞书妙记 · 命令行全自动下载视频(不打开浏览器)
通过 status API 获取视频下载链接,用 requests 直接下载。
依赖cookie_minutes.txt同 fetch_single_minute_by_cookie
逻辑参考bingsanyu/feishu_minutes feishu_downloader.py
用法:
python3 feishu_minutes_download_video.py "https://cunkebao.feishu.cn/minutes/obcnzs51k1j754643vx138sx" -o ~/Downloads/
python3 feishu_minutes_download_video.py obcnzs51k1j754643vx138sx --output /path/to/dir
"""
import os
import re
import sys
import time
from pathlib import Path
try:
import requests
except ImportError:
requests = None
SCRIPT_DIR = Path(__file__).resolve().parent
COOKIE_FILE = SCRIPT_DIR / "cookie_minutes.txt"
# status APImeetings 与 cunkebao 均需尝试)
STATUS_URLS = [
"https://meetings.feishu.cn/minutes/api/status",
"https://cunkebao.feishu.cn/minutes/api/status",
]
REFERERS = [
"https://meetings.feishu.cn/minutes/me",
"https://cunkebao.feishu.cn/minutes/",
]
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
def get_cookie() -> str:
"""与 fetch_single_minute_by_cookie 一致:环境变量 → cookie_minutes.txt"""
cookie = os.environ.get("FEISHU_MINUTES_COOKIE", "").strip()
if cookie and len(cookie) > 100 and "PASTE_YOUR" not in cookie:
return cookie
if COOKIE_FILE.exists():
raw = COOKIE_FILE.read_text(encoding="utf-8", errors="ignore").strip().splitlines()
for line in raw:
line = line.strip()
if line and not line.startswith("#") and "PASTE_YOUR" not in line:
return line
return ""
def get_csrf(cookie: str) -> str:
for name in ("bv_csrf_token=", "minutes_csrf_token="):
i = cookie.find(name)
if i != -1:
start = i + len(name)
end = cookie.find(";", start)
if end == -1:
end = len(cookie)
return cookie[start:end].strip()
return ""
def make_headers(cookie: str, referer: str) -> dict:
h = {
"User-Agent": USER_AGENT,
"Cookie": cookie,
"Referer": referer,
}
csrf = get_csrf(cookie)
if csrf:
h["bv-csrf-token"] = csrf
return h
def _to_simplified(text: str) -> str:
"""转为简体中文"""
try:
from opencc import OpenCC
return OpenCC("t2s").convert(text)
except ImportError:
trad_simp = {
"": "", "": "", "": "", "": "", "": "",
"": "", "": "", "": "", "": "", "": "",
}
for t, s in trad_simp.items():
text = text.replace(t, s)
return text
def get_video_url(cookie: str, object_token: str) -> tuple[str | None, str | None]:
"""
获取视频下载链接与标题。
返回 (video_download_url, title) 或 (None, None)
"""
for url_base, referer in zip(STATUS_URLS, REFERERS):
try:
url = f"{url_base}?object_token={object_token}&language=zh_cn&_t={int(time.time() * 1000)}"
headers = make_headers(cookie, referer)
r = requests.get(url, headers=headers, timeout=20)
if r.status_code != 200:
continue
data = r.json()
if data.get("code") != 0:
continue
inner = data.get("data") or {}
video_info = inner.get("video_info") or {}
video_url = video_info.get("video_download_url")
if not video_url or not isinstance(video_url, str):
continue
title = (inner.get("topic") or inner.get("title") or object_token)
if isinstance(title, str):
title = _to_simplified(title)
return (video_url.strip(), title)
except Exception:
continue
return (None, None)
def download_video(video_url: str, output_path: Path, headers: dict) -> bool:
"""流式下载视频到 output_path"""
output_path.parent.mkdir(parents=True, exist_ok=True)
with requests.get(video_url, headers=headers, stream=True, timeout=60) as r:
r.raise_for_status()
total = int(r.headers.get("content-length", 0))
with open(output_path, "wb") as f:
for chunk in r.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
return output_path.exists() and output_path.stat().st_size > 1000
def main() -> int:
if not requests:
print("❌ 需要安装 requests: pip install requests", file=sys.stderr)
return 1
cookie = get_cookie()
if not cookie or "PASTE_YOUR" in cookie:
print("❌ 未配置有效 Cookie。请", file=sys.stderr)
print(" 1. 打开 https://cunkebao.feishu.cn/minutes/home 并登录", file=sys.stderr)
print(" 2. F12 → 网络 → 找到 list?size=20& 请求 → 复制请求头 Cookie", file=sys.stderr)
print(" 3. 粘贴到", str(COOKIE_FILE), "第一行", file=sys.stderr)
return 1
url_or_token = None
output_dir = Path.home() / "Downloads"
i = 1
while i < len(sys.argv):
if sys.argv[i] in ("-o", "--output") and i + 1 < len(sys.argv):
output_dir = Path(sys.argv[i + 1]).resolve()
i += 2
continue
if not sys.argv[i].startswith("-"):
url_or_token = sys.argv[i]
i += 1
if not url_or_token:
print("用法: python3 feishu_minutes_download_video.py <妙记URL或object_token> [-o 输出目录]", file=sys.stderr)
return 1
match = re.search(r"/minutes/([a-zA-Z0-9]+)", str(url_or_token))
object_token = match.group(1) if match else url_or_token
print(f"📥 获取视频链接 object_token={object_token}")
video_url, title = get_video_url(cookie, object_token)
if not video_url:
print("❌ 无法获取视频下载链接Cookie 可能失效或该妙记无视频)", file=sys.stderr)
return 1
safe_title = re.sub(r'[\\/*?:"<>|]', "_", (title or object_token))[:80]
output_path = output_dir / f"{safe_title}.mp4"
headers = make_headers(cookie, REFERERS[1])
headers["User-Agent"] = USER_AGENT
print(f"📹 下载中: {output_path.name}")
if download_video(video_url, output_path, headers):
size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"✅ 已保存: {output_path} ({size_mb:.1f} MB)")
print(str(output_path)) # 供调用方解析
return 0
print("❌ 下载失败", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -279,6 +279,24 @@ python3 scripts/wanzhi_feishu_project_sync.py
--- ---
## Wiki 子文档创建(日记分享 / 新研究)
在指定飞书 Wiki 节点下创建子文档,用于日记分享、新研究等内容沉淀。
**父节点**`https://cunkebao.feishu.cn/wiki/KNf7wA8Rki1NSdkkSIqcdFtTnWb`
```bash
# 使用默认内容:运营逻辑分析及目录结构
python3 scripts/feishu_wiki_create_doc.py
# 自定义标题和 JSON 内容
python3 scripts/feishu_wiki_create_doc.py --parent KNf7wA8Rki1NSdkkSIqcdFtTnWb --title "文档标题" --json blocks.json
```
JSON 格式:与 `团队入职流程与新人登记表_feishu_blocks.json` 相同,含 `children` 数组(飞书 docx blocks
---
## 文件结构 ## 文件结构
``` ```
@@ -297,6 +315,7 @@ python3 scripts/wanzhi_feishu_project_sync.py
├── feishu_video_clip.py # 视频智能切片 ├── feishu_video_clip.py # 视频智能切片
├── feishu_video_clip_README.md ├── feishu_video_clip_README.md
├── wanzhi_feishu_project_sync.py # 玩值电竞→飞书项目同步 ├── wanzhi_feishu_project_sync.py # 玩值电竞→飞书项目同步
├── feishu_wiki_create_doc.py # Wiki 子文档创建(日记/研究)
└── .feishu_tokens.json # Token 存储 └── .feishu_tokens.json # Token 存储
``` ```

View File

@@ -1,6 +1,6 @@
{ {
"access_token": "u-4KqUtcvFp8tH5h2BfuVTnWl5moqBk1ojpoaaJAM00wC7", "access_token": "u-5eUdcdU_R5QFvPA.Ea4PINl5kMg5k1iXNUaaZxA00Azi",
"refresh_token": "ur-68MMUChdl1LUn0KZf6dMALl5kOM5k1WNVEaaUMQ00Az6", "refresh_token": "ur-4oxZQevXNc39l0noeq8NcGl5mMiBk1irqUaaUNw00xzi",
"name": "飞书用户", "name": "飞书用户",
"auth_time": "2026-02-21T06:25:07.138984" "auth_time": "2026-02-22T10:22:11.152813"
} }

View File

@@ -5,25 +5,60 @@
============================ ============================
按「视频剪辑方案」图片中的 高峰时刻 + 想象的内容 整理切片, 按「视频剪辑方案」图片中的 高峰时刻 + 想象的内容 整理切片,
去语助词、去空格、关键词高亮、加速10% 去语助词、去空格、关键词高亮、加速10%文字/标题统一简体中文。
用法: 用法:
1. 先手动在飞书妙记页点击下载视频 一键全自动(命令行下载视频,不打开浏览器):
2. python3 feishu_image_slice.py --video "下载的视频.mp4"
或指定飞书链接(会打开链接,待你下载后监控 ~/Downloads
python3 feishu_image_slice.py --url "https://cunkebao.feishu.cn/minutes/xxx" python3 feishu_image_slice.py --url "https://cunkebao.feishu.cn/minutes/xxx"
或指定本地视频:
python3 feishu_image_slice.py --video "下载的视频.mp4"
需配置:智能纪要/脚本/cookie_minutes.txt飞书妙记 list 请求的 Cookie
""" """
import argparse import argparse
import json import json
import os import os
import re import re
import shutil
import subprocess import subprocess
import sys import sys
import time import time
from pathlib import Path from pathlib import Path
# 繁转简(动态加载)
def _to_simplified(text: str) -> str:
try:
from opencc import OpenCC
return OpenCC("t2s").convert(str(text))
except ImportError:
trad_simp = {"":"","":"","":"","":"","":"","":"","":"","":"","":""}
t = str(text)
for k, v in trad_simp.items():
t = t.replace(k, v)
return t
def ensure_deps():
"""动态检测并安装依赖FFmpeg、mlx_whisper、opencc 等)"""
missing = []
if shutil.which("ffmpeg") is None:
missing.append("ffmpeg (brew install ffmpeg)")
try:
import mlx_whisper # noqa
except ImportError:
missing.append("mlx-whisper (pip install mlx-whisper)")
try:
from opencc import OpenCC # noqa
except ImportError:
try:
subprocess.run([sys.executable, "-m", "pip", "install", "opencc-python-reimplemented", "-q"], capture_output=True, timeout=60)
except Exception:
missing.append("opencc (pip install opencc-python-reimplemented可选)")
if missing:
print("⚠ 可选依赖缺失(不影响基本流程):", ", ".join(missing))
# 图片方案高峰时刻7段约10分钟内 # 图片方案高峰时刻7段约10分钟内
# 来源:视频剪辑方案 | 智能剪辑 - 提取高峰时刻 # 来源:视频剪辑方案 | 智能剪辑 - 提取高峰时刻
PEAK_MOMENTS = [ PEAK_MOMENTS = [
@@ -105,27 +140,37 @@ def main():
parser.add_argument("--output", "-o", help="输出目录") parser.add_argument("--output", "-o", help="输出目录")
args = parser.parse_args() args = parser.parse_args()
ensure_deps()
video_path = None video_path = None
if args.video: if args.video:
video_path = Path(args.video).resolve() video_path = Path(args.video).resolve()
elif args.url: elif args.url:
print("📌 正在打开飞书链接,请在页面中点击【下载】按钮") print("📥 命令行全自动下载视频(不打开浏览器)...")
subprocess.run(["open", args.url], check=True) download_script = Path(__file__).resolve().parent.parent.parent / "智能纪要" / "脚本" / "feishu_minutes_download_video.py"
start = time.time() if not download_script.exists():
print("⏳ 监控 ~/Downloads检测到新视频后自动继续...") print("❌ 未找到 feishu_minutes_download_video.py")
for _ in range(120): return
time.sleep(3) out_dir = Path.home() / "Downloads"
v = find_recent_video(start) r = subprocess.run(
if v and v.stat().st_size > 1_000_000: [sys.executable, str(download_script), args.url, "-o", str(out_dir)],
video_path = v capture_output=True, text=True, timeout=300, cwd=str(download_script.parent)
print(f"✅ 检测到: {v.name}") )
break if r.returncode == 0 and r.stdout:
for line in r.stdout.strip().splitlines():
p = line.strip()
if p.endswith(".mp4") and Path(p).exists():
video_path = Path(p)
print(f"✅ 已下载: {video_path.name}")
break
if not video_path: if not video_path:
print("10分钟内未检测到新视频下载后使用 --video 指定路径") print("自动下载失败。请配置 智能纪要/脚本/cookie_minutes.txt 后重试,或手动下载后使用 --video")
if r.stderr:
print(r.stderr[:500])
return return
if not video_path or not video_path.exists(): if not video_path or not video_path.exists():
print("❌ 请提供 --video 路径,或使用 --url 并完成下载") print("❌ 请提供 --video 路径,或使用 --url(需配置 cookie_minutes.txt")
return return
duration = get_video_duration(video_path) duration = get_video_duration(video_path)
@@ -140,6 +185,12 @@ def main():
h["end_time"] = h.get("end_time") or h["end"] h["end_time"] = h.get("end_time") or h["end"]
highlights = [h for h in highlights if parse_time(h["end_time"]) <= duration + 5] highlights = [h for h in highlights if parse_time(h["end_time"]) <= duration + 5]
# 标题统一简体中文
for h in highlights:
for k in ("title", "hook_3sec", "cta_ending"):
if k in h and h[k]:
h[k] = _to_simplified(str(h[k]))
print(f" 切片: {len(highlights)} 段(高峰时刻方案)") print(f" 切片: {len(highlights)} 段(高峰时刻方案)")
# 输出目录 # 输出目录

View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
在指定飞书 Wiki 节点下创建子文档并写入内容。
用于:日记分享、新研究等内容沉淀到飞书知识库。
用法:
python3 feishu_wiki_create_doc.py
python3 feishu_wiki_create_doc.py --parent KNf7wA8Rki1NSdkkSIqcdFtTnWb --title "文档标题" --json blocks.json
"""
import os
import sys
import json
import argparse
import requests
from datetime import datetime
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG = {
'APP_ID': 'cli_a48818290ef8100d',
'APP_SECRET': 'dhjU0qWd5AzicGWTf4cTqhCWJOrnuCk4',
'TOKEN_FILE': os.path.join(SCRIPT_DIR, '.feishu_tokens.json'),
}
def load_tokens():
if os.path.exists(CONFIG['TOKEN_FILE']):
with open(CONFIG['TOKEN_FILE'], encoding='utf-8') as f:
return json.load(f)
return {}
def save_tokens(tokens):
with open(CONFIG['TOKEN_FILE'], 'w', encoding='utf-8') as f:
json.dump(tokens, f, ensure_ascii=False, indent=2)
def get_app_token():
r = requests.post(
"https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal/",
json={"app_id": CONFIG['APP_ID'], "app_secret": CONFIG['APP_SECRET']},
timeout=10)
data = r.json()
return data.get('app_access_token') if data.get('code') == 0 else None
def refresh_token_silent(tokens):
if not tokens.get('refresh_token'):
return None
app_token = get_app_token()
if not app_token:
return None
r = requests.post(
"https://open.feishu.cn/open-apis/authen/v1/oidc/refresh_access_token",
headers={"Authorization": f"Bearer {app_token}", "Content-Type": "application/json"},
json={"grant_type": "refresh_token", "refresh_token": tokens['refresh_token']},
timeout=10)
result = r.json()
if result.get('code') == 0:
data = result.get('data', {})
tokens['access_token'] = data.get('access_token')
tokens['refresh_token'] = data.get('refresh_token', tokens['refresh_token'])
tokens['auth_time'] = datetime.now().isoformat()
save_tokens(tokens)
return tokens['access_token']
return None
def check_token_valid(token, parent_token):
if not token:
return False
try:
r = requests.get(
f"https://open.feishu.cn/open-apis/wiki/v2/spaces/get_node?token={parent_token}",
headers={'Authorization': f'Bearer {token}'}, timeout=10)
return r.json().get('code') == 0
except Exception:
return False
def get_token(parent_token):
tokens = load_tokens()
if tokens.get('access_token') and check_token_valid(tokens['access_token'], parent_token):
return tokens['access_token']
print("🔄 静默刷新 Token...")
new_token = refresh_token_silent(tokens)
if new_token and check_token_valid(new_token, parent_token):
print("✅ Token 刷新成功")
return new_token
print("❌ 无法获取有效 Token请先运行 auto_log.py 完成飞书授权")
return None
def create_wiki_doc(parent_token: str, title: str, blocks: list) -> tuple[bool, str]:
"""
在指定 wiki 父节点下创建子文档并写入 blocks。
返回 (成功, url或错误信息)
"""
token = get_token(parent_token)
if not token:
return False, "Token 无效"
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
# 1. 获取父节点信息(含 space_id
r = requests.get(
f"https://open.feishu.cn/open-apis/wiki/v2/spaces/get_node?token={parent_token}",
headers=headers, timeout=30)
if r.json().get('code') != 0:
return False, r.json().get('msg', 'get_node 失败')
node = r.json()['data']['node']
space_id = node.get('space_id') or (node.get('space') or {}).get('space_id') or node.get('origin_space_id')
if not space_id:
return False, "无法获取 space_id"
# 2. 创建子节点
create_r = requests.post(
f"https://open.feishu.cn/open-apis/wiki/v2/spaces/{space_id}/nodes",
headers=headers,
json={
"parent_node_token": parent_token,
"obj_type": "docx",
"node_type": "origin",
"title": title,
},
timeout=30)
create_data = create_r.json()
if create_r.status_code != 200 or create_data.get('code') != 0:
return False, create_data.get('msg', str(create_data))
new_node = create_data.get('data', {}).get('node', {})
node_token = new_node.get('node_token')
doc_token = new_node.get('obj_token') or node_token
if not doc_token:
nr = requests.get(
f"https://open.feishu.cn/open-apis/wiki/v2/spaces/get_node?token={node_token}",
headers=headers, timeout=30)
if nr.json().get('code') == 0:
doc_token = nr.json()['data']['node'].get('obj_token') or node_token
# 3. 分批写入 blocks每批最多 50
for i in range(0, len(blocks), 50):
batch = blocks[i:i + 50]
wr = requests.post(
f"https://open.feishu.cn/open-apis/docx/v1/documents/{doc_token}/blocks/{doc_token}/children",
headers=headers,
json={'children': batch, 'index': i},
timeout=30)
if wr.json().get('code') != 0:
return False, wr.json().get('msg', '写入 blocks 失败')
if len(blocks) > 50:
import time
time.sleep(0.3)
url = f"https://cunkebao.feishu.cn/wiki/{node_token}" if node_token else "https://cunkebao.feishu.cn/wiki"
return True, url
def main():
ap = argparse.ArgumentParser(description='在飞书 Wiki 下创建子文档')
ap.add_argument('--parent', default='KNf7wA8Rki1NSdkkSIqcdFtTnWb', help='父节点 token')
ap.add_argument('--title', default='运营逻辑分析及目录结构', help='文档标题')
ap.add_argument('--json', default=None, help='blocks JSON 文件路径(含 children 数组)')
args = ap.parse_args()
# 默认使用内置的运营逻辑文档
if args.json:
with open(args.json, 'r', encoding='utf-8') as f:
data = json.load(f)
blocks = data.get('children', data) if isinstance(data, dict) else data
else:
blocks = get_default_blocks()
print("=" * 50)
print(f"📤 在飞书 Wiki 下创建文档:{args.title}")
print(f" 父节点: {args.parent}")
print("=" * 50)
ok, result = create_wiki_doc(args.parent, args.title, blocks)
if ok:
print(f"✅ 创建成功")
print(f"📎 {result}")
else:
print(f"❌ 失败: {result}")
sys.exit(1)
print("=" * 50)
def get_default_blocks():
"""第一篇:运营逻辑分析及目录结构的默认 blocks"""
return [
{"block_type": 3, "heading1": {"elements": [{"text_run": {"content": "运营逻辑分析及目录结构", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "本文档分析本知识空间的运营逻辑,并整理建议的目录结构,供后续日记分享、新研究等内容沉淀使用。", "text_element_style": {}}}], "style": {}}},
{"block_type": 4, "heading2": {"elements": [{"text_run": {"content": "一、空间定位", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "本空间用于:", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 日记分享:日常思考、实践复盘、阶段性总结", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 新研究:技术调研、方法论探索、行业/产品分析", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 知识沉淀:可复用的经验、模板、工作流", "text_element_style": {}}}], "style": {}}},
{"block_type": 4, "heading2": {"elements": [{"text_run": {"content": "二、运营逻辑(闭环)", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "输入 → 整理 → 沉淀 → 复用", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "1. 输入:日常产出(对话、会议、实践)、研究发现、灵感碎片", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "2. 整理:按主题/时间归类,提炼要点,形成可读结构", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "3. 沉淀:写入本空间对应目录,便于检索与关联", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "4. 复用:后续查阅、迭代更新、形成模板或 SOP", "text_element_style": {}}}], "style": {}}},
{"block_type": 4, "heading2": {"elements": [{"text_run": {"content": "三、建议目录结构", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "本空间/\n├── 日记分享/\n│ ├── 按周或按主题归档\n│ └── 可含:今日思考、复盘、阶段性总结\n├── 新研究/\n│ ├── 技术调研\n│ ├── 方法论探索\n│ └── 行业/产品分析\n├── 知识沉淀/\n│ ├── 可复用经验\n│ ├── 模板与工作流\n│ └── SOP 与规范\n└── 运营逻辑分析及目录结构(本文档)", "text_element_style": {}}}], "style": {}}},
{"block_type": 4, "heading2": {"elements": [{"text_run": {"content": "四、使用建议", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 日记类:建议按周或按主题建子页,便于回顾与检索", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 研究类:单篇独立,标题含关键词便于搜索", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "• 沉淀类可链接到卡若AI 经验库、参考资料,形成双向引用", "text_element_style": {}}}], "style": {}}},
{"block_type": 2, "text": {"elements": [{"text_run": {"content": "— 文档由卡若AI 水桥生成 | 2026-02-22", "text_element_style": {}}}], "style": {}}},
]
if __name__ == "__main__":
main()

View File

@@ -20,14 +20,14 @@ python3 /Users/karuo/Documents/个人/卡若AI/02_卡人/飞书管理/s
### 按剪辑方案图片切片(高峰时刻+想象的内容) ### 按剪辑方案图片切片(高峰时刻+想象的内容)
按「视频剪辑方案」图片整理7 段高峰时刻 + 加速 10% + 去语助词 + 关键词高亮。 按「视频剪辑方案」图片整理7 段高峰时刻 + 加速 10% + 去语助词 + 关键词高亮。**文字/标题统一简体中文**。
```bash ```bash
# 1. 打开飞书链接,点击下载视频 # 一键全自动(命令行下载视频,不打开浏览器)
# 2. 下载完成后运行(或直接用 --video 指定已有视频 # 需先配置:智能纪要/脚本/cookie_minutes.txt飞书妙记 list 请求的 Cookie
python3 /Users/karuo/Documents/个人/卡若AI/02_卡人/水桥_平台对接/飞书管理/脚本/feishu_image_slice.py --url "https://cunkebao.feishu.cn/minutes/obcnzs51k1j754643vx138sx" python3 /Users/karuo/Documents/个人/卡若AI/02_卡人/水桥_平台对接/飞书管理/脚本/feishu_image_slice.py --url "https://cunkebao.feishu.cn/minutes/obcnzs51k1j754643vx138sx"
# 若已下载,直接指定视频路径 # 若已下载视频,直接指定路径
python3 脚本/feishu_image_slice.py --video "~/Downloads/xxx.mp4" python3 脚本/feishu_image_slice.py --video "~/Downloads/xxx.mp4"
``` ```
@@ -44,13 +44,13 @@ python3 feishu_one_click.py "https://cunkebao.feishu.cn/minutes/obcnjnsx2mz7vj5q
| 步骤 | 操作 | 自动化程度 | | 步骤 | 操作 | 自动化程度 |
|:---|:---|:---| |:---|:---|:---|
| 1. 获取妙记信息 | API自动获取 | ✅ 全自动 | | 1. 下载视频 | Cookie + status API 命令行下载 | ✅ 全自动(不打开浏览器) |
| 2. 下载视频 | 飞书客户端打开 | 🔸 需点击下载 | | 2. 获取妙记信息 | 按高峰时刻方案 | ✅ 全自动 |
| 3. AI生成切片方案 | Gemini API | ✅ 全自动 | | 3. 批量切片 | FFmpeg | ✅ 全自动 |
| 4. 批量切片 | FFmpeg | ✅ 全自动 | | 4. 增强(封面+字幕+加速10% | soul_enhance | ✅ 全自动 |
| 5. 发送到群 | Webhook | ✅ 全自动 | | 5. 发送到群 | Webhook | ✅ 全自动 |
> **注意**步骤2需要在飞书中点击下载按钮。飞书客户端已自动登录**无需扫码** > **注意**需配置 `智能纪要/脚本/cookie_minutes.txt`(飞书妙记 list 请求的 Cookie即可全自动下载
--- ---

View File

@@ -42,8 +42,18 @@ def format_timestamp(seconds: float) -> str:
return f"{hours:02d}:{minutes:02d}:{secs:02d}" return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def _to_simplified(text: str) -> str:
"""转为简体中文(用于文件名/标题)"""
try:
from opencc import OpenCC
return OpenCC("t2s").convert(str(text))
except ImportError:
return str(text)
def sanitize_filename(name: str, max_length: int = 50) -> str: def sanitize_filename(name: str, max_length: int = 50) -> str:
"""清理文件名,移除非法字符""" """清理文件名,移除非法字符,标题统一简体"""
name = _to_simplified(str(name))
# 保留字母、数字、中文、空格、下划线、连字符 # 保留字母、数字、中文、空格、下划线、连字符
safe_chars = [] safe_chars = []
for c in name: for c in name:

View File

@@ -669,21 +669,20 @@ def main():
generate_index(highlights, output_dir) generate_index(highlights, output_dir)
def generate_index(highlights, output_dir): def generate_index(highlights, output_dir):
"""生成目录索引""" """生成目录索引(标题/Hook/CTA 统一简体中文)"""
index_path = output_dir.parent / "目录索引_enhanced.md" index_path = output_dir.parent / "目录索引_enhanced.md"
with open(index_path, 'w', encoding='utf-8') as f: with open(index_path, 'w', encoding='utf-8') as f:
f.write("# Soul派对81场 - 增强版切片目录\n\n") f.write("# Soul派对 - 增强版切片目录\n\n")
f.write(f"**日期**: 2026-01-23\n")
f.write(f"**优化**: 封面+字幕+加速10%+去语气词\n\n") f.write(f"**优化**: 封面+字幕+加速10%+去语气词\n\n")
f.write("## 切片列表\n\n") f.write("## 切片列表\n\n")
f.write("| 序号 | 标题 | Hook | CTA |\n") f.write("| 序号 | 标题 | Hook | CTA |\n")
f.write("|------|------|------|-----|\n") f.write("|------|------|------|-----|\n")
for i, clip in enumerate(highlights, 1): for i, clip in enumerate(highlights, 1):
title = clip.get("title", f"clip_{i}") title = _to_simplified(clip.get("title", f"clip_{i}"))
hook = clip.get("hook_3sec", "") hook = _to_simplified(clip.get("hook_3sec", ""))
cta = clip.get("cta_ending", "") cta = _to_simplified(clip.get("cta_ending", ""))
f.write(f"| {i} | {title} | {hook} | {cta} |\n") f.write(f"| {i} | {title} | {hook} | {cta} |\n")
print(f"\n📋 目录索引: {index_path}") print(f"\n📋 目录索引: {index_path}")

View File

@@ -66,3 +66,4 @@
| 2026-02-22 09:55:19 | 🔄 卡若AI 同步 2026-02-22 09:55 | 更新:金仓、水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | | 2026-02-22 09:55:19 | 🔄 卡若AI 同步 2026-02-22 09:55 | 更新:金仓、水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 09:56:29 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:总索引与入口、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | | 2026-02-22 09:56:29 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:总索引与入口、卡木、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 09:56:57 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 8 个 | | 2026-02-22 09:56:57 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 10:01:54 | 🔄 卡若AI 同步 2026-02-22 10:01 | 更新:运营中枢参考资料、运营中枢工作台 | 排除 >20MB: 8 个 |

View File

@@ -69,3 +69,4 @@
| 2026-02-22 09:55:19 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:55 | 更新:金仓、水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) | | 2026-02-22 09:55:19 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:55 | 更新:金仓、水桥平台对接、水溪整理归档、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 09:56:29 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:总索引与入口、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) | | 2026-02-22 09:56:29 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:总索引与入口、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 09:56:57 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) | | 2026-02-22 09:56:57 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 09:56 | 更新:卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 10:01:54 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 10:01 | 更新:运营中枢参考资料、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |