#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Soul 创业派对 - 自动部署上传脚本(dist 切换方式) 本地 pnpm build → 打包 zip → 上传到服务器并解压到 dist2 → 宝塔暂停 soul → dist→dist1, dist2→dist → 删除 dist1 → 宝塔重启 soul 使用方法: python scripts/devlop.py # 完整流程 python scripts/devlop.py --no-build # 跳过本地构建(使用已有 .next/standalone) 环境变量(可选): DEPLOY_HOST # SSH 服务器,默认同 deploy_soul DEPLOY_USER / DEPLOY_PASSWORD / DEPLOY_SSH_KEY DEVOP_BASE_PATH # 服务器目录,默认 /www/wwwroot/auto-devlop/soul BAOTA_PANEL_URL / BAOTA_API_KEY DEPLOY_PM2_APP # Node 项目名,默认 soul """ from __future__ import print_function import os import sys import shutil import tempfile import argparse import json import zipfile import time # 确保能导入同目录的 deploy_soul script_dir = os.path.dirname(os.path.abspath(__file__)) if script_dir not in sys.path: sys.path.insert(0, script_dir) try: import paramiko except ImportError: print("错误: 请先安装 paramiko") print(" pip install paramiko") sys.exit(1) # 复用 deploy_soul 的构建与宝塔 API from deploy_soul import ( get_cfg as _deploy_cfg, run_build, stop_node_project, start_node_project, ) # ==================== 构建前清理(避免 Windows EBUSY) ==================== def clean_standalone_before_build(root, retries=3, delay=2): """ 构建前删除 .next/standalone,避免 Next.js 在 Windows 上因 EBUSY 无法 rmdir。 若目录被占用会重试几次并等待,仍失败则提示用 --no-build 或关闭占用进程。 """ standalone = os.path.join(root, ".next", "standalone") if not os.path.isdir(standalone): return True for attempt in range(1, retries + 1): try: shutil.rmtree(standalone) print(" [清理] 已删除 .next/standalone(第 %d 次尝试)" % attempt) return True except (OSError, PermissionError) as e: err = getattr(e, "winerror", None) or getattr(e, "errno", None) if attempt < retries: print(" [清理] .next/standalone 被占用,%ds 后重试 (%d/%d) ..." % (delay, attempt, retries)) time.sleep(delay) else: print(" [失败] 无法删除 .next/standalone(EBUSY/被占用)") print(" 请:1) 关闭占用该目录的进程(如其他终端、VS Code 文件预览)") print(" 2) 或先手动执行 pnpm build,再运行: python scripts/devlop.py --no-build") return False return False # ==================== 配置 ==================== def get_cfg(): """获取配置(在 deploy_soul 基础上增加 devlop 路径)""" cfg = _deploy_cfg() cfg["base_path"] = os.environ.get("DEVOP_BASE_PATH", "/www/wwwroot/auto-devlop/soul") cfg["dist_path"] = cfg["base_path"] + "/dist" cfg["dist2_path"] = cfg["base_path"] + "/dist2" return cfg # ==================== 打包为 ZIP ==================== # 打包 zip 时排除的目录名(路径中任一段匹配即跳过整棵子树) ZIP_EXCLUDE_DIRS = { ".cache", # node_modules/.cache, .next/cache "__pycache__", ".git", "node_modules", "cache", # .next/cache 等 "test", "tests", "coverage", ".nyc_output", ".turbo", "开发文档", "miniprogramPre", "my-app", "newpp", } # 打包时排除的文件名(精确匹配)或后缀 ZIP_EXCLUDE_FILE_NAMES = {".DS_Store", "Thumbs.db"} ZIP_EXCLUDE_FILE_SUFFIXES = (".log", ".map") # 可选:.map 可排除以减小体积 def _should_exclude_from_zip(arcname, is_file=True): """判断 zip 内相对路径是否应排除(不打包)。""" parts = arcname.replace("\\", "/").split("/") for part in parts: if part in ZIP_EXCLUDE_DIRS: return True if is_file: name = parts[-1] if parts else "" if name in ZIP_EXCLUDE_FILE_NAMES: return True if any(name.endswith(s) for s in ZIP_EXCLUDE_FILE_SUFFIXES): return True return False def _copy_with_dereference(src, dst): """复制文件或目录,跟随符号链接""" if os.path.islink(src): link_target = os.readlink(src) real_path = link_target if os.path.isabs(link_target) else os.path.join(os.path.dirname(src), link_target) if os.path.exists(real_path): if os.path.isdir(real_path): shutil.copytree(real_path, dst, symlinks=False, dirs_exist_ok=True) else: shutil.copy2(real_path, dst) else: shutil.copy2(src, dst, follow_symlinks=False) elif os.path.isdir(src): if os.path.exists(dst): shutil.rmtree(dst) shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=True) else: shutil.copy2(src, dst) def pack_standalone_zip(root): """打包 standalone 为 zip(逻辑与 deploy_soul.pack_standalone 一致,输出 zip)""" print("[2/7] 打包 standalone 为 zip ...") standalone = os.path.join(root, ".next", "standalone") static_src = os.path.join(root, ".next", "static") public_src = os.path.join(root, "public") ecosystem_src = os.path.join(root, "ecosystem.config.cjs") if not os.path.isdir(standalone): print(" [失败] 未找到 .next/standalone 目录") return None if not os.path.isdir(static_src): print(" [失败] 未找到 .next/static 目录") return None if not os.path.isdir(public_src): print(" [警告] 未找到 public 目录,继续打包") if not os.path.isfile(ecosystem_src): print(" [警告] 未找到 ecosystem.config.cjs,继续打包") staging = tempfile.mkdtemp(prefix="soul_devlop_") try: print(" 正在复制 standalone 目录内容...") for name in os.listdir(standalone): src = os.path.join(standalone, name) dst = os.path.join(staging, name) if name == "node_modules": print(" 正在复制 node_modules(处理符号链接)...") _copy_with_dereference(src, dst) # 修复 pnpm 依赖:提升 styled-jsx node_modules_dst = os.path.join(staging, "node_modules") pnpm_dir = os.path.join(node_modules_dst, ".pnpm") if os.path.isdir(pnpm_dir): for dep in ["styled-jsx"]: dep_in_root = os.path.join(node_modules_dst, dep) if not os.path.exists(dep_in_root): for pnpm_pkg in os.listdir(pnpm_dir): if pnpm_pkg.startswith(dep + "@"): src_dep = os.path.join(pnpm_dir, pnpm_pkg, "node_modules", dep) if os.path.isdir(src_dep): shutil.copytree(src_dep, dep_in_root, symlinks=False, dirs_exist_ok=True) break # 复制 .next/static、public、ecosystem static_dst = os.path.join(staging, ".next", "static") if os.path.exists(static_dst): shutil.rmtree(static_dst) os.makedirs(os.path.dirname(static_dst), exist_ok=True) shutil.copytree(static_src, static_dst) if os.path.isdir(public_src): public_dst = os.path.join(staging, "public") if os.path.exists(public_dst): shutil.rmtree(public_dst) shutil.copytree(public_src, public_dst) if os.path.isfile(ecosystem_src): shutil.copy2(ecosystem_src, os.path.join(staging, "ecosystem.config.cjs")) # 修正 package.json start 脚本 package_json_path = os.path.join(staging, "package.json") if os.path.isfile(package_json_path): try: with open(package_json_path, "r", encoding="utf-8") as f: package_data = json.load(f) if "scripts" not in package_data: package_data["scripts"] = {} package_data["scripts"]["start"] = "node server.js" with open(package_json_path, "w", encoding="utf-8") as f: json.dump(package_data, f, indent=2, ensure_ascii=False) except Exception: pass # 修改 server.js 默认端口:3000 → 30006 server_js_path = os.path.join(staging, "server.js") if os.path.isfile(server_js_path): try: with open(server_js_path, "r", encoding="utf-8") as f: server_js_content = f.read() # 替换默认端口:|| 3000 → || 30006 if "|| 3000" in server_js_content: server_js_content = server_js_content.replace("|| 3000", "|| 30006") with open(server_js_path, "w", encoding="utf-8") as f: f.write(server_js_content) print(" [修改] server.js 默认端口已改为 30006") else: print(" [提示] server.js 未找到 '|| 3000' 字符串,跳过端口修改") except Exception as e: print(" [警告] 修改 server.js 失败:", str(e)) # 打成 zip(仅包含顶层内容,解压后即 dist2 根目录;排除 ZIP_EXCLUDE_* 配置的目录/文件) zip_path = os.path.join(tempfile.gettempdir(), "soul_devlop.zip") excluded_count = [0] # 用列表以便内层可修改 with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for name in os.listdir(staging): path = os.path.join(staging, name) if os.path.isfile(path): if _should_exclude_from_zip(name): excluded_count[0] += 1 continue zf.write(path, name) else: for dirpath, dirs, filenames in os.walk(path): # 剪枝:不进入排除目录 dirs[:] = [d for d in dirs if not _should_exclude_from_zip( os.path.join(name, os.path.relpath(os.path.join(dirpath, d), path)), is_file=False )] for f in filenames: full = os.path.join(dirpath, f) arcname = os.path.join(name, os.path.relpath(full, path)) if _should_exclude_from_zip(arcname): excluded_count[0] += 1 continue zf.write(full, arcname) if excluded_count[0] > 0: print(" [过滤] 已排除 %d 个文件/目录(ZIP_EXCLUDE_*)" % excluded_count[0]) size_mb = os.path.getsize(zip_path) / 1024 / 1024 print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, size_mb)) return zip_path except Exception as e: print(" [失败] 打包异常:", str(e)) import traceback traceback.print_exc() return None finally: shutil.rmtree(staging, ignore_errors=True) # ==================== SSH 上传并解压到 dist2 ==================== def upload_zip_and_extract_to_dist2(cfg, zip_path): """上传 zip 到 base_path,解压到 base_path/dist2""" print("[3/7] SSH 上传 zip 并解压到 dist2 ...") host = cfg["host"] user = cfg["user"] password = cfg["password"] key_path = cfg["ssh_key"] base_path = cfg["base_path"] dist2_path = cfg["dist2_path"] if not password and not key_path: print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY") return False client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: print(" 正在连接 %s@%s ..." % (user, host)) if key_path and os.path.isfile(key_path): client.connect(host, username=user, key_filename=key_path, timeout=15) else: client.connect(host, username=user, password=password, timeout=15) print(" [成功] SSH 连接成功") remote_zip = base_path.rstrip("/") + "/soul_devlop.zip" sftp = client.open_sftp() try: # 确保目录存在 for part in ["/www", "/www/wwwroot", "/www/wwwroot/auto-devlop", "/www/wwwroot/auto-devlop/soul"]: try: sftp.stat(part) except FileNotFoundError: pass sftp.put(zip_path, remote_zip) print(" [成功] zip 上传完成: %s" % remote_zip) finally: sftp.close() # 解压到 dist2:先删旧 dist2,再创建并解压 cmd = ( "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK" % (dist2_path, dist2_path, remote_zip, dist2_path, remote_zip) ) stdin, stdout, stderr = client.exec_command(cmd, timeout=120) err = stderr.read().decode("utf-8", errors="replace").strip() if err: print(" 服务器 stderr:", err) out = stdout.read().decode("utf-8", errors="replace").strip() exit_status = stdout.channel.recv_exit_status() if exit_status != 0 or "OK" not in out: print(" [失败] 解压失败,退出码: %s" % exit_status) if err: print(" stderr: %s" % err) if out: print(" stdout: %s" % out) return False print(" [成功] 已解压到: %s" % dist2_path) return True except paramiko.AuthenticationException: print(" [失败] SSH 认证失败") return False except Exception as e: print(" [失败] SSH 错误:", str(e)) import traceback traceback.print_exc() return False finally: client.close() # ==================== 服务器 dist2 内执行 pnpm install ==================== def run_pnpm_install_in_dist2(cfg): """在服务器 dist2 目录执行 pnpm install,失败时返回 (False, 错误信息)""" print("[4/7] 服务器 dist2 内执行 pnpm install ...") host = cfg["host"] user = cfg["user"] password = cfg["password"] key_path = cfg["ssh_key"] dist2_path = cfg["dist2_path"] if not password and not key_path: return False, "请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if key_path and os.path.isfile(key_path): client.connect(host, username=user, key_filename=key_path, timeout=15) else: client.connect(host, username=user, password=password, timeout=15) # 先查找 pnpm 路径并打印,方便调试 print(" 正在查找 pnpm 路径...") stdin, stdout, stderr = client.exec_command("bash -lc 'which pnpm'", timeout=10) pnpm_path = stdout.read().decode("utf-8", errors="replace").strip() if pnpm_path: print(" 找到 pnpm: %s" % pnpm_path) else: # 尝试常见路径 print(" 未找到 pnpm,尝试常见路径...") for test_path in ["/usr/local/bin/pnpm", "/usr/bin/pnpm", "~/.local/share/pnpm/pnpm"]: stdin, stdout, stderr = client.exec_command("test -f %s && echo OK" % test_path, timeout=5) if "OK" in stdout.read().decode("utf-8", errors="replace"): pnpm_path = test_path print(" 找到 pnpm: %s" % pnpm_path) break if not pnpm_path: return False, "未找到 pnpm 命令,请确认服务器已安装 pnpm (npm install -g pnpm)" # 使用 bash -lc 加载环境,并用找到的 pnpm 路径执行 # -l: 登录 shell,会加载 ~/.bash_profile 等 # -c: 执行命令 cmd = "bash -lc 'cd %s && %s install'" % (dist2_path, pnpm_path) print(" 执行命令: %s" % cmd) stdin, stdout, stderr = client.exec_command(cmd, timeout=300) out = stdout.read().decode("utf-8", errors="replace").strip() err = stderr.read().decode("utf-8", errors="replace").strip() exit_status = stdout.channel.recv_exit_status() # 显示部分输出(最后几行) if out: out_lines = out.split('\n') if len(out_lines) > 10: print(" 输出(最后10行):") for line in out_lines[-10:]: print(" " + line) else: print(" 输出: %s" % out) if exit_status != 0: msg = "pnpm install 失败,退出码: %s\n" % exit_status if err: msg += "stderr:\n%s\n" % err if out: msg += "stdout:\n%s" % out return False, msg print(" [成功] pnpm install 完成") return True, None except Exception as e: return False, "执行 pnpm install 异常: %s" % str(e) finally: client.close() # ==================== 暂停 → 重命名切换 → 重启 ==================== def remote_swap_dist_and_restart(cfg): """宝塔暂停 soul → dist→dist1, dist2→dist → 删除 dist1 → 宝塔重启 soul""" print("[5/7] 宝塔 API 暂停 Node 项目 soul ...") panel_url = cfg["panel_url"] api_key = cfg["api_key"] pm2_name = cfg["pm2_name"] base_path = cfg["base_path"] dist_path = cfg["dist_path"] dist2_path = cfg["dist2_path"] if not stop_node_project(panel_url, api_key, pm2_name): print(" [警告] 暂停可能未成功,继续执行切换") import time time.sleep(2) print("[6/7] 服务器上切换目录: dist→dist1, dist2→dist,删除 dist1 ...") host = cfg["host"] user = cfg["user"] password = cfg["password"] key_path = cfg["ssh_key"] client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if key_path and os.path.isfile(key_path): client.connect(host, username=user, key_filename=key_path, timeout=15) else: client.connect(host, username=user, password=password, timeout=15) # dist -> dist1, dist2 -> dist, rm -rf dist1 cmd = "cd %s && mv dist dist1 2>/dev/null; mv dist2 dist && rm -rf dist1 && echo OK" % base_path stdin, stdout, stderr = client.exec_command(cmd, timeout=60) err = stderr.read().decode("utf-8", errors="replace").strip() out = stdout.read().decode("utf-8", errors="replace").strip() exit_status = stdout.channel.recv_exit_status() if exit_status != 0 or "OK" not in out: print(" [失败] 切换失败,退出码: %s" % exit_status) if err: print(" stderr: %s" % err) if out: print(" stdout: %s" % out) return False print(" [成功] 已切换: 新版本位于 %s" % dist_path) except Exception as e: print(" [失败] 切换异常:", str(e)) return False finally: client.close() print("[7/7] 宝塔 API 重启 Node 项目 soul ...") if not start_node_project(panel_url, api_key, pm2_name): print(" [警告] 重启失败,请到宝塔 Node 项目里手动启动 soul") return False return True # ==================== 主函数 ==================== def main(): parser = argparse.ArgumentParser( description="Soul 自动部署:build → zip → 上传解压到 dist2 → 暂停 → 切换 dist → 重启", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) parser.add_argument("--no-build", action="store_true", help="跳过本地构建(使用已有 .next/standalone)") args = parser.parse_args() script_dir = os.path.dirname(os.path.abspath(__file__)) root = os.path.dirname(script_dir) cfg = get_cfg() print("=" * 60) print(" Soul 自动部署(dist 切换)") print("=" * 60) print(" 服务器: %s@%s" % (cfg["user"], cfg["host"])) print(" 目录: %s" % cfg["base_path"]) print(" 解压到: %s" % cfg["dist2_path"]) print(" 运行目录: %s" % cfg["dist_path"]) print(" Node 项目名: %s" % cfg["pm2_name"]) print("=" * 60) # 1. 本地构建 if not args.no_build: print("[1/7] 本地构建 pnpm build ...") if sys.platform == "win32": if not clean_standalone_before_build(root): return 1 if not run_build(root): return 1 else: if not os.path.isfile(os.path.join(root, ".next", "standalone", "server.js")): print("[错误] 跳过构建但未找到 .next/standalone/server.js") return 1 print("[1/7] 跳过本地构建") # 2. 打包 zip zip_path = pack_standalone_zip(root) if not zip_path: return 1 # 3. 上传并解压到 dist2 if not upload_zip_and_extract_to_dist2(cfg, zip_path): return 1 try: os.remove(zip_path) except Exception: pass # 4. 服务器 dist2 内 pnpm install ok, err_msg = run_pnpm_install_in_dist2(cfg) if not ok: print(" [失败] %s" % (err_msg or "pnpm install 失败")) return 1 # 5–7. 暂停 → 切换 → 重启 if not remote_swap_dist_and_restart(cfg): return 1 print("") print("=" * 60) print(" 部署完成!当前运行目录: %s" % cfg["dist_path"]) print("=" * 60) return 0 if __name__ == "__main__": sys.exit(main())