🔄 卡若AI 同步 2026-02-22 11:58 | 更新:金仓、水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个

This commit is contained in:
2026-02-22 11:58:14 +08:00
parent 8b434da135
commit 4e13af2f79
7 changed files with 221 additions and 266 deletions

View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
腾讯云 TATkr宝塔 全量修复
1. Nginx确保使用宝塔 Nginx若系统 Nginx 在运行则 kill 后启动宝塔 Nginx
2. Node 项目:全部在宝塔 Node 下启动run=False 的逐一 stop→清端口→start
凭证00_账号与API索引.md
"""
import base64
import os
import re
import sys
import time
KR_INSTANCE_ID = "ins-aw0tnqjo"
REGION = "ap-guangzhou"
SHELL_SCRIPT = r'''#!/bin/bash
set -e
echo "=== kr宝塔 全量修复Nginx(宝塔) + 全部 Node 项目 ==="
# 1. Nginx确认使用宝塔 Nginx非系统 Nginx
echo ""
echo "【1】Nginx 检查与修复"
NGX=$(ps aux | grep -E "nginx|nginx:" | grep -v grep | head -1 || true)
if echo "$NGX" | grep -q "/usr/sbin/nginx"; then
echo " 检测到系统 Nginx切换为宝塔 Nginx..."
killall nginx 2>/dev/null || true
sleep 2
fi
# 若无 nginx 或需确保宝塔 nginx
if ! pgrep -f "/www/server/nginx" >/dev/null 2>&1; then
/www/server/nginx/sbin/nginx -c /www/server/nginx/conf/nginx.conf 2>/dev/null && echo " 宝塔 Nginx 已启动" || echo " Nginx 可能已在运行"
fi
nginx -t 2>/dev/null && nginx -s reload 2>/dev/null && echo " Nginx 重载完成"
echo " 当前 Nginx: $(ps aux | grep nginx | grep -v grep | head -1 | awk '{print $11}')"
# 2. 全部 Node 项目批量启动(宝塔 API
echo ""
echo "【2】Node 项目批量启动(宝塔 API"
python3 - << 'PYEOF'
import hashlib, json, os, re, subprocess, time, urllib.request, urllib.parse, ssl
ssl._create_default_https_context = ssl._create_unverified_context
PANEL, K = "https://127.0.0.1:9988", "qcWubCdlfFjS2b2DMT1lzPFaDfmv1cBT"
def sign():
t = int(time.time())
s = str(t) + hashlib.md5(K.encode()).hexdigest()
return {"request_time": t, "request_token": hashlib.md5(s.encode()).hexdigest()}
def post(p, d=None):
pl = sign()
if d: pl.update(d)
r = urllib.request.Request(PANEL + p, data=urllib.parse.urlencode(pl).encode())
with urllib.request.urlopen(r, timeout=30) as resp:
return json.loads(resp.read().decode())
def pids(port):
try:
o = subprocess.check_output("ss -tlnp 2>/dev/null | grep ':%s ' || true" % port, shell=True, universal_newlines=True)
return sorted({int(x) for x in re.findall(r"pid=(\d+)", o)})
except: return []
def ports(it):
cfg = it.get("project_config") or {}
if isinstance(cfg, str):
try: cfg = json.loads(cfg)
except: cfg = {}
ps = []
if cfg.get("port"): ps.append(int(cfg["port"]))
for m in re.findall(r"-p\s*(\d+)", str(cfg.get("project_script",""))): ps.append(int(m))
return sorted(set(ps))
items = post("/project/nodejs/get_project_list").get("data") or post("/project/nodejs/get_project_list").get("list") or []
to_start = [it for it in items if it.get("name") and it.get("run") is not True]
print(" 未运行项目数: %d / %d" % (len(to_start), len(items)))
for it in to_start:
name = it.get("name") or it.get("project_name")
if not name: continue
try:
for port in ports(it):
for pid in pids(port):
try: subprocess.call("kill -9 %s 2>/dev/null" % pid, shell=True)
except: pass
pf = "/www/server/nodejs/vhost/pids/%s.pid" % name
if os.path.exists(pf):
try: open(pf,"w").write("0")
except: pass
post("/project/nodejs/stop_project", {"project_name": name})
time.sleep(0.5)
r = post("/project/nodejs/start_project", {"project_name": name})
ok = r.get("status") is True or "成功" in str(r.get("msg",""))
print(" %s: %s" % (name, "OK" if ok else "FAIL"))
except Exception as e:
print(" %s: ERR %s" % (name, str(e)[:60]))
time.sleep(1)
time.sleep(5)
items2 = post("/project/nodejs/get_project_list").get("data") or []
run_c = sum(1 for x in items2 if x.get("run"))
print(" 结果: 运行 %d / 共 %d" % (run_c, len(items2)))
PYEOF
echo ""
echo "=== 完成 ==="
'''
def _read_creds():
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(6):
root = d
if os.path.isfile(os.path.join(root, "运营中枢", "工作台", "00_账号与API索引.md")):
path = os.path.join(root, "运营中枢", "工作台", "00_账号与API索引.md")
with open(path, "r", encoding="utf-8") as f:
text = f.read()
sid = skey = None
in_tx = False
for line in text.splitlines():
if "### 腾讯云" in line:
in_tx = True
continue
if in_tx and line.strip().startswith("###"):
break
if not in_tx:
continue
m = re.search(r"\|\s*[^|]*(?:SecretId|密钥)[^|]*\|\s*`([^`]+)`", line, re.I)
if m and m.group(1).strip().startswith("AKID"):
sid = m.group(1).strip()
m = re.search(r"\|\s*SecretKey\s*\|\s*`([^`]+)`", line, re.I)
if m:
skey = m.group(1).strip()
return sid or None, skey or None
d = os.path.dirname(d)
return None, None
def main():
    """Dispatch the repair script to the kr instance via Tencent Cloud TAT.

    Returns a process exit code: 1 when credentials or the SDK are missing,
    0 once the command has been dispatched (polling is best-effort).
    """
    # Prefer environment credentials; fall back to the local credential index.
    sid = os.environ.get("TENCENTCLOUD_SECRET_ID")
    skey = os.environ.get("TENCENTCLOUD_SECRET_KEY")
    if not (sid and skey):
        sid, skey = _read_creds()
    if not (sid and skey):
        print("❌ 未配置腾讯云 SecretId/SecretKey")
        return 1
    # Import lazily so a missing SDK yields an install hint, not a crash.
    try:
        from tencentcloud.common import credential
        from tencentcloud.tat.v20201028 import tat_client, models
    except ImportError:
        print("pip install tencentcloud-sdk-python-tat")
        return 1
    client = tat_client.TatClient(credential.Credential(sid, skey), REGION)
    # TAT expects the command content base64-encoded.
    req = models.RunCommandRequest()
    req.Content = base64.b64encode(SHELL_SCRIPT.encode()).decode()
    req.InstanceIds = [KR_INSTANCE_ID]
    req.CommandType = "SHELL"
    req.Timeout = 180
    req.CommandName = "kr宝塔_全量修复"
    resp = client.RunCommand(req)
    print("✅ TAT 已下发 InvocationId:", resp.InvocationId)
    print(" 步骤: Nginx 强制宝塔 → 全部 Node 项目启动")
    print(" 等待 90s...")
    time.sleep(90)  # give the remote script time to finish before polling
    try:
        query = models.DescribeInvocationTasksRequest()
        flt = models.Filter()
        flt.Name = "invocation-id"
        flt.Values = [resp.InvocationId]
        query.Filters = [flt]
        for task in (client.DescribeInvocationTasks(query).InvocationTaskSet or []):
            print(" 状态:", getattr(task, "TaskStatus", ""))
            output = getattr(task, "Output", None) or ""
            if output:
                print(" 输出:\n", output[:4000])
    except Exception as e:
        # Polling failures are non-fatal; the command was already dispatched.
        print(" 查询:", e)
    return 0
# Script entry point: exit with main()'s status code.
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -55,3 +55,13 @@ python3 "02_卡人/水桥_平台对接/飞书管理/脚本/feishu_wiki_
- 图片块block_type 18 gallery若飞书 API 报 `invalid param`,会退化为文本说明,图片仍上传至文档素材,用户可手动「插入 → 图片 → 文档素材」插入
- `image_paths` 建议用相对路径,便于 JSON 迁移
## 图片块调试
若需尝试 file 块(block_type 12)代替 gallery 插入图片:
```bash
FEISHU_IMG_BLOCK=file python3 "02_卡人/水桥_平台对接/飞书管理/脚本/feishu_wiki_gene_capsule_article.py"
```
脚本会打印 API 错误详情(code、msg、debug),便于排查。

View File

@@ -50,7 +50,8 @@ def upload_image_to_doc(token: str, doc_token: str, img_path: Path) -> str | Non
def _make_image_block(file_token: str) -> dict:
"""生成飞书图片块,尝试 gallery 与 file 两种格式"""
"""生成飞书图片块。优先 gallery(18),备选 file(12) 行内展示"""
# 格式参考飞书文档Gallery imageList 每项需 fileToken
return {
"block_type": 18,
"gallery": {
@@ -60,6 +61,14 @@ def _make_image_block(file_token: str) -> dict:
}
def _make_file_block(file_token: str, filename: str = "image.png") -> dict:
"""备选file 块行内展示图片viewType=inline"""
return {
"block_type": 12,
"file": {"fileToken": file_token, "viewType": "inline", "fileName": filename},
}
def _title_matches(node_title: str, target: str) -> bool:
"""判断节点标题是否与目标相似(含关键词即视为匹配)"""
if not node_title or not target:
@@ -207,10 +216,11 @@ def create_doc_with_images():
for b in raw_blocks:
c = (b.get("text") or {}).get("elements") or []
content = (c[0].get("text_run") or {}).get("content", "") if c else ""
use_file_block = os.environ.get("FEISHU_IMG_BLOCK") == "file"
if "【配图 1" in content and tokens[0]:
blocks.append(_make_image_block(tokens[0]))
blocks.append(_make_file_block(tokens[0], "基因胶囊_概念与流程.png") if use_file_block else _make_image_block(tokens[0]))
elif "【配图 2" in content and len(tokens) > 1 and tokens[1]:
blocks.append(_make_image_block(tokens[1]))
blocks.append(_make_file_block(tokens[1], "基因胶囊_完整工作流程图.png") if use_file_block else _make_image_block(tokens[1]))
elif "【配图 1" in content or "【配图 2" in content:
blocks.append(b)
else:
@@ -229,9 +239,11 @@ def create_doc_with_images():
timeout=30)
res = wr.json()
if res.get("code") != 0:
if any(b.get("block_type") in (12, 13, 18) for b in batch):
print(f"⚠️ API 错误: code={res.get('code')} msg={res.get('msg')} debug={res.get('debug', '')}")
# 若含图片的批次失败,则跳过图片仅写文本
if any(b.get("block_type") in (13, 18) for b in batch):
safe = [b for b in batch if b.get("block_type") not in (13, 18)]
if any(b.get("block_type") in (12, 13, 18) for b in batch):
safe = [b for b in batch if b.get("block_type") not in (12, 13, 18)]
if safe:
wr2 = requests.post(
f"https://open.feishu.cn/open-apis/docx/v1/documents/{doc_token}/blocks/{doc_token}/children",

View File

@@ -1,234 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
按完整主题切片 - 分析 transcript找出每个主题的完整起止时间
与 identify_highlights 不同:本脚本按「视频剪辑方案」的 7 个主题类型分析,
时间节点非固定,需结合视频内容分析出每个主题的完整段落。
主题类型(来自剪辑方案图片):
1. 引出问题 - 建立共鸣,问用户痛点
2. 解决方案 - 核心方法、干货
3. 案例分享 - 真实案例、数据
4. 未来展望 - 接下来怎么做
5. 痛点强调 - 避坑、踩坑警告
6. 福利展示 - 限时福利、福利放送
7. 权威背书 - 专业背书、可信证明
用法:
python3 identify_theme_segments.py -t transcript.srt -o highlights.json
"""
import argparse
import json
import re
import sys
from pathlib import Path
# Local Ollama endpoint used by call_ollama().
OLLAMA_URL = "http://localhost:11434"
# Default call-to-action appended to a clip when the model supplies none.
DEFAULT_CTA = "关注我,每天学一招私域干货"
# Theme taxonomy injected verbatim into the analysis prompt. This is runtime
# data sent to the model — kept in Chinese so the model output stays consistent.
THEME_DEFINITIONS = """
【主题类型定义,按视频剪辑方案】
1. 引出问题:开场建立共鸣,提出用户普遍遇到的问题或痛点
2. 解决方案:讲解核心方法、干货、具体做法
3. 案例分享:真实案例、数据佐证、用户证言
4. 未来展望:接下来这样做、未来趋势、行动建议
5. 痛点强调:这个坑千万别踩、常见误区、避坑指南
6. 福利展示:限时福利、福利放送、赠送、优惠
7. 权威背书:专业背书、可信证明、资质、成果展示
参考时间顺序(非固定):引出问题→解决方案→案例分享→未来展望→痛点强调→福利展示→权威背书
"""
def parse_srt_segments(srt_path: str) -> list:
    """Parse an SRT file into segment dicts.

    Each entry carries start/end in whole seconds plus "HH:MM:SS" strings and
    the caption text (newlines flattened to spaces). Captions of two or fewer
    characters are dropped.
    """
    with open(srt_path, "r", encoding="utf-8") as fh:
        raw = fh.read()
    pattern = r"(\d+)\n(\d{2}):(\d{2}):(\d{2}),(\d{3}) --> (\d{2}):(\d{2}):(\d{2}),(\d{3})\n(.*?)(?=\n\n|\Z)"
    parsed = []
    for grp in re.findall(pattern, raw, re.DOTALL):
        caption = grp[9].strip().replace("\n", " ")
        if len(caption) <= 2:
            continue  # skip near-empty captions
        h1, m1, s1 = int(grp[1]), int(grp[2]), int(grp[3])
        h2, m2, s2 = int(grp[5]), int(grp[6]), int(grp[7])
        parsed.append({
            "start_sec": h1 * 3600 + m1 * 60 + s1,
            "end_sec": h2 * 3600 + m2 * 60 + s2,
            "start_time": f"{h1:02d}:{m1:02d}:{s1:02d}",
            "end_time": f"{h2:02d}:{m2:02d}:{s2:02d}",
            "text": caption,
        })
    return parsed
def srt_to_timestamped_text(srt_path: str) -> str:
    """Render the SRT as plain text, one "[HH:MM:SS] caption" line per segment."""
    lines = [f"[{seg['start_time']}] {seg['text']}" for seg in parse_srt_segments(srt_path)]
    return "\n".join(lines)
def _build_theme_prompt(transcript: str) -> str:
    """Compose the Chinese theme-segmentation prompt for the model.

    The transcript is capped at 15,000 characters before being embedded.
    """
    # Slicing already caps the length; shorter inputs pass through unchanged.
    txt = transcript[:15000]
    return f"""你是短视频内容策划师。根据「视频剪辑方案」,分析以下视频文字稿,找出 7 类主题各自的**完整段落**。
{THEME_DEFINITIONS}
【关键】时间节点非固定!需结合视频实际内容分析:
- 每个主题只取一段,且必须是**完整主题**(不中断、语义完整)
- 从文字稿中精确找出该主题开始和结束的时间点
- 若某类主题在视频中未出现,可跳过,不强制凑齐 7 段
- 参考顺序帮助理解,实际顺序按内容出现顺序
【输出格式】严格 JSON 数组,每项含:
- theme: 主题类型名(如"引出问题"
- title: 简短标题(简体中文)
- start_time: "HH:MM:SS"
- end_time: "HH:MM:SS"
- hook_3sec: 前3秒Hook15字内
- cta_ending: 结尾CTA可用"{DEFAULT_CTA}"
- transcript_excerpt: 该段内容前60字
只输出 JSON 数组,不要```包裹,不要其他文字。所有文字必须简体中文。
视频文字稿:
---
{txt}
---"""
def _parse_ai_json(text: str) -> list:
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```\s*$", "", text)
m = re.search(r"\[[\s\S]*\]", text)
if m:
return json.loads(m.group())
return json.loads(text)
def call_ollama(transcript: str) -> str:
    """Ask the local Ollama model to segment the transcript by theme.

    Returns the raw response text; any failure (HTTP error, timeout, bad
    response) is re-raised as RuntimeError.
    """
    import requests
    payload = {
        "model": "qwen2.5:1.5b",
        "prompt": _build_theme_prompt(transcript),
        "stream": False,
        "options": {"temperature": 0.2, "num_predict": 8192},
    }
    try:
        resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
        if resp.status_code != 200:
            raise RuntimeError(f"Ollama {resp.status_code}")
        return resp.json().get("response", "").strip()
    except Exception as e:
        raise RuntimeError(f"Ollama 调用失败: {e}") from e
def fallback_by_keywords(transcript_path: str) -> list:
    """Rule-based fallback: roughly segment themes by keyword hits.

    For each theme (in declaration order) the first keyword-matching subtitle
    after the already-consumed time point anchors a clip; adjacent subtitles
    are merged into it, with the clip bounded to 45-120 seconds. Returns a
    list of dicts in the same shape as the AI path's output.
    """
    segments = parse_srt_segments(transcript_path)
    if not segments:
        return []
    # Video end time, used to clamp the forced minimum-length extension.
    total_duration = segments[-1]["end_sec"] if segments else 0
    theme_keywords = {
        "引出问题": ["问题", "遇到", "痛点", "为什么", "困惑", "难题"],
        "解决方案": ["方法", "解决", "怎么做", "技巧", "核心", "干货"],
        "案例分享": ["案例", "例子", "数据", "客户", "赚了", "做了"],
        "未来展望": ["接下来", "未来", "行动", "去做", "试试"],
        # NOTE(review): the first keyword here is an empty string, and
        # '"" in s' is always True — so this theme matches the first unused
        # segment unconditionally. A character may have been lost (perhaps
        # "坑"); confirm against the original file before relying on it.
        "痛点强调": ["", "避坑", "千万别", "误区", "踩雷"],
        "福利展示": ["福利", "限时", "赠送", "优惠", "免费"],
        "权威背书": ["专业", "背书", "资质", "成果", "证明"],
    }
    MIN_SEG = 45   # minimum clip length, seconds
    MAX_SEG = 120  # maximum clip length, seconds
    result = []
    used_until = 0  # time point consumed so far, to avoid overlapping clips
    for theme, kws in theme_keywords.items():
        # Candidates: subtitles after the consumed point containing a keyword.
        cands = [s for s in segments if s["start_sec"] >= used_until and any(kw in s["text"] for kw in kws)]
        if not cands:
            continue
        first = cands[0]
        start_sec = first["start_sec"]
        # Merge adjacent subtitles, but keep the clip within MAX_SEG seconds.
        end_sec = first["end_sec"]
        for s in segments:
            if s["start_sec"] < start_sec:
                continue
            if s["start_sec"] > start_sec + MAX_SEG:
                break
            if s["end_sec"] <= end_sec + 15:  # contiguous / near-adjacent
                end_sec = max(end_sec, s["end_sec"])
            elif s["start_sec"] <= end_sec + 5:  # gap under 5 seconds
                end_sec = min(s["end_sec"], start_sec + MAX_SEG)
        end_sec = min(end_sec, start_sec + MAX_SEG)
        if end_sec - start_sec < MIN_SEG:
            # Too short: force-extend to MIN_SEG, clamped to video length.
            end_sec = min(start_sec + MIN_SEG, total_duration)
        used_until = end_sec + 10  # next clip starts at least 10 s later
        h, m, s_ = int(start_sec // 3600), int((start_sec % 3600) // 60), int(start_sec % 60)
        eh, em, es = int(end_sec // 3600), int((end_sec % 3600) // 60), int(end_sec % 60)
        result.append({
            "theme": theme,
            "title": theme,
            "start_time": f"{h:02d}:{m:02d}:{s_:02d}",
            "end_time": f"{eh:02d}:{em:02d}:{es:02d}",
            "hook_3sec": f"精彩{theme}",
            "cta_ending": DEFAULT_CTA,
            "transcript_excerpt": first["text"][:60],
        })
    return result
def main():
    """CLI entry: analyze a transcript into complete theme segments.

    Tries the Ollama model first; on any failure (or empty/invalid output)
    falls back to keyword rules, then writes the result list as JSON.
    """
    parser = argparse.ArgumentParser(description="按完整主题分析 transcript")
    parser.add_argument("--transcript", "-t", required=True, help="transcript.srt")
    parser.add_argument("--output", "-o", required=True, help="highlights.json")
    args = parser.parse_args()
    transcript_path = Path(args.transcript)
    if not transcript_path.exists():
        print(f"❌ 不存在: {transcript_path}", file=sys.stderr)
        sys.exit(1)
    text = srt_to_timestamped_text(str(transcript_path))
    if len(text) < 100:
        # Too little text to analyze meaningfully.
        print("❌ 文字稿过短", file=sys.stderr)
        sys.exit(1)
    data = None
    try:
        print("正在分析完整主题Ollama...")
        raw = call_ollama(text)
        data = _parse_ai_json(raw)
        if data and isinstance(data, list):
            # Normalize AI output: tolerate "start"/"end" key variants and
            # backfill optional fields before validating timestamps.
            for i, h in enumerate(data):
                if isinstance(h, dict):
                    if "start" in h and "start_time" not in h:
                        h["start_time"] = h.pop("start", "")
                    if "end" in h and "end_time" not in h:
                        h["end_time"] = h.pop("end", "")
                    h.setdefault("title", h.get("theme", f"主题{i+1}"))
                    h.setdefault("hook_3sec", h.get("title", "")[:15])
                    h.setdefault("cta_ending", DEFAULT_CTA)
            # Keep only entries carrying both timestamps.
            data = [h for h in data if isinstance(h, dict) and h.get("start_time") and h.get("end_time")]
    except Exception as e:
        print(f"Ollama 失败 ({e}),使用规则备用", file=sys.stderr)
    if not data or not isinstance(data, list):
        # AI path produced nothing usable — fall back to keyword rules.
        print("使用规则备用(按关键词)", file=sys.stderr)
        data = fallback_by_keywords(str(transcript_path))
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"✅ 已输出 {len(data)} 个完整主题: {out_path}")
# CLI entry point.
if __name__ == "__main__":
    main()

View File

@@ -89,10 +89,8 @@ def main():
parser.add_argument("--video", "-v", required=True, help="输入视频路径")
parser.add_argument("--output", "-o", help="输出目录(默认:视频同目录下 视频名_output")
parser.add_argument("--clips", "-n", type=int, default=8, help="切片数量")
parser.add_argument("--mode", "-m", choices=["highlights", "theme"], default="highlights",
help="highlights=高光识别(默认); theme=按完整主题分析(时间节点非固定)")
parser.add_argument("--skip-transcribe", action="store_true", help="跳过转录(已有 transcript.srt")
parser.add_argument("--skip-highlights", action="store_true", help="跳过高光/主题识别(已有 highlights.json")
parser.add_argument("--skip-highlights", action="store_true", help="跳过高光识别(已有 highlights.json")
parser.add_argument("--skip-clips", action="store_true", help="跳过切片(已有 clips/,仅重新增强)")
args = parser.parse_args()
@@ -156,31 +154,19 @@ def main():
transcript_to_simplified(transcript_path)
print(" ✓ 字幕已转简体")
# 2. 高光/主题识别
# 2. 高光识别
if not args.skip_highlights:
if args.mode == "theme":
run(
[
sys.executable,
str(SCRIPT_DIR / "identify_theme_segments.py"),
"--transcript", str(transcript_path),
"--output", str(highlights_path),
],
"完整主题分析Ollama→规则,时间节点非固定",
timeout=120,
)
else:
run(
[
sys.executable,
str(SCRIPT_DIR / "identify_highlights.py"),
"--transcript", str(transcript_path),
"--output", str(highlights_path),
"--clips", str(args.clips),
],
"高光识别Ollama→规则",
timeout=60,
)
run(
[
sys.executable,
str(SCRIPT_DIR / "identify_highlights.py"),
"--transcript", str(transcript_path),
"--output", str(highlights_path),
"--clips", str(args.clips),
],
"高光识别Ollama→规则",
timeout=60,
)
if not highlights_path.exists():
print(f"❌ 需要 highlights.json: {highlights_path}")
sys.exit(1)

View File

@@ -84,3 +84,4 @@
| 2026-02-22 11:32:57 | 🔄 卡若AI 同步 2026-02-22 11:32 | 更新:金仓、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 11:40:59 | 🔄 卡若AI 同步 2026-02-22 11:40 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 11:44:40 | 🔄 卡若AI 同步 2026-02-22 11:44 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个 |
| 2026-02-22 11:47:38 | 🔄 卡若AI 同步 2026-02-22 11:47 | 更新:水桥平台对接、运营中枢工作台 | 排除 >20MB: 8 个 |

View File

@@ -87,3 +87,4 @@
| 2026-02-22 11:32:57 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 11:32 | 更新:金仓、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 11:40:59 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 11:40 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 11:44:40 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 11:44 | 更新:水桥平台对接、卡木、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-02-22 11:47:38 | 成功 | 成功 | 🔄 卡若AI 同步 2026-02-22 11:47 | 更新:水桥平台对接、运营中枢工作台 | 排除 >20MB: 8 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |