Files
soul-yongping/devlop.py
2026-02-09 15:09:29 +08:00

776 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import sys
import shutil
import tempfile
import argparse
import json
import zipfile
import tarfile
import subprocess
import time
import hashlib
try:
import paramiko
except ImportError:
print("错误: 请先安装 paramiko")
print(" pip install paramiko")
sys.exit(1)
try:
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
print("错误: 请先安装 requests")
print(" pip install requests")
sys.exit(1)
# ==================== Configuration ====================
# The application port is read from the DEPLOY_PORT environment variable; DEFAULT_DEPLOY_PORT
# is the fallback when it is unset (must agree with the Nginx proxy_pass and ecosystem.config.cjs).
DEPLOY_PM2_APP = "soul"
DEFAULT_DEPLOY_PORT = 3006
DEPLOY_PROJECT_PATH = "/www/wwwroot/自营/soul"
DEPLOY_SITE_URL = "https://soul.quwanzhi.com"
# SSH port: taken from the DEPLOY_SSH_PORT environment variable, 22022 when unset.
# Evaluated once at import time, so a non-numeric value raises ValueError on import.
DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022"))
def get_cfg():
    """Build the base deployment configuration from environment variables.

    Used by deploy mode; devlop mode reuses the same SSH / Baota panel settings.
    Every key falls back to a built-in default when its variable is unset.

    Returns:
        dict: host, user, password, ssh_key, project_path, panel_url, api_key,
        pm2_name, site_url, port (int), node_version, node_path.

    Raises:
        ValueError: if DEPLOY_PORT is set to a non-integer value.
    """
    env = os.environ.get
    # NOTE(review): the password and API-key fallbacks below are credentials
    # committed to source control; consider rotating them and requiring the
    # environment variables instead. Kept unchanged so existing callers work.
    cfg = {}
    cfg["host"] = env("DEPLOY_HOST", "43.139.27.93")
    cfg["user"] = env("DEPLOY_USER", "root")
    cfg["password"] = env("DEPLOY_PASSWORD", "Zhiqun1984")
    cfg["ssh_key"] = env("DEPLOY_SSH_KEY", "")
    cfg["project_path"] = env("DEPLOY_PROJECT_PATH", DEPLOY_PROJECT_PATH)
    cfg["panel_url"] = env("BAOTA_PANEL_URL", "https://43.139.27.93:9988")
    cfg["api_key"] = env("BAOTA_API_KEY", "hsAWqFSi0GOCrunhmYdkxy92tBXfqYjd")
    cfg["pm2_name"] = env("DEPLOY_PM2_APP", DEPLOY_PM2_APP)
    cfg["site_url"] = env("DEPLOY_SITE_URL", DEPLOY_SITE_URL)
    cfg["port"] = int(env("DEPLOY_PORT", str(DEFAULT_DEPLOY_PORT)))
    cfg["node_version"] = env("DEPLOY_NODE_VERSION", "v22.14.0")
    cfg["node_path"] = env("DEPLOY_NODE_PATH", "/www/server/nodejs/v22.14.0/bin")
    return cfg
def get_cfg_devlop():
    """Return the devlop-mode configuration: the base config plus directory keys.

    Adds ``base_path`` (from DEVOP_BASE_PATH, defaulting to DEPLOY_PROJECT_PATH),
    ``dist_path`` (``<base_path>/dist``) and ``dist2_path`` (``<base_path>/dist2``).

    The application actually runs from ``dist_path`` (after the swap the new
    version lives in ``dist``), so the Baota PM2 project path must point there;
    otherwise the server starts from the wrong directory and static assets such
    as ``.next/static`` return 404.
    """
    base_path = os.environ.get("DEVOP_BASE_PATH", DEPLOY_PROJECT_PATH)
    cfg = dict(get_cfg())
    cfg.update(
        base_path=base_path,
        dist_path=base_path + "/dist",
        dist2_path=base_path + "/dist2",
    )
    return cfg
# ==================== 宝塔 API ====================
def _get_sign(api_key):
now_time = int(time.time())
sign_str = str(now_time) + hashlib.md5(api_key.encode("utf-8")).hexdigest()
request_token = hashlib.md5(sign_str.encode("utf-8")).hexdigest()
return now_time, request_token
def _baota_request(panel_url, api_key, path, data=None):
    """POST a signed form request to the Baota panel and return the parsed JSON.

    Args:
        panel_url: panel base URL; a trailing slash is ignored.
        api_key: panel API key used to sign the request.
        path: endpoint path (may include a query string); a leading slash is ignored.
        data: optional extra form fields merged over the signature fields.

    Returns:
        The decoded JSON body, ``{}`` when the response body is empty, or
        ``None`` when the request or JSON decoding fails (the error is printed).
    """
    request_time, request_token = _get_sign(api_key)
    form = dict(request_time=request_time, request_token=request_token)
    if data:
        form.update(data)
    endpoint = "%s/%s" % (panel_url.rstrip("/"), path.lstrip("/"))
    try:
        # verify=False: certificate verification is disabled for the panel URL.
        response = requests.post(endpoint, data=form, verify=False, timeout=30)
        if not response.text:
            return {}
        return response.json()
    except Exception as e:
        print(" API 请求失败: %s" % str(e))
        return None
def get_node_project_list(panel_url, api_key):
    """Fetch the Node project list from the panel.

    Tries the native endpoint first, then the plugin-style endpoint, and
    returns the ``data`` field of the first usable reply.

    Returns:
        The reply's ``data`` value (``[]`` when the key is missing), or ``None``
        when neither endpoint produced a usable reply.
    """
    candidate_paths = (
        "/project/nodejs/get_project_list",
        "/plugin?action=a&name=nodejs&s=get_project_list",
    )
    for api_path in candidate_paths:
        reply = _baota_request(panel_url, api_key, api_path)
        if not reply:
            continue
        if reply.get("status") is True or "data" in reply:
            return reply.get("data", [])
    return None
def get_node_project_status(panel_url, api_key, pm2_name):
    """Return the panel's record for the project named *pm2_name*.

    Returns:
        The first project entry whose ``name`` equals *pm2_name*, or ``None``
        when the list is unavailable, empty, or has no such entry.
    """
    projects = get_node_project_list(panel_url, api_key)
    if not projects:
        return None
    return next((entry for entry in projects if entry.get("name") == pm2_name), None)
def start_node_project(panel_url, api_key, pm2_name):
    """Ask the Baota panel to start the Node project *pm2_name*.

    Tries the native endpoint first, then the plugin-style endpoint.

    A reply counts as success when ``status`` is ``True``, or when ``status``
    is not reported as ``False`` and the reply carries a message or contains
    "成功". A reply with ``status`` explicitly ``False`` is a failure: its
    message is printed and the next endpoint is tried. (Previously any
    non-empty ``msg``, including an error message, was treated as success.)

    Returns:
        bool: ``True`` once an endpoint reports success, ``False`` otherwise.
    """
    for path in ["/project/nodejs/start_project", "/plugin?action=a&name=nodejs&s=start_project"]:
        result = _baota_request(panel_url, api_key, path, {"project_name": pm2_name})
        if not result:
            continue
        # Non-dict JSON (e.g. a list) carries no status/msg fields.
        is_dict = isinstance(result, dict)
        status = result.get("status") if is_dict else None
        message = result.get("msg") if is_dict else None
        if status is False:
            if message:
                print(" API 返回: %s" % message)
            continue
        if status is True or message or "成功" in str(result):
            print(" [成功] 启动成功: %s" % pm2_name)
            return True
    return False
def stop_node_project(panel_url, api_key, pm2_name):
    """Ask the Baota panel to stop the Node project *pm2_name*.

    Tries the native endpoint first, then the plugin-style endpoint.

    A reply counts as success when ``status`` is ``True``, or when ``status``
    is not reported as ``False`` and the reply carries a message or contains
    "成功". A reply with ``status`` explicitly ``False`` is a failure: its
    message is printed and the next endpoint is tried. (Previously any
    non-empty ``msg``, including an error message, was treated as success.)

    Returns:
        bool: ``True`` once an endpoint reports success, ``False`` otherwise.
    """
    for path in ["/project/nodejs/stop_project", "/plugin?action=a&name=nodejs&s=stop_project"]:
        result = _baota_request(panel_url, api_key, path, {"project_name": pm2_name})
        if not result:
            continue
        # Non-dict JSON (e.g. a list) carries no status/msg fields.
        is_dict = isinstance(result, dict)
        status = result.get("status") if is_dict else None
        message = result.get("msg") if is_dict else None
        if status is False:
            if message:
                print(" API 返回: %s" % message)
            continue
        if status is True or message or "成功" in str(result):
            print(" [成功] 停止成功: %s" % pm2_name)
            return True
    return False
def restart_node_project(panel_url, api_key, pm2_name):
    """Ask the Baota panel to restart the Node project *pm2_name*.

    Prints the project's current status when the panel knows the project, then
    tries the native restart endpoint followed by the plugin-style endpoint.

    A reply counts as success when ``status`` is ``True``, or when ``status``
    is not reported as ``False`` and the reply carries a message or contains
    "成功". A reply with ``status`` explicitly ``False`` is a failure and its
    message is printed. (Previously any non-empty ``msg`` was treated as
    success, so error replies were reported as a successful restart and the
    message-printing branch could only run for an empty message.)

    Returns:
        bool: ``True`` once an endpoint reports success; ``False`` after both
        endpoints failed (a warning is printed).
    """
    project_status = get_node_project_status(panel_url, api_key, pm2_name)
    if project_status:
        print(" 项目状态: %s" % project_status.get("status", "未知"))
    for path in ["/project/nodejs/restart_project", "/plugin?action=a&name=nodejs&s=restart_project"]:
        result = _baota_request(panel_url, api_key, path, {"project_name": pm2_name})
        if not result:
            continue
        # Non-dict JSON (e.g. a list) carries no status/msg fields.
        is_dict = isinstance(result, dict)
        status = result.get("status") if is_dict else None
        message = result.get("msg") if is_dict else None
        if status is not False and (status is True or message or "成功" in str(result)):
            print(" [成功] 重启成功: %s" % pm2_name)
            return True
        if is_dict and "msg" in result:
            print(" API 返回: %s" % message)
    print(" [警告] 重启失败,请检查宝塔 Node 插件是否安装、API 密钥是否正确")
    return False
def add_or_update_node_project(panel_url, api_key, pm2_name, project_path, port=None, node_path=None):
    """Register the Node project with the panel via the ``add_project`` endpoints.

    Args:
        panel_url, api_key: panel connection settings.
        pm2_name: project name sent to the panel.
        project_path: directory the project should run from.
        port: listening port; when ``None`` it is read from DEPLOY_PORT, falling
            back to DEFAULT_DEPLOY_PORT.
        node_path: directory containing the ``node`` binary; when falsy the
            bare ``node`` command is used.

    Returns:
        bool: ``True`` when an endpoint replies with ``status`` ``True``;
        otherwise ``False`` (any reply message is printed).
    """
    if port is None:
        port = int(os.environ.get("DEPLOY_PORT", str(DEFAULT_DEPLOY_PORT)))
    node_bin = "%s/node" % node_path if node_path else "node"
    # The port is passed to the server through the PORT environment variable.
    run_cmd = "PORT=%d %s server.js" % (port, node_bin)
    form = {"name": pm2_name, "path": project_path, "run_cmd": run_cmd, "port": str(port)}
    endpoints = ("/project/nodejs/add_project", "/plugin?action=a&name=nodejs&s=add_project")
    for api_path in endpoints:
        reply = _baota_request(panel_url, api_key, api_path, form)
        if not reply:
            continue
        if reply.get("status") is True:
            print(" [成功] 项目配置已更新: %s" % pm2_name)
            return True
        if "msg" in reply:
            print(" API 返回: %s" % reply.get("msg"))
    return False
# ==================== 本地构建 ====================
def run_build(root):
    """Run ``pnpm build`` in *root* and check that the standalone output exists.

    Args:
        root: project root directory (where ``pnpm build`` is executed).

    Returns:
        bool: ``True`` when the build succeeded and
        ``.next/standalone/server.js`` exists. Also ``True`` when the build
        failed on Windows with a symlink-permission error but a usable
        standalone output is present. ``False`` on any other failure,
        a timeout (10 minutes), or when pnpm is not installed.
    """
    use_shell = sys.platform == "win32"
    standalone = os.path.join(root, ".next", "standalone")
    server_js = os.path.join(standalone, "server.js")

    def _standalone_ready():
        return os.path.isdir(standalone) and os.path.isfile(server_js)

    try:
        r = subprocess.run(
            ["pnpm", "build"],
            cwd=root,
            shell=use_shell,
            timeout=600,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        stdout_text = r.stdout or ""
        stderr_text = r.stderr or ""
        if r.returncode != 0:
            combined = stdout_text + stderr_text
            lowered = combined.lower()
            is_windows_symlink_error = sys.platform == "win32" and (
                "EPERM" in combined
                or "symlink" in lowered
                or "operation not permitted" in lowered
                or "errno: -4048" in combined
            )
            if is_windows_symlink_error:
                print(" [警告] Windows 符号链接权限错误EPERM")
                print(" 解决方案:开启开发者模式 / 以管理员运行 / 或使用 --no-build")
                if _standalone_ready():
                    print(" [成功] standalone 输出可用,继续部署")
                    return True
                return False
            print(" [失败] 构建失败,退出码:", r.returncode)
            # Show the tail of both streams: the failure reason may be on
            # stderr, which the stdout tail alone would not include.
            tail = stdout_text.strip().splitlines()[-10:] + stderr_text.strip().splitlines()[-10:]
            for line in tail:
                print(" " + line)
            return False
    except subprocess.TimeoutExpired:
        print(" [失败] 构建超时超过10分钟")
        return False
    except FileNotFoundError:
        print(" [失败] 未找到 pnpm请安装: npm install -g pnpm")
        return False
    except Exception as e:
        print(" [失败] 构建异常:", str(e))
        if _standalone_ready():
            print(" [提示] 可尝试使用 --no-build 跳过构建")
        return False
    if not _standalone_ready():
        print(" [失败] 未找到 .next/standalone 或 server.js")
        return False
    print(" [成功] 构建完成")
    return True
def clean_standalone_before_build(root, retries=3, delay=2):
    """Delete ``.next/standalone`` before a build (avoids EBUSY on Windows).

    Args:
        root: project root directory.
        retries: maximum number of deletion attempts.
        delay: seconds to sleep between failed attempts.

    Returns:
        bool: ``True`` when the directory does not exist or was removed,
        ``False`` when every attempt failed.
    """
    target = os.path.join(root, ".next", "standalone")
    if not os.path.isdir(target):
        return True
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            shutil.rmtree(target)
        except OSError:  # PermissionError is a subclass of OSError
            if attempt >= retries:
                print(" [失败] 无法删除 .next/standalone可改用 --no-build")
                return False
            print(" [清理] 被占用,%ds 后重试 (%d/%d) ..." % (delay, attempt, retries))
            time.sleep(delay)
        else:
            print(" [清理] 已删除 .next/standalone%d 次尝试)" % attempt)
            return True
    return False
# ==================== Packaging (deploy mode: tar.gz) ====================
def _copy_with_dereference(src, dst):
if os.path.islink(src):
link_target = os.readlink(src)
real_path = link_target if os.path.isabs(link_target) else os.path.join(os.path.dirname(src), link_target)
if os.path.exists(real_path):
if os.path.isdir(real_path):
shutil.copytree(real_path, dst, symlinks=False, dirs_exist_ok=True)
else:
shutil.copy2(real_path, dst)
else:
shutil.copy2(src, dst, follow_symlinks=False)
elif os.path.isdir(src):
if os.path.exists(dst):
shutil.rmtree(dst)
shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
def pack_standalone_tar(root):
    """Package the Next.js standalone build as a tar.gz (deploy mode).

    The archive is assembled in a temporary staging directory from:
    the contents of ``.next/standalone`` (top-level symlinks dereferenced),
    ``.next/static``, the build index files from ``.next``, ``public`` and
    ``ecosystem.config.cjs`` when present. ``package.json``'s ``start`` script
    is rewritten to ``node server.js``.

    Args:
        root: project root directory containing ``.next``.

    Returns:
        Path of ``soul_deploy.tar.gz`` in the system temp directory, or
        ``None`` when the build output is missing/incomplete or packing fails.
    """
    print("[2/4] 打包 standalone ...")
    standalone = os.path.join(root, ".next", "standalone")
    static_src = os.path.join(root, ".next", "static")
    public_src = os.path.join(root, "public")
    ecosystem_src = os.path.join(root, "ecosystem.config.cjs")
    if not os.path.isdir(standalone) or not os.path.isdir(static_src):
        print(" [失败] 未找到 .next/standalone 或 .next/static")
        return None
    chunks_dir = os.path.join(static_src, "chunks")
    if not os.path.isdir(chunks_dir):
        print(" [失败] .next/static/chunks 不存在,请先完整执行 pnpm build本地 pnpm start 能正常打开页面后再部署)")
        return None
    staging = tempfile.mkdtemp(prefix="soul_deploy_")
    try:
        for name in os.listdir(standalone):
            _copy_with_dereference(os.path.join(standalone, name), os.path.join(staging, name))

        # Hoist styled-jsx out of the pnpm store into node_modules/ when it is
        # not already present at the top level.
        node_modules_dst = os.path.join(staging, "node_modules")
        pnpm_dir = os.path.join(node_modules_dst, ".pnpm")
        if os.path.isdir(pnpm_dir):
            for dep in ["styled-jsx"]:
                dep_in_root = os.path.join(node_modules_dst, dep)
                if os.path.exists(dep_in_root):
                    continue
                for pnpm_pkg in os.listdir(pnpm_dir):
                    if not pnpm_pkg.startswith(dep + "@"):
                        continue
                    src_dep = os.path.join(pnpm_dir, pnpm_pkg, "node_modules", dep)
                    if os.path.isdir(src_dep):
                        shutil.copytree(src_dep, dep_in_root, symlinks=False, dirs_exist_ok=True)
                        break

        # Replace any static directory that came with standalone by the real one.
        static_dst = os.path.join(staging, ".next", "static")
        if os.path.exists(static_dst):
            shutil.rmtree(static_dst)
        os.makedirs(os.path.dirname(static_dst), exist_ok=True)
        shutil.copytree(static_src, static_dst)

        # Sync the build index files (same set as start-standalone.js) so the
        # server on the panel does not serve blank pages / 404s from a stale index.
        next_root = os.path.join(root, ".next")
        next_staging = os.path.join(staging, ".next")
        index_files = [
            "BUILD_ID",
            "build-manifest.json",
            "app-path-routes-manifest.json",
            "routes-manifest.json",
            "prerender-manifest.json",
            "required-server-files.json",
            "fallback-build-manifest.json",
        ]
        for name in index_files:
            src = os.path.join(next_root, name)
            if os.path.isfile(src):
                shutil.copy2(src, os.path.join(next_staging, name))
        print(" [已同步] 构建索引: BUILD_ID, build-manifest, routes-manifest 等")

        if os.path.isdir(public_src):
            shutil.copytree(public_src, os.path.join(staging, "public"), dirs_exist_ok=True)
        if os.path.isfile(ecosystem_src):
            shutil.copy2(ecosystem_src, os.path.join(staging, "ecosystem.config.cjs"))

        pkg_json = os.path.join(staging, "package.json")
        if os.path.isfile(pkg_json):
            try:
                with open(pkg_json, "r", encoding="utf-8") as f:
                    data = json.load(f)
                data.setdefault("scripts", {})["start"] = "node server.js"
                with open(pkg_json, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
            except Exception as e:
                # Best effort: keep packing, but report instead of failing silently.
                print(" [警告] 无法更新 package.json 的 start 脚本: %s" % str(e))

        tarball = os.path.join(tempfile.gettempdir(), "soul_deploy.tar.gz")
        with tarfile.open(tarball, "w:gz") as tf:
            for name in os.listdir(staging):
                tf.add(os.path.join(staging, name), arcname=name)
        print(" [成功] 打包完成: %s (%.2f MB)" % (tarball, os.path.getsize(tarball) / 1024 / 1024))
        return tarball
    except Exception as e:
        print(" [失败] 打包异常:", str(e))
        return None
    finally:
        shutil.rmtree(staging, ignore_errors=True)
# ==================== Node environment check & SSH upload (deploy mode) ====================
def check_node_environments(cfg):
    """Print the server's default Node and the configured Node version over SSH.

    Purely informational: failures are printed as a warning.

    Args:
        cfg: config dict with ``host``, ``user``, ``password`` / ``ssh_key`` and
            optionally ``node_path``.

    Returns:
        bool: ``True`` when both probe commands were issued, ``False`` when the
        SSH connection or a command raised.
    """
    import shlex  # local import: only needed to quote the remote command

    print("[检查] Node 环境 ...")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Same rule as the other SSH steps: use the key only when the file
        # exists, otherwise fall back to password authentication.
        if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15)
        else:
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=15)
        stdin, stdout, stderr = client.exec_command("which node && node -v", timeout=10)
        print(" 默认 Node: %s" % (stdout.read().decode("utf-8", errors="replace").strip() or "未找到"))
        node_path = cfg.get("node_path", "/www/server/nodejs/v22.14.0/bin")
        # Quote the binary path so spaces or shell metacharacters stay literal.
        node_bin = shlex.quote("%s/node" % node_path)
        stdin, stdout, stderr = client.exec_command("%s -v 2>/dev/null" % node_bin, timeout=5)
        print(" 配置 Node: %s" % (stdout.read().decode("utf-8", errors="replace").strip() or "不可用"))
        return True
    except Exception as e:
        print(" [警告] %s" % str(e))
        return False
    finally:
        client.close()
def upload_and_extract(cfg, tarball_path):
    """Upload the tar.gz over SFTP and extract it into ``project_path`` (deploy mode).

    The extraction runs from a small script written to ``/tmp`` on the server.
    The script uses ``set -e`` so that a failing ``cd`` or ``tar`` aborts it
    with a non-zero status. Without that the trailing ``echo OK`` made every
    run look successful, and a failed ``cd`` let the cleanup run in the login
    directory.

    Args:
        cfg: config dict with ``host``, ``user``, ``password`` / ``ssh_key``,
            ``project_path`` and optionally ``node_path``.
        tarball_path: local path of the archive to upload.

    Returns:
        bool: ``True`` when the remote script exited 0 and printed ``OK``;
        ``False`` on missing credentials, SSH errors, or extraction failure.
    """
    import shlex  # local import: only needed to quote paths in the remote script

    print("[3/4] SSH 上传并解压 ...")
    if not cfg.get("password") and not cfg.get("ssh_key"):
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15)
        else:
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=15)
        sftp = client.open_sftp()
        remote_tar = "/tmp/soul_deploy.tar.gz"
        remote_script = "/tmp/soul_deploy_extract.sh"
        sftp.put(tarball_path, remote_tar)
        node_path = cfg.get("node_path", "/www/server/nodejs/v22.14.0/bin")
        project_path = cfg["project_path"]
        # The old-file cleanup stays best-effort (|| true); cd and tar must succeed.
        script_content = """#!/bin/bash
set -e
export PATH=%s:$PATH
cd %s
rm -rf .next public ecosystem.config.cjs server.js package.json 2>/dev/null || true
tar -xzf %s
rm -f %s
echo OK
""" % (shlex.quote(node_path), shlex.quote(project_path), shlex.quote(remote_tar), shlex.quote(remote_tar))
        with sftp.open(remote_script, "w") as f:
            f.write(script_content)
        sftp.close()
        client.exec_command("chmod +x %s" % remote_script, timeout=10)
        stdin, stdout, stderr = client.exec_command("bash %s" % remote_script, timeout=120)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 解压失败,退出码:", exit_status)
            if err:
                print(" 服务器 stderr: %s" % err[:500])
            return False
        print(" [成功] 解压完成: %s" % project_path)
        return True
    except Exception as e:
        print(" [失败] SSH 错误:", str(e))
        return False
    finally:
        client.close()
def deploy_via_baota_api(cfg):
    """Restart the Node project through the Baota panel API (deploy mode).

    Registers the project first when the panel does not list it, stops it,
    waits two seconds, then restarts it, falling back to a plain start when
    the restart is reported as failed.

    Returns:
        bool: ``True`` when the restart or the fallback start succeeded;
        ``False`` otherwise (a manual-restart hint is printed).
    """
    print("[4/4] 宝塔 API 管理 Node 项目 ...")
    panel_url = cfg["panel_url"]
    api_key = cfg["api_key"]
    pm2_name = cfg["pm2_name"]
    project_path = cfg["project_path"]
    node_path = cfg.get("node_path", "/www/server/nodejs/v22.14.0/bin")
    port = cfg["port"]

    registered = get_node_project_status(panel_url, api_key, pm2_name)
    if not registered:
        add_or_update_node_project(panel_url, api_key, pm2_name, project_path, port, node_path)

    stop_node_project(panel_url, api_key, pm2_name)
    time.sleep(2)

    if restart_node_project(panel_url, api_key, pm2_name):
        return True
    if start_node_project(panel_url, api_key, pm2_name):
        return True
    print(" 请到宝塔 Node 项目手动重启 %s,路径: %s" % (pm2_name, project_path))
    return False
# ==================== Packaging (devlop mode: zip) ====================
ZIP_EXCLUDE_DIRS = {".cache", "__pycache__", ".git", "node_modules", "cache", "test", "tests", "coverage", ".nyc_output", ".turbo", "开发文档"}
ZIP_EXCLUDE_FILE_NAMES = {".DS_Store", "Thumbs.db"}
ZIP_EXCLUDE_FILE_SUFFIXES = (".log", ".map")
def _should_exclude_from_zip(arcname, is_file=True):
parts = arcname.replace("\\", "/").split("/")
for part in parts:
if part in ZIP_EXCLUDE_DIRS:
return True
if is_file and parts:
name = parts[-1]
if name in ZIP_EXCLUDE_FILE_NAMES or any(name.endswith(s) for s in ZIP_EXCLUDE_FILE_SUFFIXES):
return True
return False
def pack_standalone_zip(root):
    """Package the Next.js standalone build as a zip (devlop mode).

    The archive is assembled in a temporary staging directory from:
    the contents of ``.next/standalone`` (top-level symlinks dereferenced),
    ``.next/static``, the build index files from ``.next``, ``public`` and
    ``ecosystem.config.cjs`` when present. ``package.json``'s ``start`` script
    is rewritten to ``node server.js`` and the ``|| 3000`` port fallback in
    ``server.js`` is replaced by the deploy port. Entries rejected by
    ``_should_exclude_from_zip`` are not written to the zip.

    Args:
        root: project root directory containing ``.next``.

    Returns:
        Path of ``soul_devlop.zip`` in the system temp directory, or ``None``
        when the build output is missing/incomplete or packing fails.
    """
    print("[2/7] 打包 standalone 为 zip ...")
    standalone = os.path.join(root, ".next", "standalone")
    static_src = os.path.join(root, ".next", "static")
    public_src = os.path.join(root, "public")
    ecosystem_src = os.path.join(root, "ecosystem.config.cjs")
    if not os.path.isdir(standalone) or not os.path.isdir(static_src):
        print(" [失败] 未找到 .next/standalone 或 .next/static")
        return None
    chunks_dir = os.path.join(static_src, "chunks")
    if not os.path.isdir(chunks_dir):
        print(" [失败] .next/static/chunks 不存在,请先完整执行 pnpm build本地 pnpm start 能正常打开页面后再部署)")
        return None
    staging = tempfile.mkdtemp(prefix="soul_devlop_")
    try:
        for name in os.listdir(standalone):
            _copy_with_dereference(os.path.join(standalone, name), os.path.join(staging, name))

        # Hoist styled-jsx out of the pnpm store into node_modules/ when it is
        # not already present at the top level.
        node_modules_dst = os.path.join(staging, "node_modules")
        pnpm_dir = os.path.join(node_modules_dst, ".pnpm")
        if os.path.isdir(pnpm_dir):
            for dep in ["styled-jsx"]:
                dep_in_root = os.path.join(node_modules_dst, dep)
                if os.path.exists(dep_in_root):
                    continue
                for pnpm_pkg in os.listdir(pnpm_dir):
                    if not pnpm_pkg.startswith(dep + "@"):
                        continue
                    src_dep = os.path.join(pnpm_dir, pnpm_pkg, "node_modules", dep)
                    if os.path.isdir(src_dep):
                        shutil.copytree(src_dep, dep_in_root, symlinks=False, dirs_exist_ok=True)
                        break

        os.makedirs(os.path.join(staging, ".next"), exist_ok=True)
        shutil.copytree(static_src, os.path.join(staging, ".next", "static"), dirs_exist_ok=True)

        # Sync the build index files (same set as start-standalone.js) so the
        # server on the panel does not serve blank pages / 404s from a stale index.
        next_root = os.path.join(root, ".next")
        next_staging = os.path.join(staging, ".next")
        index_files = [
            "BUILD_ID",
            "build-manifest.json",
            "app-path-routes-manifest.json",
            "routes-manifest.json",
            "prerender-manifest.json",
            "required-server-files.json",
            "fallback-build-manifest.json",
        ]
        for name in index_files:
            src = os.path.join(next_root, name)
            if os.path.isfile(src):
                shutil.copy2(src, os.path.join(next_staging, name))
        print(" [已同步] 构建索引: BUILD_ID, build-manifest, routes-manifest 等")

        if os.path.isdir(public_src):
            shutil.copytree(public_src, os.path.join(staging, "public"), dirs_exist_ok=True)
        if os.path.isfile(ecosystem_src):
            shutil.copy2(ecosystem_src, os.path.join(staging, "ecosystem.config.cjs"))

        pkg_json = os.path.join(staging, "package.json")
        if os.path.isfile(pkg_json):
            try:
                with open(pkg_json, "r", encoding="utf-8") as f:
                    data = json.load(f)
                data.setdefault("scripts", {})["start"] = "node server.js"
                with open(pkg_json, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
            except Exception as e:
                # Best effort: keep packing, but report instead of failing silently.
                print(" [警告] 无法更新 package.json 的 start 脚本: %s" % str(e))

        server_js = os.path.join(staging, "server.js")
        if os.path.isfile(server_js):
            try:
                deploy_port = int(os.environ.get("DEPLOY_PORT", str(DEFAULT_DEPLOY_PORT)))
                with open(server_js, "r", encoding="utf-8") as f:
                    c = f.read()
                if "|| 3000" in c:
                    with open(server_js, "w", encoding="utf-8") as f:
                        f.write(c.replace("|| 3000", "|| %d" % deploy_port))
            except Exception as e:
                # Best effort: keep packing, but report instead of failing silently.
                print(" [警告] 无法改写 server.js 默认端口: %s" % str(e))

        zip_path = os.path.join(tempfile.gettempdir(), "soul_devlop.zip")
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for name in os.listdir(staging):
                path = os.path.join(staging, name)
                if os.path.isfile(path):
                    if not _should_exclude_from_zip(name):
                        zf.write(path, name)
                    continue
                for dirpath, dirs, filenames in os.walk(path):
                    # Prune excluded directories in place so os.walk skips them.
                    dirs[:] = [
                        d for d in dirs
                        if not _should_exclude_from_zip(
                            os.path.join(name, os.path.relpath(os.path.join(dirpath, d), path)),
                            is_file=False,
                        )
                    ]
                    for filename in filenames:
                        full = os.path.join(dirpath, filename)
                        arcname = os.path.join(name, os.path.relpath(full, path))
                        if not _should_exclude_from_zip(arcname):
                            zf.write(full, arcname)
        print(" [成功] 打包完成: %s (%.2f MB)" % (zip_path, os.path.getsize(zip_path) / 1024 / 1024))
        return zip_path
    except Exception as e:
        print(" [失败] 打包异常:", str(e))
        return None
    finally:
        shutil.rmtree(staging, ignore_errors=True)
def upload_zip_and_extract_to_dist2(cfg, zip_path):
    """Upload the zip over SFTP and unpack it into ``dist2`` (devlop mode).

    The remote command recreates ``dist2_path``, unzips into it and removes the
    uploaded archive. All paths are shell-quoted so a path containing spaces
    or metacharacters cannot change what ``rm -rf`` / ``unzip`` act on.

    Args:
        cfg: devlop config dict with ``host``, ``user``, ``password`` /
            ``ssh_key``, ``base_path`` and ``dist2_path``.
        zip_path: local path of the archive to upload.

    Returns:
        bool: ``True`` when the remote command exited 0 and printed ``OK``;
        ``False`` on missing credentials, SSH errors, or extraction failure.
    """
    import shlex  # local import: only needed to quote paths in the remote command

    print("[3/7] SSH 上传 zip 并解压到 dist2 ...")
    sys.stdout.flush()
    if not cfg.get("password") and not cfg.get("ssh_key"):
        print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        return False
    zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        print(" 正在连接 %s@%s:%s ..." % (cfg["user"], cfg["host"], DEFAULT_SSH_PORT))
        sys.stdout.flush()
        if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=30, banner_timeout=30)
        else:
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=30, banner_timeout=30)
        print(" [OK] SSH 已连接,正在上传 zip%.1f MB..." % zip_size_mb)
        sys.stdout.flush()
        remote_zip = cfg["base_path"].rstrip("/") + "/soul_devlop.zip"
        sftp = client.open_sftp()
        # Upload progress: report every 5 MB and once more on completion.
        chunk_mb = 5.0
        progress = {"last_mb": 0, "shown": False}

        def _progress(transferred, total):
            if total and total > 0:
                now_mb = transferred / (1024 * 1024)
                if now_mb - progress["last_mb"] >= chunk_mb or transferred >= total:
                    progress["last_mb"] = now_mb
                    progress["shown"] = True
                    print("\r 上传进度: %.1f / %.1f MB" % (now_mb, total / (1024 * 1024)), end="")
                    sys.stdout.flush()

        sftp.put(zip_path, remote_zip, callback=_progress)
        # Terminate the in-place progress line whenever one was printed, not
        # only for uploads of 5 MB and more.
        if progress["shown"]:
            print("")
        print(" [OK] zip 已上传,正在服务器解压(约 13 分钟)...")
        sys.stdout.flush()
        sftp.close()
        dist2 = cfg["dist2_path"]
        q_dist2 = shlex.quote(dist2)
        q_zip = shlex.quote(remote_zip)
        cmd = "rm -rf %s && mkdir -p %s && unzip -o -q %s -d %s && rm -f %s && echo OK" % (q_dist2, q_dist2, q_zip, q_dist2, q_zip)
        stdin, stdout, stderr = client.exec_command(cmd, timeout=300)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        if err:
            print(" 服务器 stderr: %s" % err[:500])
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0 or "OK" not in out:
            print(" [失败] 解压失败,退出码: %s" % exit_status)
            if out:
                print(" stdout: %s" % out[:300])
            return False
        print(" [成功] 已解压到: %s" % dist2)
        return True
    except Exception as e:
        print(" [失败] SSH 错误: %s" % str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        client.close()
def run_pnpm_install_in_dist2(cfg):
    """Run ``pnpm install`` inside ``dist2`` on the server and wait for it to finish.

    This must complete before the directories are swapped. The directory and
    the pnpm path are shell-quoted, and the whole inner command is quoted
    again for ``bash -lc``, so paths with spaces or quotes stay intact.

    Args:
        cfg: devlop config dict with ``host``, ``user``, ``password`` /
            ``ssh_key`` and ``dist2_path``.

    Returns:
        tuple: ``(True, None)`` on success, otherwise ``(False, message)``.
    """
    import shlex  # local import: only needed to quote the remote command

    print("[4/7] 服务器 dist2 内执行 pnpm install等待完成后再切换目录...")
    sys.stdout.flush()
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15)
        else:
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=15)
        stdin, stdout, stderr = client.exec_command("bash -lc 'which pnpm'", timeout=10)
        which_out = stdout.read().decode("utf-8", errors="replace").strip()
        # A login shell may print profile output first; the path is the last line.
        pnpm_path = which_out.splitlines()[-1].strip() if which_out else ""
        if not pnpm_path:
            return False, "未找到 pnpm请服务器安装: npm install -g pnpm"
        inner = "cd %s && %s install" % (shlex.quote(cfg["dist2_path"]), shlex.quote(pnpm_path))
        cmd = "bash -lc %s" % shlex.quote(inner)
        stdin, stdout, stderr = client.exec_command(cmd, timeout=300)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        if stdout.channel.recv_exit_status() != 0:
            return False, "pnpm install 失败\n" + (err or out)
        print(" [成功] dist2 内 pnpm install 已执行完成,可安全切换目录")
        return True, None
    except Exception as e:
        return False, str(e)
    finally:
        client.close()
def remote_swap_dist_and_restart(cfg):
    """Swap ``dist2`` into ``dist`` on the server and restart the project (devlop mode).

    Steps: stop the project via the panel, then rename dist to dist1 and
    dist2 to dist, delete dist1, update the panel's project path to
    ``dist_path``, and start the project.

    The swap command aborts when ``cd`` fails, when ``dist2`` is missing, or
    when the current ``dist`` cannot be moved aside. Previously a failed
    ``cd`` still ran ``mv dist2 dist`` in the login directory, and a failed
    ``mv dist dist1`` nested dist2 inside the old dist while reporting OK.
    SSH errors are reported and return ``False`` instead of propagating.

    Args:
        cfg: devlop config dict (panel settings, SSH settings, ``base_path``,
            ``dist_path``, ``port`` and optionally ``node_path``).

    Returns:
        bool: ``True`` when the swap succeeded and the project was started.
    """
    import shlex  # local import: only needed to quote the remote command

    print("[5/7] 宝塔 API 暂停 Node 项目 ...")
    stop_node_project(cfg["panel_url"], cfg["api_key"], cfg["pm2_name"])
    time.sleep(2)
    print("[6/7] 服务器切换目录: dist→dist1, dist2→dist ...")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]):
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15)
        else:
            client.connect(cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=15)
        cmd = (
            "cd %s || exit 1; "
            # Nothing to swap in: leave the running version untouched.
            "[ -d dist2 ] || exit 1; "
            # Drop a stale dist1 left by an earlier interrupted run.
            "rm -rf dist1; "
            "if [ -e dist ]; then mv dist dist1 || exit 1; fi; "
            "mv dist2 dist && rm -rf dist1 && echo OK"
        ) % shlex.quote(cfg["base_path"])
        stdin, stdout, stderr = client.exec_command(cmd, timeout=60)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        if stdout.channel.recv_exit_status() != 0 or "OK" not in out:
            print(" [失败] 切换失败")
            return False
        print(" [成功] 新版本位于 %s" % cfg["dist_path"])
    except Exception as e:
        print(" [失败] SSH 错误: %s" % str(e))
        return False
    finally:
        client.close()
    # Key point: devlop runs from dist_path, so the panel's PM2 project must
    # start from that directory; otherwise static assets return 404.
    print("[7/7] 更新宝塔 Node 项目路径并重启 ...")
    add_or_update_node_project(
        cfg["panel_url"], cfg["api_key"], cfg["pm2_name"],
        cfg["dist_path"],  # dist_path, not project_path
        port=cfg["port"],
        node_path=cfg.get("node_path"),
    )
    if not start_node_project(cfg["panel_url"], cfg["api_key"], cfg["pm2_name"]):
        print(" [警告] 请到宝塔手动启动 %s,并确认项目路径为: %s" % (cfg["pm2_name"], cfg["dist_path"]))
        return False
    return True
# ==================== 主函数 ====================
def main():
    """Command-line entry point: build, package and deploy in the selected mode.

    ``--mode devlop`` (default): build, zip, upload to dist2, run pnpm install
    there, then swap dist2 into dist and restart.
    ``--mode deploy``: build, tar.gz, upload over the project path, then
    restart through the panel API. ``--no-upload`` and ``--no-api`` are only
    consulted in this mode.

    Returns:
        int: process exit code, 0 on success and 1 on the first failed step.
    """
    parser = argparse.ArgumentParser(
        description="Soul 创业派对 - 统一部署脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--mode", choices=["devlop", "deploy"], default="devlop", help="devlop=dist切换(默认), deploy=直接覆盖")
    parser.add_argument("--no-build", action="store_true", help="跳过本地构建")
    parser.add_argument("--no-upload", action="store_true", help="仅 deploy 模式:跳过 SSH 上传")
    parser.add_argument("--no-api", action="store_true", help="仅 deploy 模式:上传后不调宝塔 API")
    args = parser.parse_args()

    # The script may live in the project root or in scripts/: the root is the
    # script's directory when it holds package.json, otherwise its parent.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    has_package_json = os.path.isfile(os.path.join(script_dir, "package.json"))
    root = script_dir if has_package_json else os.path.dirname(script_dir)

    separator = "=" * 60
    standalone_server = os.path.join(root, ".next", "standalone", "server.js")

    if args.mode == "devlop":
        cfg = get_cfg_devlop()
        print(separator)
        print(" Soul 自动部署dist 切换)")
        print(separator)
        print(" 服务器: %s@%s 目录: %s Node: %s" % (cfg["user"], cfg["host"], cfg["base_path"], cfg["pm2_name"]))
        print(separator)

        if args.no_build:
            if not os.path.isfile(standalone_server):
                print("[错误] 未找到 .next/standalone/server.js")
                return 1
            print("[1/7] 跳过本地构建")
        else:
            print("[1/7] 本地构建 pnpm build ...")
            if sys.platform == "win32" and not clean_standalone_before_build(root):
                return 1
            if not run_build(root):
                return 1

        zip_path = pack_standalone_zip(root)
        if not zip_path:
            return 1
        if not upload_zip_and_extract_to_dist2(cfg, zip_path):
            return 1
        try:
            os.remove(zip_path)
        except Exception:
            pass

        # pnpm install inside dist2 has to finish before the directories are swapped.
        ok, err = run_pnpm_install_in_dist2(cfg)
        if not ok:
            print(" [失败] %s" % (err or "pnpm install 失败"))
            return 1

        # Install finished: swap dist to dist1 and dist2 to dist.
        if not remote_swap_dist_and_restart(cfg):
            return 1
        print("")
        print(" 部署完成!运行目录: %s" % cfg["dist_path"])
        return 0

    # deploy mode
    cfg = get_cfg()
    uploading = not args.no_upload
    print(separator)
    print(" Soul 一键部署(直接覆盖)")
    print(separator)
    print(" 服务器: %s@%s 项目路径: %s PM2: %s" % (cfg["user"], cfg["host"], cfg["project_path"], cfg["pm2_name"]))
    print(separator)
    if uploading:
        check_node_environments(cfg)

    if args.no_build:
        if not os.path.isfile(standalone_server):
            print("[错误] 未找到 .next/standalone/server.js")
            return 1
        print("[1/4] 跳过本地构建")
    else:
        print("[1/4] 本地构建 ...")
        if not run_build(root):
            return 1

    tarball = pack_standalone_tar(root)
    if not tarball:
        return 1

    if uploading:
        if not upload_and_extract(cfg, tarball):
            return 1
        try:
            os.remove(tarball)
        except Exception:
            pass
        if not args.no_api:
            deploy_via_baota_api(cfg)
    else:
        print(" 压缩包: %s" % tarball)

    print("")
    print(" 部署完成!站点: %s" % cfg["site_url"])
    return 0


if __name__ == "__main__":
    sys.exit(main())