Files
soul/scripts/deploy_baota.py

371 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Soul 创业派对 - 宝塔一键部署(跨平台)
一键执行: python scripts/deploy_baota.py
依赖: pip install paramiko
流程:本地 pnpm build -> 打包 .next/standalone -> 上传 -> 服务器解压 -> PM2 运行 node server.js
(不从 git 拉取,不在服务器安装依赖或构建。)
"""
from __future__ import print_function
import os
import sys
import getpass
import shutil
import subprocess
import tarfile
import tempfile
import threading
from pathlib import Path
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
def log(msg, step=None):
"""输出并立即刷新,便于看到进度"""
if step is not None:
print('[步骤 %s] %s' % (step, msg))
else:
print(msg)
sys.stdout.flush()
sys.stderr.flush()
def log_err(msg):
print('>>> 错误: %s' % msg, file=sys.stderr)
sys.stderr.flush()
try:
import paramiko
except ImportError:
log('请先安装: pip install paramiko')
sys.exit(1)
# 默认配置(与 开发文档/服务器管理 一致)
# 应用端口须与 端口配置表 及 Nginx proxy_pass 一致soul -> 3006
CFG = {
'host': os.environ.get('DEPLOY_HOST', '42.194.232.22'),
'port': int(os.environ.get('DEPLOY_PORT', '22')),
'app_port': int(os.environ.get('DEPLOY_APP_PORT', '3006')),
'user': os.environ.get('DEPLOY_USER', 'root'),
'pwd': os.environ.get('DEPLOY_PASSWORD', 'Zhiqun1984'),
'path': os.environ.get('DEPLOY_PROJECT_PATH', '/www/wwwroot/soul'),
'branch': os.environ.get('DEPLOY_BRANCH', 'soul-content'),
'pm2': os.environ.get('DEPLOY_PM2_APP', 'soul'),
'url': os.environ.get('DEPLOY_SITE_URL', 'https://soul.quwanzhi.com'),
'key': os.environ.get('DEPLOY_SSH_KEY') or None,
}
EXCLUDE = {
'node_modules', '.next', '.git', '.gitignore', '.cursorrules',
'scripts', 'miniprogram', '开发文档', 'addons', 'book',
'__pycache__', '.DS_Store', '*.log', 'deploy_config.json',
'requirements-deploy.txt', '*.bat', '*.ps1',
}
def run(ssh, cmd, desc, step_label=None, ignore_err=False):
"""执行远程命令,打印完整输出,失败时明确标出错误和退出码"""
if step_label:
log(desc, step_label)
else:
log(desc)
print(' $ %s' % (cmd[:100] + '...' if len(cmd) > 100 else cmd))
sys.stdout.flush()
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
code = stdout.channel.recv_exit_status()
if out:
print(out)
sys.stdout.flush()
if err:
print(err, file=sys.stderr)
sys.stderr.flush()
if code != 0:
log_err('退出码: %s | %s' % (code, desc))
if err and len(err.strip()) > 0:
for line in err.strip().split('\n')[-5:]:
print(' stderr: %s' % line, file=sys.stderr)
sys.stderr.flush()
return ignore_err
return True
def _read_and_print(stream, prefix=' ', is_stderr=False):
"""后台线程:不断读 stream 并打印,用于实时输出"""
import threading
out = sys.stderr if is_stderr else sys.stdout
try:
while True:
line = stream.readline()
if not line:
break
s = line.decode('utf-8', errors='replace').rstrip()
if s:
print('%s%s' % (prefix, s), file=out)
out.flush()
except Exception:
pass
def run_stream(ssh, cmd, desc, step_label=None, ignore_err=False):
"""执行远程命令并实时输出npm install / build 不卡住、能看到进度)"""
if step_label:
log(desc, step_label)
else:
log(desc)
print(' $ %s' % (cmd[:100] + '...' if len(cmd) > 100 else cmd))
sys.stdout.flush()
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
t1 = threading.Thread(target=_read_and_print, args=(stdout, ' ', False))
t2 = threading.Thread(target=_read_and_print, args=(stderr, ' [stderr] ', True))
t1.daemon = True
t2.daemon = True
t1.start()
t2.start()
t1.join()
t2.join()
code = stdout.channel.recv_exit_status()
if code != 0:
log_err('退出码: %s | %s' % (code, desc))
return ignore_err
return True
def _tar_filter(ti):
n = ti.name.replace('\\', '/')
if 'node_modules' in n or '.next' in n or '.git' in n:
return None
if '/scripts/' in n or n.startswith('scripts/'):
return None
if '/miniprogram/' in n or n.startswith('miniprogram/'):
return None
if '/开发文档/' in n or '开发文档/' in n:
return None
if '/addons/' in n or '/book/' in n:
return None
return ti
def make_tarball(root_dir):
root = Path(root_dir).resolve()
tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False)
tmp.close()
with tarfile.open(tmp.name, 'w:gz') as tar:
for item in root.iterdir():
name = item.name
if name in EXCLUDE or name.endswith('.md') or (name.startswith('.') and name != '.cursorrules'):
continue
if name.startswith('deploy_config') or name.endswith('.bat') or name.endswith('.ps1'):
continue
arcname = name
tar.add(str(item), arcname=arcname, filter=_tar_filter)
return tmp.name
def run_local_build(local_root, step_label=None):
"""本地执行 pnpm build实时输出"""
root = Path(local_root).resolve()
if step_label:
log('本地构建 pnpm buildstandalone', step_label)
else:
log('本地构建 pnpm buildstandalone')
cmd_str = 'pnpm build'
print(' $ %s' % cmd_str)
sys.stdout.flush()
try:
# Windows 下用 shell=True否则子进程 PATH 里可能没有 pnpm
use_shell = sys.platform == 'win32'
p = subprocess.Popen(
cmd_str if use_shell else ['pnpm', 'build'],
cwd=str(root),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
encoding='utf-8',
errors='replace',
shell=use_shell,
)
for line in p.stdout:
print(' %s' % line.rstrip())
sys.stdout.flush()
code = p.wait()
if code != 0:
log_err('本地构建失败,退出码 %s' % code)
return False
return True
except Exception as e:
log_err('本地构建异常: %s' % e)
return False
def make_standalone_tarball(local_root):
"""
在 next.config 已设置 output: 'standalone' 且已执行 pnpm build 的前提下,
将 .next/static 和 public 复制进 .next/standalone再打包 .next/standalone 目录内容。
返回生成的 tar.gz 路径。
"""
root = Path(local_root).resolve()
standalone_dir = root / '.next' / 'standalone'
static_src = root / '.next' / 'static'
public_src = root / 'public'
if not standalone_dir.is_dir():
raise FileNotFoundError('.next/standalone 不存在,请先执行 pnpm build')
# Next 要求将 .next/static 和 public 复制进 standalone
standalone_next = standalone_dir / '.next'
standalone_next.mkdir(parents=True, exist_ok=True)
if static_src.is_dir():
dest_static = standalone_next / 'static'
if dest_static.exists():
shutil.rmtree(dest_static)
shutil.copytree(static_src, dest_static)
if public_src.is_dir():
dest_public = standalone_dir / 'public'
if dest_public.exists():
shutil.rmtree(dest_public)
shutil.copytree(public_src, dest_public)
# 复制 PM2 配置到 standalone便于服务器上用 pm2 start ecosystem.config.cjs
ecosystem_src = root / 'ecosystem.config.cjs'
if ecosystem_src.is_file():
shutil.copy2(ecosystem_src, standalone_dir / 'ecosystem.config.cjs')
# 打包 standalone 目录「内容」,使解压到服务器项目目录后根目录即为 server.js
tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False)
tmp.close()
with tarfile.open(tmp.name, 'w:gz') as tar:
for item in standalone_dir.iterdir():
arcname = item.name
tar.add(str(item), arcname=arcname, recursive=True)
return tmp.name
def deploy_by_upload_standalone(ssh, sftp, local_root, remote_path, pm2_name, step_start, app_port=None):
"""本地 standalone 构建 -> 打包 -> 上传 -> 解压 -> PM2 用 node server.js 启动PORT 与 Nginx 一致)"""
step = step_start
root = Path(local_root).resolve()
# 步骤 1: 本地构建
log('本地执行 pnpm buildstandalone', step)
step += 1
if not run_local_build(str(root), step_label=None):
return False
sys.stdout.flush()
# 步骤 2: 打包 standalone
log('打包 .next/standalone含 static、public', step)
step += 1
try:
tarball = make_standalone_tarball(str(root))
size_mb = os.path.getsize(tarball) / 1024 / 1024
log('打包完成,约 %.2f MB' % size_mb)
except FileNotFoundError as e:
log_err(str(e))
return False
except Exception as e:
log_err('打包失败: %s' % e)
return False
sys.stdout.flush()
# 步骤 3: 上传
log('上传到服务器 /tmp/soul_standalone.tar.gz', step)
step += 1
remote_tar = '/tmp/soul_standalone.tar.gz'
try:
sftp.put(tarball, remote_tar)
log('上传完成')
except Exception as e:
log_err('上传失败: %s' % e)
os.unlink(tarball)
return False
os.unlink(tarball)
sys.stdout.flush()
# 步骤 4: 清理并解压(保留 .env 等隐藏配置)
log('清理旧文件并解压 standalone', step)
step += 1
run(ssh, 'cd %s && rm -rf app components lib public styles .next *.json *.js *.ts *.mjs *.css *.d.ts server.js node_modules 2>/dev/null; ls -la' % remote_path, '清理', step_label=None, ignore_err=True)
if not run(ssh, 'cd %s && tar -xzf %s' % (remote_path, remote_tar), '解压'):
log_err('解压失败,请检查服务器磁盘或路径')
return False
run(ssh, 'rm -f %s' % remote_tar, '删除临时包', ignore_err=True)
sys.stdout.flush()
# 步骤 5: PM2 用 node server.js 启动PORT 须与 Nginx proxy_pass 一致(默认 3006
# 宝塔服务器上 pm2 可能不在默认 PATH先注入常见路径
port = app_port if app_port is not None else 3006
log('PM2 启动 node server.jsPORT=%s' % port, step)
pm2_cmd = (
'export PATH=/www/server/nodejs/v22.14.0/bin:/www/server/nvm/versions/node/*/bin:$PATH 2>/dev/null; '
'cd %s && (pm2 delete %s 2>/dev/null; PORT=%s pm2 start server.js --name %s)'
) % (remote_path, pm2_name, port, pm2_name)
run(ssh, pm2_cmd, 'PM2 启动', ignore_err=True)
return True
def main():
print('=' * 60)
print(' Soul 创业派对 - 宝塔一键部署')
print('=' * 60)
print(' %s@%s -> %s' % (CFG['user'], CFG['host'], CFG['path']))
print('=' * 60)
sys.stdout.flush()
# 步骤 1: 连接
log('连接服务器 %s:%s' % (CFG['host'], CFG['port']), '1/6')
password = CFG.get('pwd')
if not CFG['key'] and not password:
password = getpass.getpass('请输入 SSH 密码: ')
sys.stdout.flush()
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
kw = {'hostname': CFG['host'], 'port': CFG['port'], 'username': CFG['user']}
if CFG['key']:
kw['key_filename'] = CFG['key']
else:
kw['password'] = password
ssh.connect(**kw)
log('连接成功')
except Exception as e:
log_err('连接失败: %s' % e)
return 1
sys.stdout.flush()
p, pm = CFG['path'], CFG['pm2']
sftp = ssh.open_sftp()
# 步骤 2~6: 本地 build -> 打包 -> 上传 -> 解压 -> PM2 启动
log('本地打包上传部署(不从 git 拉取)', '2/6')
local_root = Path(__file__).resolve().parent.parent
if not deploy_by_upload_standalone(ssh, sftp, str(local_root), p, pm, step_start=2, app_port=CFG.get('app_port')):
sftp.close()
ssh.close()
log_err('部署中断,请根据上方错误信息排查')
return 1
sftp.close()
ssh.close()
print('')
print('=' * 60)
print(' 部署完成')
print(' 前台: %s' % CFG['url'])
print(' 后台: %s/admin' % CFG['url'])
print('=' * 60)
sys.stdout.flush()
return 0
if __name__ == '__main__':
sys.exit(main())