#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Deploy the soul-admin static site.

Pipeline: build locally -> zip ``dist`` -> upload over SFTP -> unzip into
``dist2`` on the server -> swap ``dist2`` into ``dist``.
Installs no dependencies, restarts nothing and never calls the BaoTa API.
"""

from __future__ import print_function

import argparse
import os
import shlex
import subprocess
import sys
import tempfile
import traceback
import zipfile

try:
    import paramiko
except ImportError:
    # Deferred: main() reports the missing dependency and returns 1, so that
    # merely importing this module does not terminate the interpreter.
    paramiko = None


# ==================== Configuration ====================

# Site root on the server: the parent directory of the served ``dist``.
DEPLOY_BASE_PATH = "/www/wwwroot/self/soul-admin"

# Owner applied with ``chown -R`` after the swap; empty string skips chown.
DEPLOY_WWW_USER = os.environ.get("DEPLOY_WWW_USER", "www:www")

DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022"))


def get_cfg():
    """Return the deployment settings as a dict.

    Every value can be overridden through a ``DEPLOY_*`` environment
    variable. Keys: ``host``, ``user``, ``password``, ``ssh_key``,
    ``base_path`` (trailing slashes stripped), ``dist_path``,
    ``dist2_path`` and ``www_user``.
    """
    base = os.environ.get("DEPLOY_BASE_PATH", DEPLOY_BASE_PATH).rstrip("/")
    return {
        # NOTE(review): a concrete host and a root password are hard-coded as
        # fallbacks below. They are kept so existing invocations keep working,
        # but the credential should be rotated and supplied only through
        # DEPLOY_PASSWORD / DEPLOY_SSH_KEY.
        "host": os.environ.get("DEPLOY_HOST", "43.139.27.93"),
        "user": os.environ.get("DEPLOY_USER", "root"),
        "password": os.environ.get("DEPLOY_PASSWORD", "Zhiqun1984"),
        "ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""),
        "base_path": base,
        "dist_path": base + "/dist",
        "dist2_path": base + "/dist2",
        "www_user": os.environ.get("DEPLOY_WWW_USER", DEPLOY_WWW_USER).strip(),
    }


# ==================== Local build ====================

def run_build(root):
    """Run ``pnpm build`` in *root* and verify ``dist/index.html`` exists.

    Returns True on success, False on any failure (non-zero exit, timeout
    after 300 seconds, pnpm missing, or no ``dist/index.html`` produced).
    """
    # On Windows pnpm is a .cmd shim, which needs the shell to be resolved.
    use_shell = sys.platform == "win32"
    index_html = os.path.join(root, "dist", "index.html")
    try:
        r = subprocess.run(
            ["pnpm", "build"],
            cwd=root,
            shell=use_shell,
            timeout=300,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        if r.returncode != 0:
            print(" [失败] 构建失败,退出码:", r.returncode)
            # Only the tail of the build log is shown.
            for line in (r.stdout or "").strip().split("\n")[-10:]:
                print(" " + line)
            return False
    except subprocess.TimeoutExpired:
        print(" [失败] 构建超时")
        return False
    except FileNotFoundError:
        print(" [失败] 未找到 pnpm,请安装: npm install -g pnpm")
        return False
    except Exception as e:
        print(" [失败] 构建异常:", str(e))
        return False
    if not os.path.isfile(index_html):
        print(" [失败] 未找到 dist/index.html")
        return False
    print(" [成功] 构建完成")
    return True


# ==================== Pack dist into a zip ====================

def pack_dist_zip(root):
    """Zip the contents of ``<root>/dist`` into the system temp directory.

    Archive member names are relative to ``dist`` and use ``/`` separators,
    so extracting the archive into a directory yields the site root.

    Returns the path of the zip file, or None when ``dist`` or
    ``dist/index.html`` is missing or packing fails.
    """
    print("[2/4] 打包 dist 为 zip ...")
    dist_dir = os.path.join(root, "dist")
    if not os.path.isdir(dist_dir):
        print(" [失败] 未找到 dist 目录")
        return None
    index_html = os.path.join(dist_dir, "index.html")
    if not os.path.isfile(index_html):
        print(" [失败] 未找到 dist/index.html,请先执行 pnpm build")
        return None
    zip_path = os.path.join(tempfile.gettempdir(), "soul_admin_deploy.zip")
    try:
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for dirpath, _dirs, filenames in os.walk(dist_dir):
                for f in filenames:
                    full = os.path.join(dirpath, f)
                    arcname = os.path.relpath(full, dist_dir).replace("\\", "/")
                    zf.write(full, arcname)
        print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, os.path.getsize(zip_path) / 1024 / 1024))
        return zip_path
    except Exception as e:
        print(" [失败] 打包异常:", str(e))
        return None


# ==================== SSH helpers ====================

def _open_ssh_client(cfg, timeout, **connect_kwargs):
    """Return a connected paramiko SSHClient for *cfg*.

    Uses the key file when ``cfg["ssh_key"]`` names an existing file,
    otherwise password authentication. The client is closed again if
    connecting raises.
    """
    if paramiko is None:
        raise RuntimeError("paramiko is not installed")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; kept because existing deployments depend on it.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    kwargs = {"port": DEFAULT_SSH_PORT, "username": cfg["user"], "timeout": timeout}
    kwargs.update(connect_kwargs)
    if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
        kwargs["key_filename"] = cfg["ssh_key"]
    else:
        kwargs["password"] = cfg["password"]
    try:
        client.connect(cfg["host"], **kwargs)
    except Exception:
        client.close()
        raise
    return client


def _run_remote(client, cmd, timeout):
    """Run *cmd* over *client*; return ``(exit_status, stdout, stderr)``."""
    _stdin, stdout, stderr = client.exec_command(cmd, timeout=timeout)
    out = stdout.read().decode("utf-8", errors="replace").strip()
    err = stderr.read().decode("utf-8", errors="replace").strip()
    return stdout.channel.recv_exit_status(), out, err


def _build_extract_command(dist2, remote_zip):
    """Build the shell command that unpacks *remote_zip* into a fresh *dist2*.

    The uploaded zip is removed whether or not extraction succeeded, and the
    command exits with the status of the extraction chain.
    """
    d2 = shlex.quote(dist2)
    rz = shlex.quote(remote_zip)
    return (
        "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s; "
        'rc=$?; rm -f %s; [ "$rc" -eq 0 ] && echo OK; exit "$rc"'
    ) % (d2, d2, rz, d2, rz)


def _build_swap_command(base):
    """Build the shell command that swaps ``dist2`` into ``dist`` under *base*.

    ``dist2`` must exist before the live ``dist`` is touched. If moving
    ``dist2`` into place fails, the previous ``dist`` is restored. An
    ``if``/``else`` is used instead of ``a && b || c`` so that a failure in
    the swap branch can never fall through to ``mv dist2 dist`` while
    ``dist`` still exists (which would nest ``dist2`` inside ``dist``).
    """
    return (
        "cd %s && test -d dist2 && rm -rf dist1 && "
        "if [ -d dist ]; then "
        "mv dist dist1 && { mv dist2 dist || { mv dist1 dist; false; }; } && rm -rf dist1; "
        "else mv dist2 dist; fi && echo OK"
    ) % shlex.quote(base)


def _remove_quietly(path):
    """Delete *path*, ignoring filesystem errors (best-effort cleanup)."""
    try:
        os.remove(path)
    except OSError:
        pass


# ==================== Upload over SSH and extract into dist2 ====================

def upload_zip_and_extract_to_dist2(cfg, zip_path):
    """Upload *zip_path* to the server and unzip it into ``cfg["dist2_path"]``.

    Returns True on success, False on any failure (missing credentials, SSH
    or SFTP error, or non-zero exit of the remote extraction).
    """
    print("[3/4] SSH 上传 zip 并解压到 dist2 ...")
    sys.stdout.flush()
    if not cfg.get("password") and not cfg.get("ssh_key"):
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False
    zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
    client = None
    try:
        print(" 正在连接 %s@%s:%s ..." % (cfg["user"], cfg["host"], DEFAULT_SSH_PORT))
        sys.stdout.flush()
        client = _open_ssh_client(cfg, timeout=30, banner_timeout=30)
        print(" [OK] SSH 已连接,正在上传 zip(%.1f MB)..." % zip_size_mb)
        sys.stdout.flush()
        remote_zip = cfg["base_path"] + "/soul_admin_deploy.zip"

        chunk_mb = 5.0
        # One-element list so the nested callback can update the value.
        last_reported = [0]

        def _progress(transferred, total):
            if total and total > 0:
                now_mb = transferred / (1024 * 1024)
                # Report every chunk_mb megabytes and once at completion.
                if now_mb - last_reported[0] >= chunk_mb or transferred >= total:
                    last_reported[0] = now_mb
                    print("\r 上传进度: %.1f / %.1f MB" % (now_mb, total / (1024 * 1024)), end="")
                    sys.stdout.flush()

        sftp = client.open_sftp()
        try:
            sftp.put(zip_path, remote_zip, callback=_progress)
        finally:
            sftp.close()
        if zip_size_mb >= chunk_mb:
            # Terminate the "\r" progress line.
            print("")
        print(" [OK] zip 已上传,正在服务器解压到 dist2 ...")
        sys.stdout.flush()

        dist2 = cfg["dist2_path"]
        cmd = _build_extract_command(dist2, remote_zip)
        exit_status, out, err = _run_remote(client, cmd, timeout=120)
        if err:
            print(" 服务器 stderr: %s" % err[:500])
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 解压失败,退出码: %s" % exit_status)
            if out:
                print(" stdout: %s" % out[:300])
            return False
        print(" [成功] 已解压到: %s" % dist2)
        return True
    except Exception as e:
        print(" [失败] SSH 错误: %s" % str(e))
        traceback.print_exc()
        return False
    finally:
        if client is not None:
            client.close()


# ==================== Server-side swap: dist -> dist1, dist2 -> dist ====================

def remote_swap_dist(cfg):
    """Swap ``dist2`` into ``dist`` on the server, then chown the result.

    On a first deployment (no ``dist`` yet) ``dist2`` is simply renamed;
    otherwise the old ``dist`` is moved to ``dist1``, replaced, and
    ``dist1`` is deleted. When ``cfg["www_user"]`` is non-empty the new
    ``dist`` is chown'ed to it; a chown failure only prints a warning.

    Returns True when the swap succeeded, False otherwise.
    """
    print("[4/4] 服务器切换目录: dist→dist1, dist2→dist ...")
    client = None
    try:
        client = _open_ssh_client(cfg, timeout=15)
        cmd = _build_swap_command(cfg["base_path"])
        exit_status, out, err = _run_remote(client, cmd, timeout=60)
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 切换失败 (退出码: %s)" % exit_status)
            if err:
                print(" 服务器 stderr: %s" % err)
            if out and "OK" not in out:
                print(" 服务器 stdout: %s" % out)
            return False
        print(" [成功] 新版本已切换至: %s" % cfg["dist_path"])

        # Hand the files to the web-server user so the site stays readable.
        www_user = cfg.get("www_user")
        if www_user:
            chown_cmd = "chown -R %s %s && echo OK" % (
                shlex.quote(www_user),
                shlex.quote(cfg["dist_path"]),
            )
            chown_status, chown_out, chown_err = _run_remote(client, chown_cmd, timeout=60)
            if chown_status != 0 or "OK" not in chown_out:
                print(" [警告] chown 失败,站点可能无法访问: %s" % (chown_err or chown_out))
            else:
                print(" [成功] 已设置属主: %s" % www_user)
        return True
    except Exception as e:
        print(" [失败] SSH 错误: %s" % str(e))
        return False
    finally:
        if client is not None:
            client.close()


# ==================== Entry point ====================

def main():
    """Run the deployment pipeline; return a process exit code (0 or 1)."""
    if paramiko is None:
        print("错误: 请先安装 paramiko")
        print(" pip install paramiko")
        return 1

    cfg = get_cfg()
    parser = argparse.ArgumentParser(
        description="soul-admin 静态站点部署(dist2 解压后目录切换,无缝更新)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Show the effective path, honouring a DEPLOY_BASE_PATH override.
        epilog="不安装依赖、不重启、不调用宝塔 API。站点路径: " + cfg["dist_path"],
    )
    parser.add_argument("--no-build", action="store_true", help="跳过本地 pnpm build")
    args = parser.parse_args()

    # The project root is this script's directory when it holds package.json,
    # otherwise the parent directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    if os.path.isfile(os.path.join(script_dir, "package.json")):
        root = script_dir
    else:
        root = os.path.dirname(script_dir)

    print("=" * 60)
    print(" soul-admin 部署(dist/dist2 无缝切换)")
    print("=" * 60)
    print(" 服务器: %s@%s 站点目录: %s" % (cfg["user"], cfg["host"], cfg["dist_path"]))
    print("=" * 60)

    if not args.no_build:
        print("[1/4] 本地构建 pnpm build ...")
        if not run_build(root):
            return 1
    else:
        if not os.path.isfile(os.path.join(root, "dist", "index.html")):
            print("[错误] 未找到 dist/index.html,请先执行 pnpm build 或去掉 --no-build")
            return 1
        print("[1/4] 跳过本地构建")

    zip_path = pack_dist_zip(root)
    if not zip_path:
        return 1
    try:
        uploaded = upload_zip_and_extract_to_dist2(cfg, zip_path)
    finally:
        # Remove the local archive whether or not the upload succeeded.
        _remove_quietly(zip_path)
    if not uploaded:
        return 1

    if not remote_swap_dist(cfg):
        return 1

    print("")
    print(" 部署完成!站点目录: %s" % cfg["dist_path"])
    return 0


if __name__ == "__main__":
    sys.exit(main())