Files
soul-yongping/scripts/devlop.py

555 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Soul 创业派对 - 自动部署上传脚本(dist 切换方式)
本地 pnpm build → 打包 zip → 上传到服务器并解压到 dist2 → 宝塔暂停 soul →
dist→dist1, dist2→dist → 删除 dist1 → 宝塔重启 soul
使用方法:
python scripts/devlop.py # 完整流程
python scripts/devlop.py --no-build # 跳过本地构建(使用已有 .next/standalone)
环境变量(可选):
DEPLOY_HOST # SSH 服务器,默认同 deploy_soul
DEPLOY_USER / DEPLOY_PASSWORD / DEPLOY_SSH_KEY
DEVOP_BASE_PATH # 服务器目录,默认 /www/wwwroot/auto-devlop/soul
BAOTA_PANEL_URL / BAOTA_API_KEY
DEPLOY_PM2_APP # Node 项目名,默认 soul
"""
from __future__ import print_function
import os
import sys
import shutil
import tempfile
import argparse
import json
import zipfile
import time
# 确保能导入同目录的 deploy_soul
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
try:
import paramiko
except ImportError:
print("错误: 请先安装 paramiko")
print(" pip install paramiko")
sys.exit(1)
# 复用 deploy_soul 的构建与宝塔 API
from deploy_soul import (
get_cfg as _deploy_cfg,
run_build,
stop_node_project,
start_node_project,
)
# ==================== 构建前清理(避免 Windows EBUSY) ====================
def clean_standalone_before_build(root, retries=3, delay=2):
    """Remove ``<root>/.next/standalone`` before a build, retrying when it is busy.

    The directory is deleted up front so the subsequent build does not have
    to remove it itself (the original note cites EBUSY failures on Windows).

    Args:
        root: Project root directory containing ``.next``.
        retries: Maximum number of deletion attempts.
        delay: Seconds to sleep between failed attempts.

    Returns:
        True when the directory is absent or was deleted; False when every
        attempt failed (or when ``retries`` is less than 1 and the directory
        exists).
    """
    target = os.path.join(root, ".next", "standalone")
    if not os.path.isdir(target):
        return True

    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            shutil.rmtree(target)
        except OSError:
            if attempt >= retries:
                # Out of attempts: tell the operator how to proceed manually.
                print(" [失败] 无法删除 .next/standaloneEBUSY/被占用)")
                print("1) 关闭占用该目录的进程如其他终端、VS Code 文件预览)")
                print(" 2) 或先手动执行 pnpm build再运行: python scripts/devlop.py --no-build")
                return False
            print(" [清理] .next/standalone 被占用,%ds 后重试 (%d/%d) ..." % (delay, attempt, retries))
            time.sleep(delay)
            continue
        print(" [清理] 已删除 .next/standalone%d 次尝试)" % attempt)
        return True
    return False
# ==================== 配置 ====================
def get_cfg():
    """Return the deploy_soul configuration extended with the devlop paths.

    Adds three keys on top of whatever ``deploy_soul.get_cfg`` returns:
    ``base_path`` (from the ``DEVOP_BASE_PATH`` environment variable, with a
    built-in default), ``dist_path`` (``<base_path>/dist``) and
    ``dist2_path`` (``<base_path>/dist2``).
    """
    cfg = _deploy_cfg()
    base = os.environ.get("DEVOP_BASE_PATH", "/www/wwwroot/auto-devlop/soul")
    cfg["base_path"] = base
    for key, leaf in (("dist_path", "dist"), ("dist2_path", "dist2")):
        cfg[key] = base + "/" + leaf
    return cfg
# ==================== 打包为 ZIP ====================
# Directory names left out of the zip: a match on ANY path segment excludes
# the entry (and, for directories, the whole subtree below it).
ZIP_EXCLUDE_DIRS = {
    ".cache",  # node_modules/.cache, .next/cache
    "__pycache__",
    ".git",
    "node_modules",
    "cache",  # e.g. .next/cache
    "test",
    "tests",
    "coverage",
    ".nyc_output",
    ".turbo",
    "开发文档",
    "miniprogram",
    "my-app",
    "newpp",
}
# File names (exact match) and file-name suffixes left out of the zip.
ZIP_EXCLUDE_FILE_NAMES = {".DS_Store", "Thumbs.db"}
ZIP_EXCLUDE_FILE_SUFFIXES = (".log", ".map")  # optional: dropping .map files reduces archive size
def _should_exclude_from_zip(arcname, is_file=True):
    """Tell whether an archive-relative path must be left out of the zip.

    Args:
        arcname: Path relative to the archive root; ``\\`` and ``/`` are both
            accepted as separators.
        is_file: When True, the last path segment is additionally checked
            against the excluded file names and suffixes.

    Returns:
        True if any path segment is in ``ZIP_EXCLUDE_DIRS``, or (for files)
        the file name is in ``ZIP_EXCLUDE_FILE_NAMES`` or ends with one of
        ``ZIP_EXCLUDE_FILE_SUFFIXES``; False otherwise.
    """
    segments = arcname.replace("\\", "/").split("/")
    if any(segment in ZIP_EXCLUDE_DIRS for segment in segments):
        return True
    if not is_file:
        return False

    basename = segments[-1] if segments else ""
    if basename in ZIP_EXCLUDE_FILE_NAMES:
        return True
    return basename.endswith(tuple(ZIP_EXCLUDE_FILE_SUFFIXES))
def _copy_with_dereference(src, dst):
    """Copy a file or directory to *dst*, following symbolic links.

    * Regular file: copied with ``shutil.copy2``.
    * Directory: any existing *dst* is removed first, then the tree is copied
      with links inside it dereferenced.
    * Symlink: its target is copied instead (tree copy for a directory
      target, ``copy2`` for a file target). A relative target is resolved
      against the directory containing *src*. If the target does not exist,
      the link itself is copied without being followed.
    """
    if not os.path.islink(src):
        if not os.path.isdir(src):
            shutil.copy2(src, dst)
            return
        if os.path.exists(dst):
            shutil.rmtree(dst)
        shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=True)
        return

    target = os.readlink(src)
    if not os.path.isabs(target):
        target = os.path.join(os.path.dirname(src), target)

    if not os.path.exists(target):
        # Dangling link: keep it as a link rather than failing.
        shutil.copy2(src, dst, follow_symlinks=False)
    elif os.path.isdir(target):
        shutil.copytree(target, dst, symlinks=False, dirs_exist_ok=True)
    else:
        shutil.copy2(target, dst)
def _hoist_pnpm_dependency(node_modules_dir, dep):
    """Copy *dep* out of ``node_modules/.pnpm/<dep>@*/node_modules`` to the node_modules root if it is missing there."""
    pnpm_dir = os.path.join(node_modules_dir, ".pnpm")
    hoisted = os.path.join(node_modules_dir, dep)
    if not os.path.isdir(pnpm_dir) or os.path.exists(hoisted):
        return
    for pnpm_pkg in os.listdir(pnpm_dir):
        if not pnpm_pkg.startswith(dep + "@"):
            continue
        candidate = os.path.join(pnpm_dir, pnpm_pkg, "node_modules", dep)
        if os.path.isdir(candidate):
            shutil.copytree(candidate, hoisted, symlinks=False, dirs_exist_ok=True)
            return


def _patch_package_json_start(package_json_path):
    """Force ``scripts.start`` to ``node server.js`` in the staged package.json (best effort)."""
    if not os.path.isfile(package_json_path):
        return
    try:
        with open(package_json_path, "r", encoding="utf-8") as f:
            package_data = json.load(f)
        if "scripts" not in package_data:
            package_data["scripts"] = {}
        package_data["scripts"]["start"] = "node server.js"
        with open(package_json_path, "w", encoding="utf-8") as f:
            json.dump(package_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        # Still best effort, but no longer silent: a broken package.json
        # would otherwise only surface later on the server.
        print(" [警告] 修改 package.json 失败:", str(e))


def _patch_server_default_port(server_js_path):
    """Rewrite the fallback port ``|| 3000`` to ``|| 30006`` in the staged server.js (best effort)."""
    import re

    if not os.path.isfile(server_js_path):
        return
    try:
        with open(server_js_path, "r", encoding="utf-8") as f:
            content = f.read()
        # The negative lookahead keeps already-patched text ("|| 30006") and
        # other ports that merely start with 3000 from being rewritten.
        patched, replaced = re.subn(r"\|\| 3000(?!\d)", "|| 30006", content)
        if replaced:
            with open(server_js_path, "w", encoding="utf-8") as f:
                f.write(patched)
            print(" [修改] server.js 默认端口已改为 30006")
        else:
            print(" [提示] server.js 未找到 '|| 3000' 字符串,跳过端口修改")
    except Exception as e:
        print(" [警告] 修改 server.js 失败:", str(e))


def _write_staging_zip(staging, zip_path):
    """Zip the contents of *staging* (not the directory itself) and return the number of files skipped."""
    excluded = 0
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name in os.listdir(staging):
            top = os.path.join(staging, name)
            if os.path.isfile(top):
                if _should_exclude_from_zip(name):
                    excluded += 1
                else:
                    zf.write(top, name)
                continue
            for dirpath, dirs, filenames in os.walk(top):
                rel_dir = os.path.relpath(dirpath, top)
                arc_dir = name if rel_dir == "." else os.path.join(name, rel_dir)
                # Prune in place so os.walk never descends into excluded directories.
                dirs[:] = [
                    d for d in dirs
                    if not _should_exclude_from_zip(os.path.join(arc_dir, d), is_file=False)
                ]
                for filename in filenames:
                    arcname = os.path.join(arc_dir, filename)
                    if _should_exclude_from_zip(arcname):
                        excluded += 1
                        continue
                    zf.write(os.path.join(dirpath, filename), arcname)
    return excluded


def pack_standalone_zip(root):
    """Stage the Next.js standalone build and pack it into a zip archive.

    Steps, all performed in a temporary staging directory that is always
    removed afterwards:

    1. Copy every entry of ``<root>/.next/standalone`` (symlinks dereferenced).
    2. Hoist ``styled-jsx`` out of ``node_modules/.pnpm`` if it is not at the
       node_modules root.
    3. Add ``.next/static``, ``public`` (if present) and
       ``ecosystem.config.cjs`` (if present).
    4. Set ``scripts.start`` in package.json and change the default port in
       server.js from 3000 to 30006.
    5. Zip the staged content, skipping anything matched by the
       ``ZIP_EXCLUDE_*`` settings; archive paths are relative to the staging
       root so extracting yields the application directory directly.

    Args:
        root: Project root directory.

    Returns:
        Path of the created zip (``soul_devlop.zip`` in the system temp
        directory), or None if a required directory is missing or packing
        raised an exception.
    """
    print("[2/7] 打包 standalone 为 zip ...")
    standalone = os.path.join(root, ".next", "standalone")
    static_src = os.path.join(root, ".next", "static")
    public_src = os.path.join(root, "public")
    ecosystem_src = os.path.join(root, "ecosystem.config.cjs")

    if not os.path.isdir(standalone):
        print(" [失败] 未找到 .next/standalone 目录")
        return None
    if not os.path.isdir(static_src):
        print(" [失败] 未找到 .next/static 目录")
        return None
    if not os.path.isdir(public_src):
        print(" [警告] 未找到 public 目录,继续打包")
    if not os.path.isfile(ecosystem_src):
        print(" [警告] 未找到 ecosystem.config.cjs继续打包")

    staging = tempfile.mkdtemp(prefix="soul_devlop_")
    try:
        print(" 正在复制 standalone 目录内容...")
        for name in os.listdir(standalone):
            if name == "node_modules":
                print(" 正在复制 node_modules处理符号链接...")
            _copy_with_dereference(os.path.join(standalone, name), os.path.join(staging, name))

        # pnpm keeps packages under .pnpm; make styled-jsx resolvable from the root.
        for dep in ["styled-jsx"]:
            _hoist_pnpm_dependency(os.path.join(staging, "node_modules"), dep)

        # Static assets, public files and the process-manager config.
        static_dst = os.path.join(staging, ".next", "static")
        if os.path.exists(static_dst):
            shutil.rmtree(static_dst)
        os.makedirs(os.path.dirname(static_dst), exist_ok=True)
        shutil.copytree(static_src, static_dst)
        if os.path.isdir(public_src):
            public_dst = os.path.join(staging, "public")
            if os.path.exists(public_dst):
                shutil.rmtree(public_dst)
            shutil.copytree(public_src, public_dst)
        if os.path.isfile(ecosystem_src):
            shutil.copy2(ecosystem_src, os.path.join(staging, "ecosystem.config.cjs"))

        _patch_package_json_start(os.path.join(staging, "package.json"))
        _patch_server_default_port(os.path.join(staging, "server.js"))

        zip_path = os.path.join(tempfile.gettempdir(), "soul_devlop.zip")
        excluded_count = _write_staging_zip(staging, zip_path)
        if excluded_count > 0:
            print(" [过滤] 已排除 %d 个文件/目录ZIP_EXCLUDE_*" % excluded_count)
        size_mb = os.path.getsize(zip_path) / 1024 / 1024
        print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, size_mb))
        return zip_path
    except Exception as e:
        print(" [失败] 打包异常:", str(e))
        import traceback
        traceback.print_exc()
        return None
    finally:
        shutil.rmtree(staging, ignore_errors=True)
# ==================== SSH 上传并解压到 dist2 ====================
def upload_zip_and_extract_to_dist2(cfg, zip_path):
    """Upload the zip to ``base_path`` over SFTP and extract it into ``dist2``.

    The remote ``dist2`` directory is deleted and recreated before the
    archive is extracted, and the uploaded zip is removed afterwards.

    Args:
        cfg: Configuration mapping; reads ``host``, ``user``, ``password``,
            ``ssh_key``, ``base_path`` and ``dist2_path``.
        zip_path: Local path of the archive to upload.

    Returns:
        True on success; False if no credentials are configured, on any SSH
        error, or when the remote command fails.
    """
    import posixpath
    import shlex

    print("[3/7] SSH 上传 zip 并解压到 dist2 ...")
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]
    base_path = cfg["base_path"]
    dist2_path = cfg["dist2_path"]
    if not password and not key_path:
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False

    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; consider pinning the server key.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        print(" 正在连接 %s@%s ..." % (user, host))
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)
        print(" [成功] SSH 连接成功")

        remote_zip = base_path.rstrip("/") + "/soul_devlop.zip"
        sftp = client.open_sftp()
        try:
            # Create every missing component of base_path so the upload
            # target exists (the configured path, not a hard-coded one).
            current = "/" if base_path.startswith("/") else ""
            for segment in base_path.split("/"):
                if not segment:
                    continue
                current = posixpath.join(current, segment)
                try:
                    sftp.stat(current)
                except FileNotFoundError:
                    sftp.mkdir(current)
            sftp.put(zip_path, remote_zip)
            print(" [成功] zip 上传完成: %s" % remote_zip)
        finally:
            sftp.close()

        # Recreate dist2 from scratch, extract, then drop the uploaded zip.
        # Paths are shell-quoted so spaces or metacharacters in a configured
        # path cannot change what `rm -rf` operates on.
        quoted_dist2 = shlex.quote(dist2_path)
        quoted_zip = shlex.quote(remote_zip)
        cmd = (
            "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK"
            % (quoted_dist2, quoted_dist2, quoted_zip, quoted_dist2, quoted_zip)
        )
        _stdin, stdout, stderr = client.exec_command(cmd, timeout=120)
        err = stderr.read().decode("utf-8", errors="replace").strip()
        if err:
            print(" 服务器 stderr:", err)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 解压失败,退出码: %s" % exit_status)
            if err:
                print(" stderr: %s" % err)
            if out:
                print(" stdout: %s" % out)
            return False
        print(" [成功] 已解压到: %s" % dist2_path)
        return True
    except paramiko.AuthenticationException:
        print(" [失败] SSH 认证失败")
        return False
    except Exception as e:
        print(" [失败] SSH 错误:", str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        client.close()
# ==================== 服务器 dist2 内执行 pnpm install ====================
def run_pnpm_install_in_dist2(cfg):
    """Run ``pnpm install`` inside the remote ``dist2`` directory over SSH.

    pnpm is located with ``which pnpm`` in a login shell; if that prints
    nothing, a few common install locations are probed.

    Args:
        cfg: Configuration mapping; reads ``host``, ``user``, ``password``,
            ``ssh_key`` and ``dist2_path``.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        *message* describes the failure (missing credentials, pnpm not
        found, non-zero exit status with captured output, or an exception).
    """
    import shlex

    print("[4/7] 服务器 dist2 内执行 pnpm install ...")
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]
    dist2_path = cfg["dist2_path"]
    if not password and not key_path:
        return False, "请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY"

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)

        # Locate pnpm first and print it to ease debugging.
        print(" 正在查找 pnpm 路径...")
        _stdin, stdout, _stderr = client.exec_command("bash -lc 'which pnpm'", timeout=10)
        which_lines = [
            line.strip()
            for line in stdout.read().decode("utf-8", errors="replace").splitlines()
            if line.strip()
        ]
        # A login shell may print profile output before the answer; the path
        # reported by `which` is the last non-empty line.
        pnpm_path = which_lines[-1] if which_lines else ""
        if pnpm_path:
            print(" 找到 pnpm: %s" % pnpm_path)
        else:
            print(" 未找到 pnpm尝试常见路径...")
            for test_path in ["/usr/local/bin/pnpm", "/usr/bin/pnpm", "~/.local/share/pnpm/pnpm"]:
                _stdin, stdout, _stderr = client.exec_command("test -f %s && echo OK" % test_path, timeout=5)
                if "OK" in stdout.read().decode("utf-8", errors="replace"):
                    pnpm_path = test_path
                    print(" 找到 pnpm: %s" % pnpm_path)
                    break
        if not pnpm_path:
            return False, "未找到 pnpm 命令,请确认服务器已安装 pnpm (npm install -g pnpm)"

        # bash -l loads the login profile (PATH for node/pnpm); -c runs the command.
        # The directory and the whole inner command are shell-quoted; a
        # "~"-prefixed pnpm path is left bare so the shell can expand it.
        pnpm_cmd = pnpm_path if pnpm_path.startswith("~") else shlex.quote(pnpm_path)
        inner = "cd %s && %s install" % (shlex.quote(dist2_path), pnpm_cmd)
        cmd = "bash -lc %s" % shlex.quote(inner)
        print(" 执行命令: %s" % cmd)
        _stdin, stdout, stderr = client.exec_command(cmd, timeout=300)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()

        # Show only the tail of long output.
        if out:
            out_lines = out.split('\n')
            if len(out_lines) > 10:
                print(" 输出最后10行:")
                for line in out_lines[-10:]:
                    print(" " + line)
            else:
                print(" 输出: %s" % out)

        if exit_status != 0:
            msg = "pnpm install 失败,退出码: %s\n" % exit_status
            if err:
                msg += "stderr:\n%s\n" % err
            if out:
                msg += "stdout:\n%s" % out
            return False, msg
        print(" [成功] pnpm install 完成")
        return True, None
    except Exception as e:
        return False, "执行 pnpm install 异常: %s" % str(e)
    finally:
        client.close()
# ==================== 暂停 → 重命名切换 → 重启 ====================
def remote_swap_dist_and_restart(cfg):
    """Stop the Node project, swap ``dist2`` into ``dist`` on the server, then start it again.

    Sequence: stop the project through the panel API, wait two seconds,
    run the directory swap over SSH (``dist`` -> ``dist1``, ``dist2`` ->
    ``dist``, delete ``dist1``), and start the project again.

    The remote script aborts if it cannot enter ``base_path`` or cannot move
    the current ``dist`` aside, and moves ``dist1`` back to ``dist`` when
    ``dist2`` cannot be moved into place. If the swap does not complete, a
    restart is still attempted so the project is not left stopped.

    Args:
        cfg: Configuration mapping; reads ``panel_url``, ``api_key``,
            ``pm2_name``, ``base_path``, ``dist_path``, ``host``, ``user``,
            ``password`` and ``ssh_key``.

    Returns:
        True when the swap and the final start both succeeded, else False.
    """
    import shlex

    panel_url = cfg["panel_url"]
    api_key = cfg["api_key"]
    pm2_name = cfg["pm2_name"]
    base_path = cfg["base_path"]
    dist_path = cfg["dist_path"]
    host = cfg["host"]
    user = cfg["user"]
    password = cfg["password"]
    key_path = cfg["ssh_key"]

    # Check credentials BEFORE stopping the service so a misconfiguration
    # cannot cause downtime.
    if not password and not key_path:
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False

    print("[5/7] 宝塔 API 暂停 Node 项目 soul ...")
    if not stop_node_project(panel_url, api_key, pm2_name):
        print(" [警告] 暂停可能未成功,继续执行切换")
    time.sleep(2)

    print("[6/7] 服务器上切换目录: dist→dist1, dist2→dist删除 dist1 ...")
    # `cd ... || exit 1` guarantees no mv/rm ever runs outside base_path.
    # A stale dist1 is removed first so `mv dist dist1` renames instead of
    # nesting; dist1 cleanup after the swap is best effort.
    cmd = (
        "cd %s || exit 1; "
        "rm -rf dist1; "
        "if [ -e dist ]; then mv dist dist1 || exit 1; fi; "
        "if mv dist2 dist; then rm -rf dist1; echo OK; "
        "else if [ -e dist1 ]; then mv dist1 dist; fi; exit 1; fi"
    ) % shlex.quote(base_path)

    swapped = False
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if key_path and os.path.isfile(key_path):
            client.connect(host, username=user, key_filename=key_path, timeout=15)
        else:
            client.connect(host, username=user, password=password, timeout=15)
        _stdin, stdout, stderr = client.exec_command(cmd, timeout=60)
        err = stderr.read().decode("utf-8", errors="replace").strip()
        out = stdout.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 切换失败,退出码: %s" % exit_status)
            if err:
                print(" stderr: %s" % err)
            if out:
                print(" stdout: %s" % out)
        else:
            print(" [成功] 已切换: 新版本位于 %s" % dist_path)
            swapped = True
    except Exception as e:
        print(" [失败] 切换异常:", str(e))
    finally:
        client.close()

    if not swapped:
        # The project was stopped in step 5; bring it back up on whatever
        # is in dist rather than leaving the site down.
        print(" [回滚] 切换未完成 - 尝试重新启动 soul ...")
        if not start_node_project(panel_url, api_key, pm2_name):
            print(" [警告] 重启失败,请到宝塔 Node 项目里手动启动 soul")
        return False

    print("[7/7] 宝塔 API 重启 Node 项目 soul ...")
    if not start_node_project(panel_url, api_key, pm2_name):
        print(" [警告] 重启失败,请到宝塔 Node 项目里手动启动 soul")
        return False
    return True
# ==================== 主函数 ====================
def main():
    """Run the full deployment: build, zip, upload to dist2, pnpm install, swap, restart.

    Command line:
        --no-build  Skip the local build and reuse the existing
                    ``.next/standalone`` output (``server.js`` must exist).

    Returns:
        Process exit code: 0 on success, 1 as soon as any step fails.
    """
    parser = argparse.ArgumentParser(
        description="Soul 自动部署build → zip → 上传解压到 dist2 → 暂停 → 切换 dist → 重启",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--no-build", action="store_true", help="跳过本地构建(使用已有 .next/standalone")
    args = parser.parse_args()

    # The project root is the parent of the directory holding this script.
    here = os.path.dirname(os.path.abspath(__file__))
    root = os.path.dirname(here)
    cfg = get_cfg()

    print("=" * 60)
    print(" Soul 自动部署dist 切换)")
    print("=" * 60)
    print(" 服务器: %s@%s" % (cfg["user"], cfg["host"]))
    print(" 目录: %s" % cfg["base_path"])
    print(" 解压到: %s" % cfg["dist2_path"])
    print(" 运行目录: %s" % cfg["dist_path"])
    print(" Node 项目名: %s" % cfg["pm2_name"])
    print("=" * 60)

    # Step 1: local build (or verify an existing build when skipping).
    if not args.no_build:
        print("[1/7] 本地构建 pnpm build ...")
        if sys.platform == "win32":
            if not clean_standalone_before_build(root):
                return 1
        if not run_build(root):
            return 1
    else:
        if not os.path.isfile(os.path.join(root, ".next", "standalone", "server.js")):
            print("[错误] 跳过构建但未找到 .next/standalone/server.js")
            return 1
        print("[1/7] 跳过本地构建")

    # Step 2: pack the zip.
    zip_path = pack_standalone_zip(root)
    if not zip_path:
        return 1

    # Step 3: upload and extract into dist2. The local zip is removed whether
    # or not the upload succeeds, so failed runs do not leave it behind.
    try:
        uploaded = upload_zip_and_extract_to_dist2(cfg, zip_path)
    finally:
        try:
            os.remove(zip_path)
        except OSError:
            pass
    if not uploaded:
        return 1

    # Step 4: pnpm install inside dist2 on the server.
    ok, err_msg = run_pnpm_install_in_dist2(cfg)
    if not ok:
        print(" [失败] %s" % (err_msg or "pnpm install 失败"))
        return 1

    # Steps 5-7: stop -> swap directories -> restart.
    if not remote_swap_dist_and_restart(cfg):
        return 1

    print("")
    print("=" * 60)
    print(" 部署完成!当前运行目录: %s" % cfg["dist_path"])
    print("=" * 60)
    return 0


if __name__ == "__main__":
    sys.exit(main())