Files
soul/scripts/deploy_baota.py

371 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Soul 创业派对 - 宝塔一键部署跨平台
一键执行: python scripts/deploy_baota.py
依赖: pip install paramiko
流程本地 pnpm build -> 打包 .next/standalone -> 上传 -> 服务器解压 -> PM2 运行 node server.js
不从 git 拉取不在服务器安装依赖或构建
"""
from __future__ import print_function
import os
import sys
import getpass
import shutil
import subprocess
import tarfile
import tempfile
import threading
from pathlib import Path
# On Windows, re-wrap both standard streams as UTF-8 text streams
# (presumably so the Chinese progress messages print regardless of the
# console's default encoding - confirm).
if sys.platform == 'win32':
    import io
    sys.stdout, sys.stderr = (
        io.TextIOWrapper(stream.buffer, encoding='utf-8')
        for stream in (sys.stdout, sys.stderr)
    )
def log(msg, step=None):
    """Print a progress message and flush stdout and stderr immediately.

    When ``step`` is not None the message is prefixed with ``[步骤 <step>]``;
    otherwise it is printed as-is.
    """
    text = msg if step is None else '[步骤 %s] %s' % (step, msg)
    print(text)
    # Flush both streams so progress shows up right away.
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
def log_err(msg):
    """Write ``msg`` to stderr with the ``>>> 错误: `` prefix and flush it."""
    err_stream = sys.stderr
    print('>>> 错误: %s' % msg, file=err_stream)
    err_stream.flush()
# paramiko is required for the SSH/SFTP steps; if it is missing, print the
# install hint and exit with status 1.
try:
    import paramiko
except ImportError:
    log('请先安装: pip install paramiko')
    raise SystemExit(1)
# Default deployment settings. Each entry can be overridden through the
# environment variable named in its os.environ.get() call.
# The application port must match the port table and the Nginx proxy_pass
# target (soul -> 3006).
CFG = {
    'host': os.environ.get('DEPLOY_HOST', '42.194.232.22'),
    'port': int(os.environ.get('DEPLOY_PORT', '22')),
    'app_port': int(os.environ.get('DEPLOY_APP_PORT', '3006')),
    'user': os.environ.get('DEPLOY_USER', 'root'),
    # SECURITY: never ship a password default in source control. With neither
    # DEPLOY_PASSWORD nor DEPLOY_SSH_KEY set, main() prompts via getpass.
    # Any password that was previously committed here should be rotated.
    'pwd': os.environ.get('DEPLOY_PASSWORD') or None,
    'path': os.environ.get('DEPLOY_PROJECT_PATH', '/www/wwwroot/soul'),
    'branch': os.environ.get('DEPLOY_BRANCH', 'soul-content'),
    'pm2': os.environ.get('DEPLOY_PM2_APP', 'soul'),
    'url': os.environ.get('DEPLOY_SITE_URL', 'https://soul.quwanzhi.com'),
    # Path to a private key file; an empty value is treated as "not set".
    'key': os.environ.get('DEPLOY_SSH_KEY') or None,
}
# Names (and glob-style patterns) of top-level project entries that are left
# out of the source tarball.
EXCLUDE = {
    # dependencies, build output and VCS metadata
    'node_modules',
    '.next',
    '.git',
    '.gitignore',
    # tooling, docs and sub-projects
    '.cursorrules',
    'scripts',
    'miniprogram',
    '开发文档',
    'addons',
    'book',
    # caches, OS droppings and logs
    '__pycache__',
    '.DS_Store',
    '*.log',
    # deployment helpers
    'deploy_config.json',
    'requirements-deploy.txt',
    '*.bat',
    '*.ps1',
}
def run(ssh, cmd, desc, step_label=None, ignore_err=False):
    """Run ``cmd`` on the remote host, print its output and report failures.

    Args:
        ssh: connected client object exposing ``exec_command``.
        cmd: shell command line to execute remotely.
        desc: human-readable description used in log lines.
        step_label: optional step marker passed to ``log``; falsy means none.
        ignore_err: value to return when the command exits non-zero.

    Returns:
        True when the exit status is 0, otherwise ``ignore_err``.
    """
    log(desc, step_label or None)
    # Only the first 100 characters of long commands are echoed.
    shown = cmd if len(cmd) <= 100 else cmd[:100] + '...'
    print(' $ %s' % shown)
    sys.stdout.flush()

    _stdin, remote_out, remote_err = ssh.exec_command(cmd, get_pty=True)
    out_text = remote_out.read().decode('utf-8', errors='replace')
    err_text = remote_err.read().decode('utf-8', errors='replace')
    exit_code = remote_out.channel.recv_exit_status()

    # Echo whatever the command produced on the matching local stream.
    for text, sink in ((out_text, sys.stdout), (err_text, sys.stderr)):
        if text:
            print(text, file=sink)
            sink.flush()

    if exit_code == 0:
        return True

    log_err('退出码: %s | %s' % (exit_code, desc))
    err_tail = err_text.strip()
    if err_tail:
        # Repeat the last five stderr lines next to the error summary.
        for line in err_tail.split('\n')[-5:]:
            print(' stderr: %s' % line, file=sys.stderr)
        sys.stderr.flush()
    return ignore_err
def _read_and_print(stream, prefix=' ', is_stderr=False):
"""后台线程:不断读 stream 并打印,用于实时输出"""
import threading
out = sys.stderr if is_stderr else sys.stdout
try:
while True:
line = stream.readline()
if not line:
break
s = line.decode('utf-8', errors='replace').rstrip()
if s:
print('%s%s' % (prefix, s), file=out)
out.flush()
except Exception:
pass
def run_stream(ssh, cmd, desc, step_label=None, ignore_err=False):
    """Run ``cmd`` remotely and echo its output while it is still running.

    Meant for long commands (npm install / build) so progress stays visible.
    Two daemon threads echo stdout and stderr through ``_read_and_print``.

    Args:
        ssh: connected client object exposing ``exec_command``.
        cmd: shell command line to execute remotely.
        desc: human-readable description used in log lines.
        step_label: optional step marker passed to ``log``; falsy means none.
        ignore_err: value to return when the command exits non-zero.

    Returns:
        True when the exit status is 0, otherwise ``ignore_err``.
    """
    log(desc, step_label or None)
    shown = cmd if len(cmd) <= 100 else cmd[:100] + '...'
    print(' $ %s' % shown)
    sys.stdout.flush()

    _stdin, remote_out, remote_err = ssh.exec_command(cmd, get_pty=True)
    readers = [
        threading.Thread(target=_read_and_print, args=(remote_out, ' ', False)),
        threading.Thread(target=_read_and_print, args=(remote_err, ' [stderr] ', True)),
    ]
    for reader in readers:
        reader.daemon = True
    for reader in readers:
        reader.start()
    # Wait for both streams to reach EOF before asking for the exit status.
    for reader in readers:
        reader.join()

    exit_code = remote_out.channel.recv_exit_status()
    if exit_code == 0:
        return True
    log_err('退出码: %s | %s' % (exit_code, desc))
    return ignore_err
def _tar_filter(ti):
n = ti.name.replace('\\', '/')
if 'node_modules' in n or '.next' in n or '.git' in n:
return None
if '/scripts/' in n or n.startswith('scripts/'):
return None
if '/miniprogram/' in n or n.startswith('miniprogram/'):
return None
if '/开发文档/' in n or '开发文档/' in n:
return None
if '/addons/' in n or '/book/' in n:
return None
return ti
def make_tarball(root_dir):
    """Pack the deployable top-level entries of ``root_dir`` into a .tar.gz.

    A top-level entry is skipped when its name matches an ``EXCLUDE`` entry
    (plain names match exactly, entries such as ``*.log`` match as glob
    patterns), ends with ``.md``/``.bat``/``.ps1``, or starts with ``.`` or
    ``deploy_config``. Everything below the remaining entries is additionally
    filtered through ``_tar_filter``.

    Args:
        root_dir: project root directory.

    Returns:
        Path of the temporary archive; the caller is responsible for deleting
        it. If packing fails the partial archive is removed and the exception
        is re-raised.
    """
    import fnmatch  # local import: only this helper needs glob matching

    def _skipped(name):
        # EXCLUDE mixes literal names and glob patterns; fnmatchcase handles
        # both (a pattern without wildcards only matches itself).
        if any(fnmatch.fnmatchcase(name, pattern) for pattern in EXCLUDE):
            return True
        if name.endswith(('.md', '.bat', '.ps1')):
            return True
        return name.startswith(('.', 'deploy_config'))

    root = Path(root_dir).resolve()
    tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False)
    tmp.close()
    try:
        with tarfile.open(tmp.name, 'w:gz') as tar:
            for item in root.iterdir():
                if _skipped(item.name):
                    continue
                tar.add(str(item), arcname=item.name, filter=_tar_filter)
    except Exception:
        # Do not leave a half-written archive behind in the temp directory.
        os.unlink(tmp.name)
        raise
    return tmp.name
def run_local_build(local_root, step_label=None):
    """Run ``pnpm build`` in ``local_root`` and echo its output line by line.

    Args:
        local_root: project directory used as the working directory.
        step_label: optional step marker passed to ``log``; falsy means none.

    Returns:
        True when the build exits with status 0; False on a non-zero exit or
        when starting/reading the process raises.
    """
    project_dir = Path(local_root).resolve()
    log('本地构建 pnpm buildstandalone', step_label or None)
    cmd_str = 'pnpm build'
    print(' $ %s' % cmd_str)
    sys.stdout.flush()
    try:
        # On Windows run through the shell; otherwise pnpm may not be found
        # on the child process PATH.
        on_windows = sys.platform == 'win32'
        popen_args = cmd_str if on_windows else ['pnpm', 'build']
        proc = subprocess.Popen(
            popen_args,
            cwd=str(project_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # single merged output stream
            bufsize=1,
            universal_newlines=True,
            encoding='utf-8',
            errors='replace',
            shell=on_windows,
        )
        for output_line in proc.stdout:
            print(' %s' % output_line.rstrip())
            sys.stdout.flush()
        exit_code = proc.wait()
        if exit_code == 0:
            return True
        log_err('本地构建失败,退出码 %s' % exit_code)
        return False
    except Exception as e:
        log_err('本地构建异常: %s' % e)
        return False
def _replace_tree(src, dst):
    """Copy directory ``src`` to ``dst``, removing any existing ``dst`` first."""
    if dst.exists():
        shutil.rmtree(dst)
    shutil.copytree(src, dst)


def make_standalone_tarball(local_root):
    """Pack the contents of ``.next/standalone`` into a temporary .tar.gz.

    Assumes next.config sets ``output: 'standalone'`` and ``pnpm build`` has
    already run. Before packing, ``.next/static`` and ``public`` are copied
    into the standalone directory (replacing older copies), together with
    ``ecosystem.config.cjs`` when the project has one.

    The archive holds the *contents* of the standalone directory, so that
    extracting it into the server project directory puts ``server.js`` at the
    root.

    Args:
        local_root: project root directory.

    Returns:
        Path of the generated archive; the caller deletes it after use.

    Raises:
        FileNotFoundError: ``.next/standalone`` does not exist.
    """
    root = Path(local_root).resolve()
    standalone_dir = root / '.next' / 'standalone'
    static_src = root / '.next' / 'static'
    public_src = root / 'public'
    if not standalone_dir.is_dir():
        raise FileNotFoundError('.next/standalone 不存在,请先执行 pnpm build')

    # Next expects .next/static and public to sit inside the standalone dir.
    standalone_next = standalone_dir / '.next'
    standalone_next.mkdir(parents=True, exist_ok=True)
    if static_src.is_dir():
        _replace_tree(static_src, standalone_next / 'static')
    if public_src.is_dir():
        _replace_tree(public_src, standalone_dir / 'public')

    # Ship the PM2 config too, so `pm2 start ecosystem.config.cjs` works remotely.
    ecosystem_src = root / 'ecosystem.config.cjs'
    if ecosystem_src.is_file():
        shutil.copy2(ecosystem_src, standalone_dir / 'ecosystem.config.cjs')

    tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False)
    tmp.close()
    try:
        with tarfile.open(tmp.name, 'w:gz') as tar:
            for item in standalone_dir.iterdir():
                tar.add(str(item), arcname=item.name, recursive=True)
    except Exception:
        # Do not leave a half-written archive behind in the temp directory.
        os.unlink(tmp.name)
        raise
    return tmp.name
def deploy_by_upload_standalone(ssh, sftp, local_root, remote_path, pm2_name, step_start, app_port=None):
    """Build locally, upload the standalone bundle and (re)start it under PM2.

    Steps: local ``pnpm build`` -> pack ``.next/standalone`` -> SFTP upload to
    ``/tmp/soul_standalone.tar.gz`` -> clean the remote project directory and
    extract -> ``pm2 start server.js`` with ``PORT`` set.

    Args:
        ssh: connected client object handed to ``run`` for remote commands.
        sftp: SFTP session exposing ``put(local, remote)``.
        local_root: local project root.
        remote_path: project directory on the server.
        pm2_name: PM2 application name.
        step_start: number used for the first step label.
        app_port: port exported as ``PORT``; defaults to 3006 and must match
            the Nginx proxy_pass target.

    Returns:
        True when every step succeeded, including the PM2 start; False
        otherwise.
    """
    import shlex  # local import: quoting of values placed into remote shell commands

    step = step_start
    root = Path(local_root).resolve()
    # Quote everything interpolated into shell command lines; values made of
    # safe characters (the defaults) are left unchanged by shlex.quote.
    q_path = shlex.quote(str(remote_path))
    q_name = shlex.quote(str(pm2_name))

    # Step 1: local build
    log('本地执行 pnpm buildstandalone', step)
    step += 1
    if not run_local_build(str(root), step_label=None):
        return False
    sys.stdout.flush()

    # Step 2: pack the standalone output
    log('打包 .next/standalone含 static、public', step)
    step += 1
    try:
        tarball = make_standalone_tarball(str(root))
        size_mb = os.path.getsize(tarball) / 1024 / 1024
        log('打包完成,约 %.2f MB' % size_mb)
    except FileNotFoundError as e:
        log_err(str(e))
        return False
    except Exception as e:
        log_err('打包失败: %s' % e)
        return False
    sys.stdout.flush()

    # Step 3: upload; the local archive is removed whether or not it succeeds
    log('上传到服务器 /tmp/soul_standalone.tar.gz', step)
    step += 1
    remote_tar = '/tmp/soul_standalone.tar.gz'
    try:
        sftp.put(tarball, remote_tar)
        log('上传完成')
    except Exception as e:
        log_err('上传失败: %s' % e)
        return False
    finally:
        os.unlink(tarball)
    sys.stdout.flush()

    # Step 4: remove the previous release and extract the new one. Only the
    # listed names are removed; other entries in the directory are kept.
    log('清理旧文件并解压 standalone', step)
    step += 1
    run(ssh, 'cd %s && rm -rf app components lib public styles .next *.json *.js *.ts *.mjs *.css *.d.ts server.js node_modules 2>/dev/null; ls -la' % q_path, '清理', step_label=None, ignore_err=True)
    if not run(ssh, 'cd %s && tar -xzf %s' % (q_path, remote_tar), '解压'):
        log_err('解压失败,请检查服务器磁盘或路径')
        return False
    run(ssh, 'rm -f %s' % remote_tar, '删除临时包', ignore_err=True)
    sys.stdout.flush()

    # Step 5: start under PM2. pm2 may not be on the default PATH of a Baota
    # server, so common Node locations are prepended first.
    # NOTE(review): the `*` inside the PATH assignment is probably not
    # glob-expanded by the shell - confirm on the server.
    port = app_port if app_port is not None else 3006
    log('PM2 启动 node server.jsPORT=%s' % port, step)
    pm2_cmd = (
        'export PATH=/www/server/nodejs/v22.14.0/bin:/www/server/nvm/versions/node/*/bin:$PATH 2>/dev/null; '
        'cd %s && (pm2 delete %s 2>/dev/null; PORT=%s pm2 start server.js --name %s)'
    ) % (q_path, q_name, shlex.quote(str(port)), q_name)
    # A failed `pm2 start` means the app is not running: report it instead of
    # claiming a successful deployment. (`pm2 delete` failing for a missing
    # app is already tolerated inside the command itself.)
    if not run(ssh, pm2_cmd, 'PM2 启动'):
        log_err('PM2 启动失败,请在服务器上检查 pm2 与 node 是否可用')
        return False
    return True
def main():
    """Connect to the server, run the standalone deployment and print a summary.

    Authentication uses the key file from ``CFG['key']`` when set, otherwise
    the password from ``CFG['pwd']``; if neither is available the password is
    requested interactively.

    Returns:
        Process exit status: 0 on success, 1 when connecting or deploying
        failed.
    """
    print('=' * 60)
    print(' Soul 创业派对 - 宝塔一键部署')
    print('=' * 60)
    print(' %s@%s -> %s' % (CFG['user'], CFG['host'], CFG['path']))
    print('=' * 60)
    sys.stdout.flush()

    # Step 1: connect
    log('连接服务器 %s:%s' % (CFG['host'], CFG['port']), '1/6')
    password = CFG.get('pwd')
    if not CFG['key'] and not password:
        password = getpass.getpass('请输入 SSH 密码: ')
    sys.stdout.flush()
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; consider loading known_hosts and a stricter policy.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        kw = {'hostname': CFG['host'], 'port': CFG['port'], 'username': CFG['user']}
        if CFG['key']:
            kw['key_filename'] = CFG['key']
        else:
            kw['password'] = password
        ssh.connect(**kw)
        log('连接成功')
    except Exception as e:
        log_err('连接失败: %s' % e)
        ssh.close()
        return 1
    sys.stdout.flush()

    p, pm = CFG['path'], CFG['pm2']
    # Steps 2~6: local build -> pack -> upload -> extract -> PM2 start.
    # try/finally guarantees both handles are closed even if a step raises.
    try:
        sftp = ssh.open_sftp()
        try:
            log('本地打包上传部署(不从 git 拉取)', '2/6')
            local_root = Path(__file__).resolve().parent.parent
            deployed = deploy_by_upload_standalone(
                ssh, sftp, str(local_root), p, pm,
                step_start=2, app_port=CFG.get('app_port'),
            )
        finally:
            sftp.close()
    finally:
        ssh.close()

    if not deployed:
        log_err('部署中断,请根据上方错误信息排查')
        return 1

    print('')
    print('=' * 60)
    print(' 部署完成')
    print(' 前台: %s' % CFG['url'])
    print(' 后台: %s/admin' % CFG['url'])
    print('=' * 60)
    sys.stdout.flush()
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())