#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ soul-api Go 项目一键部署到宝塔 - 本地交叉编译 Linux 二进制 - 上传到 /www/wwwroot/自营/soul-api - 重启服务(nohup 或跳过) """ from __future__ import print_function import os import sys import tempfile import argparse import subprocess import shutil import tarfile try: import paramiko except ImportError: print("错误: 请先安装 paramiko") print(" pip install paramiko") sys.exit(1) # ==================== 配置 ==================== DEPLOY_PROJECT_PATH = "/www/wwwroot/自营/soul-api" DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022")) def get_cfg(): return { "host": os.environ.get("DEPLOY_HOST", "43.139.27.93"), "user": os.environ.get("DEPLOY_USER", "root"), "password": os.environ.get("DEPLOY_PASSWORD", "Zhiqun1984"), "ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""), "project_path": os.environ.get("DEPLOY_PROJECT_PATH", DEPLOY_PROJECT_PATH), } # ==================== 本地构建 ==================== def run_build(root): """交叉编译 Go 二进制(Linux amd64)""" print("[1/4] 本地交叉编译 Go 二进制 ...") env = os.environ.copy() env["GOOS"] = "linux" env["GOARCH"] = "amd64" env["CGO_ENABLED"] = "0" try: r = subprocess.run( ["go", "build", "-o", "soul-api", "./cmd/server"], cwd=root, env=env, shell=(sys.platform == "win32"), timeout=120, capture_output=True, text=True, encoding="utf-8", errors="replace", ) if r.returncode != 0: print(" [失败] go build 失败,退出码:", r.returncode) if r.stderr: for line in (r.stderr or "").strip().split("\n")[-10:]: print(" " + line) return None out_path = os.path.join(root, "soul-api") if not os.path.isfile(out_path): print(" [失败] 未找到编译产物 soul-api") return None print(" [成功] 编译完成: %s (%.2f MB)" % (out_path, os.path.getsize(out_path) / 1024 / 1024)) return out_path except subprocess.TimeoutExpired: print(" [失败] 编译超时") return None except FileNotFoundError: print(" [失败] 未找到 go 命令,请安装 Go") return None except Exception as e: print(" [失败] 编译异常:", str(e)) return None # ==================== 打包 ==================== def pack_deploy(root, binary_path, include_env=True): """打包二进制和 .env 为 tar.gz""" print("[2/4] 打包部署文件 ...") staging = tempfile.mkdtemp(prefix="soul_api_deploy_") try: shutil.copy2(binary_path, os.path.join(staging, "soul-api")) env_src = os.path.join(root, ".env") if include_env and os.path.isfile(env_src): shutil.copy2(env_src, os.path.join(staging, ".env")) print(" [已包含] .env") else: env_example = os.path.join(root, ".env.example") if os.path.isfile(env_example): shutil.copy2(env_example, os.path.join(staging, ".env")) print(" [已包含] .env.example -> .env (请服务器上检查配置)") tarball = os.path.join(tempfile.gettempdir(), "soul_api_deploy.tar.gz") with tarfile.open(tarball, "w:gz") as tf: for name in os.listdir(staging): tf.add(os.path.join(staging, name), arcname=name) print(" [成功] 打包完成: %s (%.2f MB)" % (tarball, os.path.getsize(tarball) / 1024 / 1024)) return tarball except Exception as e: print(" [失败] 打包异常:", str(e)) return None finally: shutil.rmtree(staging, ignore_errors=True) # ==================== SSH 上传 ==================== def upload_and_extract(cfg, tarball_path, no_restart=False): """上传 tar.gz 到服务器并解压、重启""" print("[3/4] SSH 上传并解压 ...") if not cfg.get("password") and not cfg.get("ssh_key"): print(" [失败] 请设置 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY") return False client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]): client.connect( cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15, ) else: client.connect( cfg["host"], port=DEFAULT_SSH_PORT, username=cfg["user"], password=cfg["password"], timeout=15, ) sftp = client.open_sftp() remote_tar = "/tmp/soul_api_deploy.tar.gz" project_path = cfg["project_path"] sftp.put(tarball_path, remote_tar) sftp.close() cmd = ( "mkdir -p %s && cd %s && tar -xzf %s && " "chmod +x soul-api && rm -f %s && echo OK" ) % (project_path, project_path, remote_tar, remote_tar) stdin, stdout, stderr = client.exec_command(cmd, timeout=60) out = stdout.read().decode("utf-8", errors="replace").strip() exit_status = stdout.channel.recv_exit_status() if exit_status != 0 or "OK" not in out: print(" [失败] 解压失败,退出码:", exit_status) return False print(" [成功] 已解压到: %s" % project_path) if not no_restart: print("[4/4] 重启 soul-api 服务 ...") restart_cmd = ( "cd %s && pkill -f 'soul-api' 2>/dev/null; sleep 2; " "nohup ./soul-api >> soul-api.log 2>&1 & sleep 1; " "pgrep -f soul-api >/dev/null && echo RESTART_OK || echo RESTART_FAIL" ) % project_path stdin, stdout, stderr = client.exec_command(restart_cmd, timeout=15) out = stdout.read().decode("utf-8", errors="replace").strip() if "RESTART_OK" in out: print(" [成功] soul-api 已重启") else: print(" [警告] 重启状态未知,请手动检查: cd %s && ./soul-api" % project_path) else: print("[4/4] 跳过重启 (--no-restart)") return True except Exception as e: print(" [失败] SSH 错误:", str(e)) return False finally: client.close() # ==================== 主函数 ==================== def main(): parser = argparse.ArgumentParser( description="soul-api Go 项目一键部署到宝塔", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--no-build", action="store_true", help="跳过本地编译(使用已有 soul-api 二进制)") parser.add_argument("--no-env", action="store_true", help="不打包 .env(保留服务器现有 .env)") parser.add_argument("--no-restart", action="store_true", help="上传后不重启服务") args = parser.parse_args() script_dir = os.path.dirname(os.path.abspath(__file__)) root = script_dir cfg = get_cfg() print("=" * 60) print(" soul-api 一键部署到宝塔") print("=" * 60) print(" 服务器: %s@%s:%s" % (cfg["user"], cfg["host"], DEFAULT_SSH_PORT)) print(" 目标目录: %s" % cfg["project_path"]) print("=" * 60) binary_path = os.path.join(root, "soul-api") if not args.no_build: p = run_build(root) if not p: return 1 else: if not os.path.isfile(binary_path): print("[错误] 未找到 soul-api 二进制,请先编译或去掉 --no-build") return 1 print("[1/4] 跳过编译,使用现有 soul-api") tarball = pack_deploy(root, binary_path, include_env=not args.no_env) if not tarball: return 1 if not upload_and_extract(cfg, tarball, no_restart=args.no_restart): return 1 try: os.remove(tarball) except Exception: pass print("") print(" 部署完成!目录: %s" % cfg["project_path"]) return 0 if __name__ == "__main__": sys.exit(main())