Files
soul-yongping/scripts/devlop.py

555 lines
21 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Soul 创业派对 - 自动部署上传脚本dist 切换方式
本地 pnpm build 打包 zip 上传到服务器并解压到 dist2 宝塔暂停 soul
distdist1, dist2dist 删除 dist1 宝塔重启 soul
使用方法:
python scripts/devlop.py # 完整流程
python scripts/devlop.py --no-build # 跳过本地构建(使用已有 .next/standalone
环境变量可选:
DEPLOY_HOST # SSH 服务器,默认同 deploy_soul
DEPLOY_USER / DEPLOY_PASSWORD / DEPLOY_SSH_KEY
DEVOP_BASE_PATH # 服务器目录,默认 /www/wwwroot/auto-devlop/soul
BAOTA_PANEL_URL / BAOTA_API_KEY
DEPLOY_PM2_APP # Node 项目名,默认 soul
"""
from __future__ import print_function
import os
import sys
import shutil
import tempfile
import argparse
import json
import zipfile
import time
# 确保能导入同目录的 deploy_soul
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
try:
import paramiko
except ImportError:
print("错误: 请先安装 paramiko")
print(" pip install paramiko")
sys.exit(1)
# 复用 deploy_soul 的构建与宝塔 API
from deploy_soul import (
get_cfg as _deploy_cfg,
run_build,
stop_node_project,
start_node_project,
)
# ==================== 构建前清理(避免 Windows EBUSY)====================
def clean_standalone_before_build(root, retries=3, delay=2):
    """Delete ``<root>/.next/standalone`` before a build, retrying while it is locked.

    Removing the directory up front avoids the Next.js build failing on
    Windows with EBUSY when it cannot rmdir the old output itself.

    Args:
        root: Project root directory that contains ``.next``.
        retries: Maximum number of deletion attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        True when the directory does not exist or was deleted, False when
        every attempt failed (or ``retries`` is smaller than 1).
    """
    standalone = os.path.join(root, ".next", "standalone")
    if not os.path.isdir(standalone):
        return True
    for attempt in range(1, retries + 1):
        try:
            shutil.rmtree(standalone)
        except OSError:
            # PermissionError is a subclass of OSError, so one clause covers both.
            if attempt < retries:
                print(" [清理] .next/standalone 被占用,%ds 后重试 (%d/%d) ..." % (delay, attempt, retries))
                time.sleep(delay)
                continue
            print(" [失败] 无法删除 .next/standaloneEBUSY/被占用)")
            print("1) 关闭占用该目录的进程如其他终端、VS Code 文件预览)")
            print(" 2) 或先手动执行 pnpm build再运行: python scripts/devlop.py --no-build")
            return False
        print(" [清理] 已删除 .next/standalone%d 次尝试)" % attempt)
        return True
    # Only reached when the loop never ran (retries < 1).
    return False
# ==================== 配置 ====================
def get_cfg():
    """Return the deploy_soul configuration extended with the devlop paths.

    Adds ``base_path`` (from the ``DEVOP_BASE_PATH`` environment variable,
    with a built-in default) plus the derived ``dist_path`` and
    ``dist2_path`` entries.
    """
    settings = _deploy_cfg()
    settings["base_path"] = os.environ.get("DEVOP_BASE_PATH", "/www/wwwroot/auto-devlop/soul")
    # Both runtime directories live directly under the base path.
    for key, leaf in (("dist_path", "/dist"), ("dist2_path", "/dist2")):
        settings[key] = settings["base_path"] + leaf
    return settings
# ==================== 打包为 ZIP ====================
# Directory names excluded from the zip: if any segment of a path matches,
# the whole subtree is skipped.
ZIP_EXCLUDE_DIRS = {
    ".cache",  # node_modules/.cache, .next/cache
    "__pycache__",
    ".git",
    "node_modules",
    "cache",  # e.g. .next/cache
    "test",
    "tests",
    "coverage",
    ".nyc_output",
    ".turbo",
    "开发文档",
    "miniprogramPre",
    "my-app",
    "newpp",
}
# File names (exact match) and file suffixes excluded from the zip.
ZIP_EXCLUDE_FILE_NAMES = {".DS_Store", "Thumbs.db"}
ZIP_EXCLUDE_FILE_SUFFIXES = (".log", ".map")  # optional: dropping .map files shrinks the archive
def _should_exclude_from_zip(arcname, is_file=True):
    """Tell whether a path relative to the zip root must be left out.

    Args:
        arcname: Relative path inside the archive; both ``/`` and ``\\``
            separators are accepted.
        is_file: When True the last path segment is additionally checked
            against the excluded file names and suffixes.

    Returns:
        True when the path should not be packed, otherwise False.
    """
    segments = arcname.replace("\\", "/").split("/")
    # Any excluded directory name anywhere in the path rejects the entry.
    if any(segment in ZIP_EXCLUDE_DIRS for segment in segments):
        return True
    if not is_file:
        return False
    basename = segments[-1]
    if basename in ZIP_EXCLUDE_FILE_NAMES:
        return True
    return basename.endswith(tuple(ZIP_EXCLUDE_FILE_SUFFIXES))
def _copy_with_dereference(src, dst):
"""复制文件或目录,跟随符号链接"""
if os.path.islink(src):
link_target = os.readlink(src)
real_path = link_target if os.path.isabs(link_target) else os.path.join(os.path.dirname(src), link_target)
if os.path.exists(real_path):
if os.path.isdir(real_path):
shutil.copytree(real_path, dst, symlinks=False, dirs_exist_ok=True)
else:
shutil.copy2(real_path, dst)
else:
shutil.copy2(src, dst, follow_symlinks=False)
elif os.path.isdir(src):
if os.path.exists(dst):
shutil.rmtree(dst)
shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
def pack_standalone_zip(root):
    """Stage the Next.js standalone output and compress it into a zip archive.

    The contents of ``<root>/.next/standalone`` are copied into a temporary
    staging directory; ``.next/static``, ``public`` and
    ``ecosystem.config.cjs`` are added; ``package.json`` and ``server.js``
    are patched; finally the staging tree is zipped, leaving out everything
    matched by the ``ZIP_EXCLUDE_*`` settings. The staging directory is
    always removed afterwards.

    Args:
        root: Project root directory that contains ``.next``.

    Returns:
        Path of the zip written to the system temp directory, or ``None``
        when a required directory is missing or packing raised an exception.
    """
    print("[2/7] 打包 standalone 为 zip ...")
    standalone = os.path.join(root, ".next", "standalone")
    static_src = os.path.join(root, ".next", "static")
    public_src = os.path.join(root, "public")
    ecosystem_src = os.path.join(root, "ecosystem.config.cjs")

    # standalone and static are mandatory; public and ecosystem are optional.
    if not os.path.isdir(standalone):
        print(" [失败] 未找到 .next/standalone 目录")
        return None
    if not os.path.isdir(static_src):
        print(" [失败] 未找到 .next/static 目录")
        return None
    if not os.path.isdir(public_src):
        print(" [警告] 未找到 public 目录,继续打包")
    if not os.path.isfile(ecosystem_src):
        print(" [警告] 未找到 ecosystem.config.cjs继续打包")

    staging = tempfile.mkdtemp(prefix="soul_devlop_")
    try:
        print(" 正在复制 standalone 目录内容...")
        for name in os.listdir(standalone):
            src = os.path.join(standalone, name)
            dst = os.path.join(staging, name)
            if name == "node_modules":
                print(" 正在复制 node_modules处理符号链接...")
            _copy_with_dereference(src, dst)

        # pnpm keeps packages under node_modules/.pnpm; hoist styled-jsx to
        # the node_modules root when it is not already there.
        node_modules_dst = os.path.join(staging, "node_modules")
        pnpm_dir = os.path.join(node_modules_dst, ".pnpm")
        if os.path.isdir(pnpm_dir):
            for dep in ["styled-jsx"]:
                dep_in_root = os.path.join(node_modules_dst, dep)
                if os.path.exists(dep_in_root):
                    continue
                for pnpm_pkg in os.listdir(pnpm_dir):
                    if not pnpm_pkg.startswith(dep + "@"):
                        continue
                    src_dep = os.path.join(pnpm_dir, pnpm_pkg, "node_modules", dep)
                    if os.path.isdir(src_dep):
                        shutil.copytree(src_dep, dep_in_root, symlinks=False, dirs_exist_ok=True)
                        break

        # Add .next/static, public and the ecosystem file next to the server.
        static_dst = os.path.join(staging, ".next", "static")
        if os.path.exists(static_dst):
            shutil.rmtree(static_dst)
        os.makedirs(os.path.dirname(static_dst), exist_ok=True)
        shutil.copytree(static_src, static_dst)
        if os.path.isdir(public_src):
            public_dst = os.path.join(staging, "public")
            if os.path.exists(public_dst):
                shutil.rmtree(public_dst)
            shutil.copytree(public_src, public_dst)
        if os.path.isfile(ecosystem_src):
            shutil.copy2(ecosystem_src, os.path.join(staging, "ecosystem.config.cjs"))

        # Point the package.json "start" script at the standalone server.
        package_json_path = os.path.join(staging, "package.json")
        if os.path.isfile(package_json_path):
            try:
                with open(package_json_path, "r", encoding="utf-8") as f:
                    package_data = json.load(f)
                if "scripts" not in package_data:
                    package_data["scripts"] = {}
                package_data["scripts"]["start"] = "node server.js"
                with open(package_json_path, "w", encoding="utf-8") as f:
                    json.dump(package_data, f, indent=2, ensure_ascii=False)
            except Exception as e:
                # Still best-effort, but report the problem instead of hiding it.
                print(" [警告] 修改 package.json 失败:", str(e))

        # Change the default port in server.js from 3000 to 30006.
        server_js_path = os.path.join(staging, "server.js")
        if os.path.isfile(server_js_path):
            try:
                with open(server_js_path, "r", encoding="utf-8") as f:
                    server_js_content = f.read()
                if "|| 3000" in server_js_content:
                    server_js_content = server_js_content.replace("|| 3000", "|| 30006")
                    with open(server_js_path, "w", encoding="utf-8") as f:
                        f.write(server_js_content)
                    print(" [修改] server.js 默认端口已改为 30006")
                else:
                    print(" [提示] server.js 未找到 '|| 3000' 字符串,跳过端口修改")
            except Exception as e:
                print(" [警告] 修改 server.js 失败:", str(e))

        # Zip the staging contents at the archive root, so that unpacking
        # yields the dist2 root directly.
        zip_path = os.path.join(tempfile.gettempdir(), "soul_devlop.zip")
        excluded_count = 0
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for name in os.listdir(staging):
                path = os.path.join(staging, name)
                if os.path.isfile(path):
                    if _should_exclude_from_zip(name):
                        excluded_count += 1
                    else:
                        zf.write(path, name)
                    continue
                for dirpath, dirs, filenames in os.walk(path):
                    # Prune in place so os.walk never descends into excluded directories.
                    dirs[:] = [
                        d for d in dirs
                        if not _should_exclude_from_zip(
                            os.path.join(name, os.path.relpath(os.path.join(dirpath, d), path)),
                            is_file=False,
                        )
                    ]
                    for f in filenames:
                        full = os.path.join(dirpath, f)
                        arcname = os.path.join(name, os.path.relpath(full, path))
                        if _should_exclude_from_zip(arcname):
                            excluded_count += 1
                            continue
                        zf.write(full, arcname)
        if excluded_count > 0:
            print(" [过滤] 已排除 %d 个文件/目录ZIP_EXCLUDE_*" % excluded_count)
        size_mb = os.path.getsize(zip_path) / 1024 / 1024
        print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, size_mb))
        return zip_path
    except Exception as e:
        print(" [失败] 打包异常:", str(e))
        import traceback
        traceback.print_exc()
        return None
    finally:
        shutil.rmtree(staging, ignore_errors=True)
# ==================== SSH 上传并解压到 dist2 ====================
def upload_zip_and_extract_to_dist2(cfg, zip_path):
    """Upload the zip into ``base_path`` over SSH and unpack it into ``dist2``.

    Args:
        cfg: Configuration mapping; reads ``host``, ``user``, ``password``,
            ``ssh_key``, ``base_path`` and ``dist2_path``.
        zip_path: Local path of the archive to upload.

    Returns:
        True when the archive was uploaded and extracted, False otherwise
        (missing credentials, SSH failure, or a failing remote command).
    """
    print("[3/7] SSH 上传 zip 并解压到 dist2 ...")
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]
    base_path = cfg["base_path"]
    dist2_path = cfg["dist2_path"]
    if not password and not key_path:
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; consider loading known_hosts for production use.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        print(" 正在连接 %s@%s ..." % (user, host))
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)
        print(" [成功] SSH 连接成功")
        remote_zip = base_path.rstrip("/") + "/soul_devlop.zip"

        # Create the configured base directory (and parents) before uploading;
        # the SFTP upload fails when the target directory is missing.
        stdin, stdout, stderr = client.exec_command("mkdir -p %s" % base_path, timeout=30)
        mkdir_err = stderr.read().decode("utf-8", errors="replace").strip()
        if stdout.channel.recv_exit_status() != 0:
            print(" [失败] 无法创建服务器目录: %s" % base_path)
            if mkdir_err:
                print(" stderr: %s" % mkdir_err)
            return False

        sftp = client.open_sftp()
        try:
            sftp.put(zip_path, remote_zip)
            print(" [成功] zip 上传完成: %s" % remote_zip)
        finally:
            sftp.close()

        # Drop the old dist2, recreate it, unpack into it, then delete the
        # uploaded archive. "OK" is echoed only when every step succeeded.
        cmd = (
            "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK"
            % (dist2_path, dist2_path, remote_zip, dist2_path, remote_zip)
        )
        stdin, stdout, stderr = client.exec_command(cmd, timeout=120)
        err = stderr.read().decode("utf-8", errors="replace").strip()
        if err:
            print(" 服务器 stderr:", err)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 解压失败,退出码: %s" % exit_status)
            if err:
                print(" stderr: %s" % err)
            if out:
                print(" stdout: %s" % out)
            return False
        print(" [成功] 已解压到: %s" % dist2_path)
        return True
    except paramiko.AuthenticationException:
        print(" [失败] SSH 认证失败")
        return False
    except Exception as e:
        print(" [失败] SSH 错误:", str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        client.close()
# ==================== 服务器 dist2 内执行 pnpm install ====================
def run_pnpm_install_in_dist2(cfg):
    """Run ``pnpm install`` inside the server's dist2 directory over SSH.

    Args:
        cfg: Configuration mapping; reads ``host``, ``user``, ``password``,
            ``ssh_key`` and ``dist2_path``.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        ``message`` describes the failure.
    """
    print("[4/7] 服务器 dist2 内执行 pnpm install ...")
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]
    dist2_path = cfg["dist2_path"]
    if not (password or key_path):
        return False, "请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY"

    def read_text(stream):
        # Decode a remote output stream, tolerating invalid bytes.
        return stream.read().decode("utf-8", errors="replace")

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Prefer the key file when it exists, otherwise fall back to the password.
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)

        # Locate pnpm through a login shell first and print it for debugging.
        print(" 正在查找 pnpm 路径...")
        stdin, stdout, stderr = client.exec_command("bash -lc 'which pnpm'", timeout=10)
        pnpm_path = read_text(stdout).strip()
        if pnpm_path:
            print(" 找到 pnpm: %s" % pnpm_path)
        else:
            # Fall back to probing a few common install locations.
            print(" 未找到 pnpm尝试常见路径...")
            candidates = ("/usr/local/bin/pnpm", "/usr/bin/pnpm", "~/.local/share/pnpm/pnpm")
            for candidate in candidates:
                stdin, stdout, stderr = client.exec_command("test -f %s && echo OK" % candidate, timeout=5)
                if "OK" in read_text(stdout):
                    pnpm_path = candidate
                    print(" 找到 pnpm: %s" % pnpm_path)
                    break
        if not pnpm_path:
            return False, "未找到 pnpm 命令,请确认服务器已安装 pnpm (npm install -g pnpm)"

        # bash -l starts a login shell (loads ~/.bash_profile etc.), -c runs the command.
        cmd = "bash -lc 'cd %s && %s install'" % (dist2_path, pnpm_path)
        print(" 执行命令: %s" % cmd)
        stdin, stdout, stderr = client.exec_command(cmd, timeout=300)
        out = read_text(stdout).strip()
        err = read_text(stderr).strip()
        exit_status = stdout.channel.recv_exit_status()

        # Show the output, limited to its last lines when it is long.
        if out:
            out_lines = out.split('\n')
            if len(out_lines) <= 10:
                print(" 输出: %s" % out)
            else:
                print(" 输出最后10行:")
                for tail_line in out_lines[-10:]:
                    print(" " + tail_line)

        if exit_status != 0:
            pieces = ["pnpm install 失败,退出码: %s\n" % exit_status]
            if err:
                pieces.append("stderr:\n%s\n" % err)
            if out:
                pieces.append("stdout:\n%s" % out)
            return False, "".join(pieces)
        print(" [成功] pnpm install 完成")
        return True, None
    except Exception as e:
        return False, "执行 pnpm install 异常: %s" % str(e)
    finally:
        client.close()
# ==================== 暂停 → 重命名切换 → 重启 ====================
def remote_swap_dist_and_restart(cfg):
    """Stop the Node project, swap ``dist2`` into ``dist``, then start it again.

    Sequence: stop via the BaoTa API, move ``dist`` to ``dist1``, move
    ``dist2`` to ``dist``, delete ``dist1``, start via the BaoTa API.

    Args:
        cfg: Configuration mapping; reads ``panel_url``, ``api_key``,
            ``pm2_name``, ``base_path``, ``dist_path``, ``host``, ``user``,
            ``password`` and ``ssh_key``.

    Returns:
        True when the directories were swapped and the project restarted,
        False when the swap or the restart failed.
    """
    print("[5/7] 宝塔 API 暂停 Node 项目 soul ...")
    panel_url = cfg["panel_url"]
    api_key = cfg["api_key"]
    pm2_name = cfg["pm2_name"]
    base_path = cfg["base_path"]
    dist_path = cfg["dist_path"]
    if not stop_node_project(panel_url, api_key, pm2_name):
        print(" [警告] 暂停可能未成功,继续执行切换")
    # Give the process a moment to release its files before moving directories.
    time.sleep(2)

    print("[6/7] 服务器上切换目录: dist→dist1, dist2→dist删除 dist1 ...")
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)
        # Every step is chained with && so a failed `cd` or a failed move of
        # the old dist aborts the swap instead of running the remaining
        # commands in the wrong place. A missing `dist` (first deployment) is
        # tolerated by the `test ! -e dist ||` guard, and a stale dist1 from
        # an earlier failed run is removed first.
        cmd = (
            "cd %s && rm -rf dist1 && (test ! -e dist || mv dist dist1) "
            "&& mv dist2 dist && rm -rf dist1 && echo OK" % base_path
        )
        stdin, stdout, stderr = client.exec_command(cmd, timeout=60)
        err = stderr.read().decode("utf-8", errors="replace").strip()
        out = stdout.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 切换失败,退出码: %s" % exit_status)
            if err:
                print(" stderr: %s" % err)
            if out:
                print(" stdout: %s" % out)
            return False
        print(" [成功] 已切换: 新版本位于 %s" % dist_path)
    except Exception as e:
        print(" [失败] 切换异常:", str(e))
        return False
    finally:
        client.close()

    print("[7/7] 宝塔 API 重启 Node 项目 soul ...")
    if not start_node_project(panel_url, api_key, pm2_name):
        print(" [警告] 重启失败,请到宝塔 Node 项目里手动启动 soul")
        return False
    return True
# ==================== 主函数 ====================
def main():
    """Run the full deployment pipeline and return a process exit code.

    Steps: optional local build, zip packing, upload and extraction into
    dist2, remote ``pnpm install``, then stop / swap directories / restart.

    Returns:
        0 when every step succeeded, 1 as soon as one step fails.
    """
    parser = argparse.ArgumentParser(
        description="Soul 自动部署build → zip → 上传解压到 dist2 → 暂停 → 切换 dist → 重启",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--no-build", action="store_true", help="跳过本地构建(使用已有 .next/standalone")
    args = parser.parse_args()

    # The project root is the parent of the directory holding this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    root = os.path.dirname(script_dir)
    cfg = get_cfg()

    print("=" * 60)
    print(" Soul 自动部署dist 切换)")
    print("=" * 60)
    print(" 服务器: %s@%s" % (cfg["user"], cfg["host"]))
    print(" 目录: %s" % cfg["base_path"])
    print(" 解压到: %s" % cfg["dist2_path"])
    print(" 运行目录: %s" % cfg["dist_path"])
    print(" Node 项目名: %s" % cfg["pm2_name"])
    print("=" * 60)

    # 1. Local build (or verify that an existing build can be reused).
    if not args.no_build:
        print("[1/7] 本地构建 pnpm build ...")
        if sys.platform == "win32":
            # Only Windows needs the old output removed before the build.
            if not clean_standalone_before_build(root):
                return 1
        if not run_build(root):
            return 1
    else:
        if not os.path.isfile(os.path.join(root, ".next", "standalone", "server.js")):
            print("[错误] 跳过构建但未找到 .next/standalone/server.js")
            return 1
        print("[1/7] 跳过本地构建")

    # 2. Pack the zip.
    zip_path = pack_standalone_zip(root)
    if not zip_path:
        return 1

    # 3. Upload and extract into dist2. The local archive is removed whether
    #    or not the upload succeeded, so a failed run leaves no temp file.
    try:
        uploaded = upload_zip_and_extract_to_dist2(cfg, zip_path)
    finally:
        try:
            os.remove(zip_path)
        except OSError:
            pass
    if not uploaded:
        return 1

    # 4. pnpm install inside dist2 on the server.
    ok, err_msg = run_pnpm_install_in_dist2(cfg)
    if not ok:
        print(" [失败] %s" % (err_msg or "pnpm install 失败"))
        return 1

    # 5-7. Stop, swap directories, restart.
    if not remote_swap_dist_and_restart(cfg):
        return 1

    print("")
    print("=" * 60)
    print(" 部署完成!当前运行目录: %s" % cfg["dist_path"])
    print("=" * 60)
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())