319 lines
12 KiB
Python
319 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
soul-admin 静态站点部署:打包 dist → 上传 → 解压到 dist2 → dist/dist2 互换实现无缝切换。
|
||
不安装依赖、不重启、不调用宝塔 API。
|
||
"""
|
||
|
||
from __future__ import print_function
|
||
|
||
import os
|
||
import sys
|
||
import shlex
|
||
import tempfile
|
||
import argparse
|
||
import zipfile
|
||
|
||
try:
|
||
import paramiko
|
||
except ImportError:
|
||
print("错误: 请先安装 paramiko")
|
||
print(" pip install paramiko")
|
||
sys.exit(1)
|
||
|
||
# ==================== 配置 ====================
|
||
|
||
# 站点根目录(Nginx 等指向的目录的上一级,即 dist 的父目录)
|
||
DEPLOY_BASE_PATH = "/www/wwwroot/self/soul-admin"
|
||
# 切换后 chown 的属主,宝塔一般为 www:www,空则跳过
|
||
DEPLOY_WWW_USER = os.environ.get("DEPLOY_WWW_USER", "www:www")
|
||
DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022"))
|
||
|
||
|
||
def get_cfg():
|
||
base = os.environ.get("DEPLOY_BASE_PATH", DEPLOY_BASE_PATH).rstrip("/")
|
||
return {
|
||
"host": os.environ.get("DEPLOY_HOST", "43.139.27.93"),
|
||
"user": os.environ.get("DEPLOY_USER", "root"),
|
||
"password": os.environ.get("DEPLOY_PASSWORD", "Zhiqun1984"),
|
||
"ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""),
|
||
"base_path": base,
|
||
"dist_path": base + "/dist",
|
||
"dist2_path": base + "/dist2",
|
||
"www_user": os.environ.get("DEPLOY_WWW_USER", DEPLOY_WWW_USER).strip(),
|
||
}
|
||
|
||
|
||
# ==================== 本地构建 ====================
|
||
|
||
|
||
def run_build(root):
|
||
"""执行本地 pnpm build"""
|
||
use_shell = sys.platform == "win32"
|
||
dist_dir = os.path.join(root, "dist")
|
||
index_html = os.path.join(dist_dir, "index.html")
|
||
|
||
try:
|
||
r = __import__("subprocess").run(
|
||
["pnpm", "build"],
|
||
cwd=root,
|
||
shell=use_shell,
|
||
timeout=300,
|
||
capture_output=True,
|
||
text=True,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
)
|
||
if r.returncode != 0:
|
||
print(" [失败] 构建失败,退出码:", r.returncode)
|
||
for line in (r.stdout or "").strip().split("\n")[-10:]:
|
||
print(" " + line)
|
||
return False
|
||
except __import__("subprocess").TimeoutExpired:
|
||
print(" [失败] 构建超时")
|
||
return False
|
||
except FileNotFoundError:
|
||
print(" [失败] 未找到 pnpm,请安装: npm install -g pnpm")
|
||
return False
|
||
except Exception as e:
|
||
print(" [失败] 构建异常:", str(e))
|
||
return False
|
||
|
||
if not os.path.isfile(index_html):
|
||
print(" [失败] 未找到 dist/index.html")
|
||
return False
|
||
print(" [成功] 构建完成")
|
||
return True
|
||
|
||
|
||
# ==================== 打包 dist 为 zip ====================
|
||
|
||
|
||
def pack_dist_zip(root):
|
||
"""将本地 dist 目录打包为 zip(解压到 dist2 后即为站点根内容)"""
|
||
print("[2/4] 打包 dist 为 zip ...")
|
||
dist_dir = os.path.join(root, "dist")
|
||
if not os.path.isdir(dist_dir):
|
||
print(" [失败] 未找到 dist 目录")
|
||
return None
|
||
index_html = os.path.join(dist_dir, "index.html")
|
||
if not os.path.isfile(index_html):
|
||
print(" [失败] 未找到 dist/index.html,请先执行 pnpm build")
|
||
return None
|
||
|
||
zip_path = os.path.join(tempfile.gettempdir(), "soul_admin_deploy.zip")
|
||
try:
|
||
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
||
for dirpath, _dirs, filenames in os.walk(dist_dir):
|
||
for f in filenames:
|
||
full = os.path.join(dirpath, f)
|
||
arcname = os.path.relpath(full, dist_dir).replace("\\", "/")
|
||
zf.write(full, arcname)
|
||
print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, os.path.getsize(zip_path) / 1024 / 1024))
|
||
return zip_path
|
||
except Exception as e:
|
||
print(" [失败] 打包异常:", str(e))
|
||
return None
|
||
|
||
|
||
# ==================== SSH 上传并解压到 dist2 ====================
|
||
|
||
|
||
def upload_zip_and_extract_to_dist2(cfg, zip_path):
|
||
"""上传 zip 到服务器并解压到 dist2"""
|
||
print("[3/4] SSH 上传 zip 并解压到 dist2 ...")
|
||
sys.stdout.flush()
|
||
if not cfg.get("password") and not cfg.get("ssh_key"):
|
||
print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
|
||
return False
|
||
zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
try:
|
||
print(" 正在连接 %s@%s:%s ..." % (cfg["user"], cfg["host"], DEFAULT_SSH_PORT))
|
||
sys.stdout.flush()
|
||
if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
|
||
client.connect(
|
||
cfg["host"],
|
||
port=DEFAULT_SSH_PORT,
|
||
username=cfg["user"],
|
||
key_filename=cfg["ssh_key"],
|
||
timeout=30,
|
||
banner_timeout=30,
|
||
)
|
||
else:
|
||
client.connect(
|
||
cfg["host"],
|
||
port=DEFAULT_SSH_PORT,
|
||
username=cfg["user"],
|
||
password=cfg["password"],
|
||
timeout=30,
|
||
banner_timeout=30,
|
||
)
|
||
print(" [OK] SSH 已连接,正在上传 zip(%.1f MB)..." % zip_size_mb)
|
||
sys.stdout.flush()
|
||
remote_zip = cfg["base_path"] + "/soul_admin_deploy.zip"
|
||
sftp = client.open_sftp()
|
||
chunk_mb = 5.0
|
||
last_reported = [0]
|
||
|
||
def _progress(transferred, total):
|
||
if total and total > 0:
|
||
now_mb = transferred / (1024 * 1024)
|
||
if now_mb - last_reported[0] >= chunk_mb or transferred >= total:
|
||
last_reported[0] = now_mb
|
||
print("\r 上传进度: %.1f / %.1f MB" % (now_mb, total / (1024 * 1024)), end="")
|
||
sys.stdout.flush()
|
||
|
||
sftp.put(zip_path, remote_zip, callback=_progress)
|
||
if zip_size_mb >= chunk_mb:
|
||
print("")
|
||
print(" [OK] zip 已上传,正在服务器解压到 dist2 ...")
|
||
sys.stdout.flush()
|
||
sftp.close()
|
||
dist2 = cfg["dist2_path"]
|
||
cmd = "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK" % (
|
||
dist2,
|
||
dist2,
|
||
remote_zip,
|
||
dist2,
|
||
remote_zip,
|
||
)
|
||
stdin, stdout, stderr = client.exec_command(cmd, timeout=120)
|
||
out = stdout.read().decode("utf-8", errors="replace").strip()
|
||
err = stderr.read().decode("utf-8", errors="replace").strip()
|
||
if err:
|
||
print(" 服务器 stderr: %s" % err[:500])
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
if exit_status != 0 or "OK" not in out:
|
||
print(" [失败] 解压失败,退出码: %s" % exit_status)
|
||
if out:
|
||
print(" stdout: %s" % out[:300])
|
||
return False
|
||
print(" [成功] 已解压到: %s" % dist2)
|
||
return True
|
||
except Exception as e:
|
||
print(" [失败] SSH 错误: %s" % str(e))
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
finally:
|
||
client.close()
|
||
|
||
|
||
# ==================== 服务器目录切换:dist → dist1,dist2 → dist ====================
|
||
|
||
|
||
def remote_swap_dist(cfg):
|
||
"""服务器上:dist→dist1,dist2→dist,删除 dist1,实现无缝切换"""
|
||
print("[4/4] 服务器切换目录: dist→dist1, dist2→dist ...")
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
try:
|
||
if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
|
||
client.connect(
|
||
cfg["host"],
|
||
port=DEFAULT_SSH_PORT,
|
||
username=cfg["user"],
|
||
key_filename=cfg["ssh_key"],
|
||
timeout=15,
|
||
)
|
||
else:
|
||
client.connect(
|
||
cfg["host"],
|
||
port=DEFAULT_SSH_PORT,
|
||
username=cfg["user"],
|
||
password=cfg["password"],
|
||
timeout=15,
|
||
)
|
||
base = cfg["base_path"]
|
||
# 若当前没有 dist(首次部署),则 dist2 直接改名为 dist;若有 dist 则先备份再替换
|
||
cmd = "cd %s && (test -d dist && (mv dist dist1 && mv dist2 dist && rm -rf dist1) || mv dist2 dist) && echo OK" % base
|
||
stdin, stdout, stderr = client.exec_command(cmd, timeout=60)
|
||
out = stdout.read().decode("utf-8", errors="replace").strip()
|
||
err = stderr.read().decode("utf-8", errors="replace").strip()
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
if exit_status != 0 or "OK" not in out:
|
||
print(" [失败] 切换失败 (退出码: %s)" % exit_status)
|
||
if err:
|
||
print(" 服务器 stderr: %s" % err)
|
||
if out and "OK" not in out:
|
||
print(" 服务器 stdout: %s" % out)
|
||
return False
|
||
print(" [成功] 新版本已切换至: %s" % cfg["dist_path"])
|
||
# 切换后设置 www 访问权限,否则 Nginx 无法读文件导致无法访问
|
||
www_user = cfg.get("www_user")
|
||
if www_user:
|
||
dist_path = cfg["dist_path"]
|
||
chown_cmd = "chown -R %s %s && echo OK" % (www_user, shlex.quote(dist_path))
|
||
stdin, stdout, stderr = client.exec_command(chown_cmd, timeout=60)
|
||
chown_out = stdout.read().decode("utf-8", errors="replace").strip()
|
||
chown_err = stderr.read().decode("utf-8", errors="replace").strip()
|
||
if stdout.channel.recv_exit_status() != 0 or "OK" not in chown_out:
|
||
print(" [警告] chown 失败,站点可能无法访问: %s" % (chown_err or chown_out))
|
||
else:
|
||
print(" [成功] 已设置属主: %s" % www_user)
|
||
return True
|
||
except Exception as e:
|
||
print(" [失败] SSH 错误: %s" % str(e))
|
||
return False
|
||
finally:
|
||
client.close()
|
||
|
||
|
||
# ==================== 主函数 ====================
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="soul-admin 静态站点部署(dist2 解压后目录切换,无缝更新)",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="不安装依赖、不重启、不调用宝塔 API。站点路径: " + DEPLOY_BASE_PATH + "/dist",
|
||
)
|
||
parser.add_argument("--no-build", action="store_true", help="跳过本地 pnpm build")
|
||
args = parser.parse_args()
|
||
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
if os.path.isfile(os.path.join(script_dir, "package.json")):
|
||
root = script_dir
|
||
else:
|
||
root = os.path.dirname(script_dir)
|
||
|
||
cfg = get_cfg()
|
||
print("=" * 60)
|
||
print(" soul-admin 部署(dist/dist2 无缝切换)")
|
||
print("=" * 60)
|
||
print(" 服务器: %s@%s 站点目录: %s" % (cfg["user"], cfg["host"], cfg["dist_path"]))
|
||
print("=" * 60)
|
||
|
||
if not args.no_build:
|
||
print("[1/4] 本地构建 pnpm build ...")
|
||
if not run_build(root):
|
||
return 1
|
||
else:
|
||
if not os.path.isdir(os.path.join(root, "dist")) or not os.path.isfile(
|
||
os.path.join(root, "dist", "index.html")
|
||
):
|
||
print("[错误] 未找到 dist/index.html,请先执行 pnpm build 或去掉 --no-build")
|
||
return 1
|
||
print("[1/4] 跳过本地构建")
|
||
|
||
zip_path = pack_dist_zip(root)
|
||
if not zip_path:
|
||
return 1
|
||
if not upload_zip_and_extract_to_dist2(cfg, zip_path):
|
||
return 1
|
||
try:
|
||
os.remove(zip_path)
|
||
except Exception:
|
||
pass
|
||
if not remote_swap_dist(cfg):
|
||
return 1
|
||
print("")
|
||
print(" 部署完成!站点目录: %s" % cfg["dist_path"])
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|