Files
soul-yongping/soul-admin/deploy_admin.py

319 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
soul-admin 静态站点部署:打包 dist → 上传 → 解压到 dist2 → dist/dist2 互换实现无缝切换。
不安装依赖、不重启、不调用宝塔 API。
"""
from __future__ import print_function
import os
import sys
import shlex
import tempfile
import argparse
import zipfile
try:
import paramiko
except ImportError:
print("错误: 请先安装 paramiko")
print(" pip install paramiko")
sys.exit(1)
# ==================== 配置 ====================
# 站点根目录Nginx 等指向的目录的上一级,即 dist 的父目录)
DEPLOY_BASE_PATH = "/www/wwwroot/self/soul-admin"
# 切换后 chown 的属主,宝塔一般为 www:www空则跳过
DEPLOY_WWW_USER = os.environ.get("DEPLOY_WWW_USER", "www:www")
DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022"))
def get_cfg():
base = os.environ.get("DEPLOY_BASE_PATH", DEPLOY_BASE_PATH).rstrip("/")
return {
"host": os.environ.get("DEPLOY_HOST", "43.139.27.93"),
"user": os.environ.get("DEPLOY_USER", "root"),
"password": os.environ.get("DEPLOY_PASSWORD", "Zhiqun1984"),
"ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""),
"base_path": base,
"dist_path": base + "/dist",
"dist2_path": base + "/dist2",
"www_user": os.environ.get("DEPLOY_WWW_USER", DEPLOY_WWW_USER).strip(),
}
# ==================== 本地构建 ====================
def run_build(root):
"""执行本地 pnpm build"""
use_shell = sys.platform == "win32"
dist_dir = os.path.join(root, "dist")
index_html = os.path.join(dist_dir, "index.html")
try:
r = __import__("subprocess").run(
["pnpm", "build"],
cwd=root,
shell=use_shell,
timeout=300,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
if r.returncode != 0:
print(" [失败] 构建失败,退出码:", r.returncode)
for line in (r.stdout or "").strip().split("\n")[-10:]:
print(" " + line)
return False
except __import__("subprocess").TimeoutExpired:
print(" [失败] 构建超时")
return False
except FileNotFoundError:
print(" [失败] 未找到 pnpm请安装: npm install -g pnpm")
return False
except Exception as e:
print(" [失败] 构建异常:", str(e))
return False
if not os.path.isfile(index_html):
print(" [失败] 未找到 dist/index.html")
return False
print(" [成功] 构建完成")
return True
# ==================== 打包 dist 为 zip ====================
def pack_dist_zip(root):
"""将本地 dist 目录打包为 zip解压到 dist2 后即为站点根内容)"""
print("[2/4] 打包 dist 为 zip ...")
dist_dir = os.path.join(root, "dist")
if not os.path.isdir(dist_dir):
print(" [失败] 未找到 dist 目录")
return None
index_html = os.path.join(dist_dir, "index.html")
if not os.path.isfile(index_html):
print(" [失败] 未找到 dist/index.html请先执行 pnpm build")
return None
zip_path = os.path.join(tempfile.gettempdir(), "soul_admin_deploy.zip")
try:
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
for dirpath, _dirs, filenames in os.walk(dist_dir):
for f in filenames:
full = os.path.join(dirpath, f)
arcname = os.path.relpath(full, dist_dir).replace("\\", "/")
zf.write(full, arcname)
print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, os.path.getsize(zip_path) / 1024 / 1024))
return zip_path
except Exception as e:
print(" [失败] 打包异常:", str(e))
return None
# ==================== SSH 上传并解压到 dist2 ====================
def upload_zip_and_extract_to_dist2(cfg, zip_path):
"""上传 zip 到服务器并解压到 dist2"""
print("[3/4] SSH 上传 zip 并解压到 dist2 ...")
sys.stdout.flush()
if not cfg.get("password") and not cfg.get("ssh_key"):
print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
return False
zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
print(" 正在连接 %s@%s:%s ..." % (cfg["user"], cfg["host"], DEFAULT_SSH_PORT))
sys.stdout.flush()
if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
client.connect(
cfg["host"],
port=DEFAULT_SSH_PORT,
username=cfg["user"],
key_filename=cfg["ssh_key"],
timeout=30,
banner_timeout=30,
)
else:
client.connect(
cfg["host"],
port=DEFAULT_SSH_PORT,
username=cfg["user"],
password=cfg["password"],
timeout=30,
banner_timeout=30,
)
print(" [OK] SSH 已连接,正在上传 zip%.1f MB..." % zip_size_mb)
sys.stdout.flush()
remote_zip = cfg["base_path"] + "/soul_admin_deploy.zip"
sftp = client.open_sftp()
chunk_mb = 5.0
last_reported = [0]
def _progress(transferred, total):
if total and total > 0:
now_mb = transferred / (1024 * 1024)
if now_mb - last_reported[0] >= chunk_mb or transferred >= total:
last_reported[0] = now_mb
print("\r 上传进度: %.1f / %.1f MB" % (now_mb, total / (1024 * 1024)), end="")
sys.stdout.flush()
sftp.put(zip_path, remote_zip, callback=_progress)
if zip_size_mb >= chunk_mb:
print("")
print(" [OK] zip 已上传,正在服务器解压到 dist2 ...")
sys.stdout.flush()
sftp.close()
dist2 = cfg["dist2_path"]
cmd = "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK" % (
dist2,
dist2,
remote_zip,
dist2,
remote_zip,
)
stdin, stdout, stderr = client.exec_command(cmd, timeout=120)
out = stdout.read().decode("utf-8", errors="replace").strip()
err = stderr.read().decode("utf-8", errors="replace").strip()
if err:
print(" 服务器 stderr: %s" % err[:500])
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0 or "OK" not in out:
print(" [失败] 解压失败,退出码: %s" % exit_status)
if out:
print(" stdout: %s" % out[:300])
return False
print(" [成功] 已解压到: %s" % dist2)
return True
except Exception as e:
print(" [失败] SSH 错误: %s" % str(e))
import traceback
traceback.print_exc()
return False
finally:
client.close()
# ==================== 服务器目录切换dist → dist1dist2 → dist ====================
def remote_swap_dist(cfg):
"""服务器上dist→dist1dist2→dist删除 dist1实现无缝切换"""
print("[4/4] 服务器切换目录: dist→dist1, dist2→dist ...")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
client.connect(
cfg["host"],
port=DEFAULT_SSH_PORT,
username=cfg["user"],
key_filename=cfg["ssh_key"],
timeout=15,
)
else:
client.connect(
cfg["host"],
port=DEFAULT_SSH_PORT,
username=cfg["user"],
password=cfg["password"],
timeout=15,
)
base = cfg["base_path"]
# 若当前没有 dist首次部署则 dist2 直接改名为 dist若有 dist 则先备份再替换
cmd = "cd %s && (test -d dist && (mv dist dist1 && mv dist2 dist && rm -rf dist1) || mv dist2 dist) && echo OK" % base
stdin, stdout, stderr = client.exec_command(cmd, timeout=60)
out = stdout.read().decode("utf-8", errors="replace").strip()
err = stderr.read().decode("utf-8", errors="replace").strip()
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0 or "OK" not in out:
print(" [失败] 切换失败 (退出码: %s)" % exit_status)
if err:
print(" 服务器 stderr: %s" % err)
if out and "OK" not in out:
print(" 服务器 stdout: %s" % out)
return False
print(" [成功] 新版本已切换至: %s" % cfg["dist_path"])
# 切换后设置 www 访问权限,否则 Nginx 无法读文件导致无法访问
www_user = cfg.get("www_user")
if www_user:
dist_path = cfg["dist_path"]
chown_cmd = "chown -R %s %s && echo OK" % (www_user, shlex.quote(dist_path))
stdin, stdout, stderr = client.exec_command(chown_cmd, timeout=60)
chown_out = stdout.read().decode("utf-8", errors="replace").strip()
chown_err = stderr.read().decode("utf-8", errors="replace").strip()
if stdout.channel.recv_exit_status() != 0 or "OK" not in chown_out:
print(" [警告] chown 失败,站点可能无法访问: %s" % (chown_err or chown_out))
else:
print(" [成功] 已设置属主: %s" % www_user)
return True
except Exception as e:
print(" [失败] SSH 错误: %s" % str(e))
return False
finally:
client.close()
# ==================== 主函数 ====================
def main():
parser = argparse.ArgumentParser(
description="soul-admin 静态站点部署dist2 解压后目录切换,无缝更新)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="不安装依赖、不重启、不调用宝塔 API。站点路径: " + DEPLOY_BASE_PATH + "/dist",
)
parser.add_argument("--no-build", action="store_true", help="跳过本地 pnpm build")
args = parser.parse_args()
script_dir = os.path.dirname(os.path.abspath(__file__))
if os.path.isfile(os.path.join(script_dir, "package.json")):
root = script_dir
else:
root = os.path.dirname(script_dir)
cfg = get_cfg()
print("=" * 60)
print(" soul-admin 部署dist/dist2 无缝切换)")
print("=" * 60)
print(" 服务器: %s@%s 站点目录: %s" % (cfg["user"], cfg["host"], cfg["dist_path"]))
print("=" * 60)
if not args.no_build:
print("[1/4] 本地构建 pnpm build ...")
if not run_build(root):
return 1
else:
if not os.path.isdir(os.path.join(root, "dist")) or not os.path.isfile(
os.path.join(root, "dist", "index.html")
):
print("[错误] 未找到 dist/index.html请先执行 pnpm build 或去掉 --no-build")
return 1
print("[1/4] 跳过本地构建")
zip_path = pack_dist_zip(root)
if not zip_path:
return 1
if not upload_zip_and_extract_to_dist2(cfg, zip_path):
return 1
try:
os.remove(zip_path)
except Exception:
pass
if not remote_swap_dist(cfg):
return 1
print("")
print(" 部署完成!站点目录: %s" % cfg["dist_path"])
return 0
if __name__ == "__main__":
sys.exit(main())