#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Soul 创业派对 - 宝塔一键部署(跨平台) 一键执行: python scripts/deploy_baota.py 依赖: pip install paramiko 流程:本地 pnpm build -> 打包 .next/standalone -> 上传 -> 服务器解压 -> PM2 运行 node server.js (不从 git 拉取,不在服务器安装依赖或构建。) """ from __future__ import print_function import os import sys import getpass import shutil import subprocess import tarfile import tempfile import threading from pathlib import Path if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') def log(msg, step=None): """输出并立即刷新,便于看到进度""" if step is not None: print('[步骤 %s] %s' % (step, msg)) else: print(msg) sys.stdout.flush() sys.stderr.flush() def log_err(msg): print('>>> 错误: %s' % msg, file=sys.stderr) sys.stderr.flush() try: import paramiko except ImportError: log('请先安装: pip install paramiko') sys.exit(1) # 默认配置(与 开发文档/服务器管理 一致) # 应用端口须与 端口配置表 及 Nginx proxy_pass 一致(soul -> 3006) CFG = { 'host': os.environ.get('DEPLOY_HOST', '42.194.232.22'), 'port': int(os.environ.get('DEPLOY_PORT', '22')), 'app_port': int(os.environ.get('DEPLOY_APP_PORT', '3006')), 'user': os.environ.get('DEPLOY_USER', 'root'), 'pwd': os.environ.get('DEPLOY_PASSWORD', 'Zhiqun1984'), 'path': os.environ.get('DEPLOY_PROJECT_PATH', '/www/wwwroot/soul'), 'branch': os.environ.get('DEPLOY_BRANCH', 'soul-content'), 'pm2': os.environ.get('DEPLOY_PM2_APP', 'soul'), 'url': os.environ.get('DEPLOY_SITE_URL', 'https://soul.quwanzhi.com'), 'key': os.environ.get('DEPLOY_SSH_KEY') or None, } EXCLUDE = { 'node_modules', '.next', '.git', '.gitignore', '.cursorrules', 'scripts', 'miniprogram', '开发文档', 'addons', 'book', '__pycache__', '.DS_Store', '*.log', 'deploy_config.json', 'requirements-deploy.txt', '*.bat', '*.ps1', } def run(ssh, cmd, desc, step_label=None, ignore_err=False): """执行远程命令,打印完整输出,失败时明确标出错误和退出码""" if step_label: log(desc, step_label) else: log(desc) print(' $ %s' % (cmd[:100] + '...' if len(cmd) > 100 else cmd)) sys.stdout.flush() stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) out = stdout.read().decode('utf-8', errors='replace') err = stderr.read().decode('utf-8', errors='replace') code = stdout.channel.recv_exit_status() if out: print(out) sys.stdout.flush() if err: print(err, file=sys.stderr) sys.stderr.flush() if code != 0: log_err('退出码: %s | %s' % (code, desc)) if err and len(err.strip()) > 0: for line in err.strip().split('\n')[-5:]: print(' stderr: %s' % line, file=sys.stderr) sys.stderr.flush() return ignore_err return True def _read_and_print(stream, prefix=' ', is_stderr=False): """后台线程:不断读 stream 并打印,用于实时输出""" import threading out = sys.stderr if is_stderr else sys.stdout try: while True: line = stream.readline() if not line: break s = line.decode('utf-8', errors='replace').rstrip() if s: print('%s%s' % (prefix, s), file=out) out.flush() except Exception: pass def run_stream(ssh, cmd, desc, step_label=None, ignore_err=False): """执行远程命令并实时输出(npm install / build 不卡住、能看到进度)""" if step_label: log(desc, step_label) else: log(desc) print(' $ %s' % (cmd[:100] + '...' if len(cmd) > 100 else cmd)) sys.stdout.flush() stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) t1 = threading.Thread(target=_read_and_print, args=(stdout, ' ', False)) t2 = threading.Thread(target=_read_and_print, args=(stderr, ' [stderr] ', True)) t1.daemon = True t2.daemon = True t1.start() t2.start() t1.join() t2.join() code = stdout.channel.recv_exit_status() if code != 0: log_err('退出码: %s | %s' % (code, desc)) return ignore_err return True def _tar_filter(ti): n = ti.name.replace('\\', '/') if 'node_modules' in n or '.next' in n or '.git' in n: return None if '/scripts/' in n or n.startswith('scripts/'): return None if '/miniprogram/' in n or n.startswith('miniprogram/'): return None if '/开发文档/' in n or '开发文档/' in n: return None if '/addons/' in n or '/book/' in n: return None return ti def make_tarball(root_dir): root = Path(root_dir).resolve() tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) tmp.close() with tarfile.open(tmp.name, 'w:gz') as tar: for item in root.iterdir(): name = item.name if name in EXCLUDE or name.endswith('.md') or (name.startswith('.') and name != '.cursorrules'): continue if name.startswith('deploy_config') or name.endswith('.bat') or name.endswith('.ps1'): continue arcname = name tar.add(str(item), arcname=arcname, filter=_tar_filter) return tmp.name def run_local_build(local_root, step_label=None): """本地执行 pnpm build,实时输出""" root = Path(local_root).resolve() if step_label: log('本地构建 pnpm build(standalone)', step_label) else: log('本地构建 pnpm build(standalone)') cmd_str = 'pnpm build' print(' $ %s' % cmd_str) sys.stdout.flush() try: # Windows 下用 shell=True,否则子进程 PATH 里可能没有 pnpm use_shell = sys.platform == 'win32' p = subprocess.Popen( cmd_str if use_shell else ['pnpm', 'build'], cwd=str(root), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True, encoding='utf-8', errors='replace', shell=use_shell, ) for line in p.stdout: print(' %s' % line.rstrip()) sys.stdout.flush() code = p.wait() if code != 0: log_err('本地构建失败,退出码 %s' % code) return False return True except Exception as e: log_err('本地构建异常: %s' % e) return False def make_standalone_tarball(local_root): """ 在 next.config 已设置 output: 'standalone' 且已执行 pnpm build 的前提下, 将 .next/static 和 public 复制进 .next/standalone,再打包 .next/standalone 目录内容。 返回生成的 tar.gz 路径。 """ root = Path(local_root).resolve() standalone_dir = root / '.next' / 'standalone' static_src = root / '.next' / 'static' public_src = root / 'public' if not standalone_dir.is_dir(): raise FileNotFoundError('.next/standalone 不存在,请先执行 pnpm build') # Next 要求将 .next/static 和 public 复制进 standalone standalone_next = standalone_dir / '.next' standalone_next.mkdir(parents=True, exist_ok=True) if static_src.is_dir(): dest_static = standalone_next / 'static' if dest_static.exists(): shutil.rmtree(dest_static) shutil.copytree(static_src, dest_static) if public_src.is_dir(): dest_public = standalone_dir / 'public' if dest_public.exists(): shutil.rmtree(dest_public) shutil.copytree(public_src, dest_public) # 复制 PM2 配置到 standalone,便于服务器上用 pm2 start ecosystem.config.cjs ecosystem_src = root / 'ecosystem.config.cjs' if ecosystem_src.is_file(): shutil.copy2(ecosystem_src, standalone_dir / 'ecosystem.config.cjs') # 打包 standalone 目录「内容」,使解压到服务器项目目录后根目录即为 server.js tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) tmp.close() with tarfile.open(tmp.name, 'w:gz') as tar: for item in standalone_dir.iterdir(): arcname = item.name tar.add(str(item), arcname=arcname, recursive=True) return tmp.name def deploy_by_upload_standalone(ssh, sftp, local_root, remote_path, pm2_name, step_start, app_port=None): """本地 standalone 构建 -> 打包 -> 上传 -> 解压 -> PM2 用 node server.js 启动(PORT 与 Nginx 一致)""" step = step_start root = Path(local_root).resolve() # 步骤 1: 本地构建 log('本地执行 pnpm build(standalone)', step) step += 1 if not run_local_build(str(root), step_label=None): return False sys.stdout.flush() # 步骤 2: 打包 standalone log('打包 .next/standalone(含 static、public)', step) step += 1 try: tarball = make_standalone_tarball(str(root)) size_mb = os.path.getsize(tarball) / 1024 / 1024 log('打包完成,约 %.2f MB' % size_mb) except FileNotFoundError as e: log_err(str(e)) return False except Exception as e: log_err('打包失败: %s' % e) return False sys.stdout.flush() # 步骤 3: 上传 log('上传到服务器 /tmp/soul_standalone.tar.gz', step) step += 1 remote_tar = '/tmp/soul_standalone.tar.gz' try: sftp.put(tarball, remote_tar) log('上传完成') except Exception as e: log_err('上传失败: %s' % e) os.unlink(tarball) return False os.unlink(tarball) sys.stdout.flush() # 步骤 4: 清理并解压(保留 .env 等隐藏配置) log('清理旧文件并解压 standalone', step) step += 1 run(ssh, 'cd %s && rm -rf app components lib public styles .next *.json *.js *.ts *.mjs *.css *.d.ts server.js node_modules 2>/dev/null; ls -la' % remote_path, '清理', step_label=None, ignore_err=True) if not run(ssh, 'cd %s && tar -xzf %s' % (remote_path, remote_tar), '解压'): log_err('解压失败,请检查服务器磁盘或路径') return False run(ssh, 'rm -f %s' % remote_tar, '删除临时包', ignore_err=True) sys.stdout.flush() # 步骤 5: PM2 用 node server.js 启动,PORT 须与 Nginx proxy_pass 一致(默认 3006) # 宝塔服务器上 pm2 可能不在默认 PATH,先注入常见路径 port = app_port if app_port is not None else 3006 log('PM2 启动 node server.js(PORT=%s)' % port, step) pm2_cmd = ( 'export PATH=/www/server/nodejs/v22.14.0/bin:/www/server/nvm/versions/node/*/bin:$PATH 2>/dev/null; ' 'cd %s && (pm2 delete %s 2>/dev/null; PORT=%s pm2 start server.js --name %s)' ) % (remote_path, pm2_name, port, pm2_name) run(ssh, pm2_cmd, 'PM2 启动', ignore_err=True) return True def main(): print('=' * 60) print(' Soul 创业派对 - 宝塔一键部署') print('=' * 60) print(' %s@%s -> %s' % (CFG['user'], CFG['host'], CFG['path'])) print('=' * 60) sys.stdout.flush() # 步骤 1: 连接 log('连接服务器 %s:%s' % (CFG['host'], CFG['port']), '1/6') password = CFG.get('pwd') if not CFG['key'] and not password: password = getpass.getpass('请输入 SSH 密码: ') sys.stdout.flush() ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: kw = {'hostname': CFG['host'], 'port': CFG['port'], 'username': CFG['user']} if CFG['key']: kw['key_filename'] = CFG['key'] else: kw['password'] = password ssh.connect(**kw) log('连接成功') except Exception as e: log_err('连接失败: %s' % e) return 1 sys.stdout.flush() p, pm = CFG['path'], CFG['pm2'] sftp = ssh.open_sftp() # 步骤 2~6: 本地 build -> 打包 -> 上传 -> 解压 -> PM2 启动 log('本地打包上传部署(不从 git 拉取)', '2/6') local_root = Path(__file__).resolve().parent.parent if not deploy_by_upload_standalone(ssh, sftp, str(local_root), p, pm, step_start=2, app_port=CFG.get('app_port')): sftp.close() ssh.close() log_err('部署中断,请根据上方错误信息排查') return 1 sftp.close() ssh.close() print('') print('=' * 60) print(' 部署完成') print(' 前台: %s' % CFG['url']) print(' 后台: %s/admin' % CFG['url']) print('=' * 60) sys.stdout.flush() return 0 if __name__ == '__main__': sys.exit(main())