Files
soul-yongping/soul-api/miner_guard_install.py

156 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
挖矿病毒守护 - 安装脚本
将 miner_guard.sh 上传到服务器,并配置每 30 分钟执行一次。
使用: python miner_guard_install.py [--yes]
"""
import os
import sys
import io
try:
import paramiko
except ImportError:
print("错误: pip install paramiko")
sys.exit(1)
# Default SSH port; DEPLOY_SSH_PORT overrides the built-in 22022 at import time.
DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022"))


def get_cfg():
    """Return the SSH connection settings for the target server.

    Resolution order:

    1. If ``DEPLOY_HOST`` is set, every value is taken from the environment
       (``DEPLOY_USER``, ``DEPLOY_PASSWORD``, ``DEPLOY_SSH_KEY``).
    2. Otherwise, if a ``master.py`` file sits next to this script, it is
       loaded and its ``get_cfg()`` result supplies ``host`` (required) and
       ``user`` / ``password`` / ``ssh_key`` (optional).
    3. If that file is absent or loading it fails, the environment values
       are returned with an empty ``host`` so the caller can report the
       missing configuration.

    The port always comes from ``DEPLOY_SSH_PORT`` (re-read on every call),
    falling back to ``DEFAULT_SSH_PORT``.

    Returns:
        dict with the keys ``host``, ``user``, ``password``, ``ssh_key``
        and ``port`` (int).

    Raises:
        ValueError: if ``DEPLOY_SSH_PORT`` is set to a non-integer value.
    """
    port = int(os.environ.get("DEPLOY_SSH_PORT", str(DEFAULT_SSH_PORT)))
    host = os.environ.get("DEPLOY_HOST")

    if not host:
        master_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "master.py"
        )
        if os.path.isfile(master_path):
            try:
                import importlib.util

                spec = importlib.util.spec_from_file_location("master", master_path)
                if spec is None or spec.loader is None:
                    raise ImportError("cannot load %s" % master_path)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                c = mod.get_cfg()
                return {
                    "host": c["host"],
                    "user": c.get("user", "root"),
                    "password": c.get("password", ""),
                    "ssh_key": c.get("ssh_key", ""),
                    "port": port,
                }
            except Exception as exc:
                # Best effort: master.py is arbitrary project code, so any
                # failure falls back to the environment -- but report it
                # instead of hiding a broken configuration file.
                print("[警告] 读取 master.py 配置失败: %s" % exc, file=sys.stderr)

    return {
        "host": host or "",
        "user": os.environ.get("DEPLOY_USER", "root"),
        "password": os.environ.get("DEPLOY_PASSWORD", ""),
        "ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""),
        "port": port,
    }
def _run_remote(client, command, timeout=10):
    """Run ``command`` over SSH and return ``(exit_status, stdout, stderr)``."""
    _stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
    out = stdout.read().decode("utf-8", errors="replace")
    err = stderr.read().decode("utf-8", errors="replace")
    return stdout.channel.recv_exit_status(), out, err


def _connect(cfg):
    """Open an SSH connection described by ``cfg``; key auth wins over password."""
    client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification, which allows man-in-the-middle attacks on first contact.
    # Kept for backward compatibility; consider load_system_host_keys() with
    # RejectPolicy once the server key has been distributed.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    key_path = os.path.expanduser(cfg.get("ssh_key") or "")
    if key_path and os.path.isfile(key_path):
        auth = {"key_filename": key_path}
    else:
        auth = {"password": cfg["password"]}
    try:
        client.connect(cfg["host"], port=cfg["port"], username=cfg["user"],
                       timeout=15, **auth)
    except Exception:
        client.close()
        raise
    return client


def _upload_script(client, local_sh, remote_path):
    """Copy the guard script to ``remote_path`` over SFTP and mark it executable."""
    sftp = client.open_sftp()
    try:
        with open(local_sh, "rb") as fh:
            sftp.putfo(fh, remote_path)
        sftp.chmod(remote_path, 0o755)
    finally:
        sftp.close()


def _install_cron_d(client, remote_path, cron_d_file):
    """Write a system cron file running the script every 30 minutes; return success."""
    import base64

    entry = "*/30 * * * * root /bin/bash %s >> /var/log/miner_guard.log 2>&1" % remote_path
    body = (
        "SHELL=/bin/bash\n"
        "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n"
        "%s\n" % entry
    )
    # base64 keeps the cron body free of shell-quoting problems.
    b64 = base64.b64encode(body.encode()).decode()
    status, _out, _err = _run_remote(
        client,
        "echo %s | base64 -d > %s && chmod 644 %s" % (b64, cron_d_file, cron_d_file),
    )
    # Judge by exit status: stderr may carry harmless warnings.
    return status == 0


def _install_crontab(client, crontab_line, remote_path):
    """Append the job to the login user's crontab, keeping existing entries; return success."""
    import shlex

    # Drop any earlier entry for this script, then append the new one, so the
    # operation is idempotent and never discards unrelated jobs.
    command = "(crontab -l 2>/dev/null | grep -v -F %s; echo %s) | crontab - && crontab -l" % (
        shlex.quote(remote_path),
        shlex.quote(crontab_line),
    )
    status, out, _err = _run_remote(client, command)
    return status == 0 and "miner_guard" in out


def _install_systemd_timer(client, remote_path):
    """Install and start a 30-minute systemd timer; raise IOError if systemctl fails."""
    svc = "[Unit]\nDescription=Miner Guard\nAfter=network.target\n[Service]\nType=oneshot\nExecStart=/bin/bash %s\n[Install]\nWantedBy=multi-user.target\n" % remote_path
    tmr = "[Unit]\nDescription=Miner Guard 30min\n[Timer]\nOnBootSec=1min\nOnUnitActiveSec=30min\nPersistent=true\n[Install]\nWantedBy=timers.target\n"
    sftp = client.open_sftp()
    try:
        sftp.putfo(io.BytesIO(svc.encode()), "/etc/systemd/system/miner-guard.service")
        sftp.putfo(io.BytesIO(tmr.encode()), "/etc/systemd/system/miner-guard.timer")
    finally:
        sftp.close()
    status, out, err = _run_remote(
        client,
        "systemctl daemon-reload && systemctl enable miner-guard.timer"
        " && systemctl start miner-guard.timer",
        timeout=15,
    )
    if status != 0:
        raise IOError((out + err).strip() or "systemctl exited with status %d" % status)


def main():
    """Upload miner_guard.sh to the server and schedule it every 30 minutes.

    Command line: ``--yes`` / ``-y`` skips the interactive confirmation.

    Scheduling is attempted in this order, stopping at the first success:
    a file in /etc/cron.d, the login user's crontab, a systemd timer. If all
    three fail, a manual crontab command is printed.

    Exits with status 1 when the configuration is incomplete, when
    miner_guard.sh is not found next to this script, or when the SSH
    connection fails.
    """
    import argparse

    parser = argparse.ArgumentParser(description="安装挖矿守护到服务器")
    parser.add_argument("--yes", "-y", action="store_true", help="跳过确认")
    args = parser.parse_args()

    cfg = get_cfg()
    if not cfg.get("host") or not (cfg.get("password") or cfg.get("ssh_key")):
        # An SSH key is accepted in place of a password, so say so.
        print("[错误] 需配置 DEPLOY_HOST 以及 DEPLOY_PASSWORD 或 DEPLOY_SSH_KEY")
        sys.exit(1)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    local_sh = os.path.join(script_dir, "miner_guard.sh")
    if not os.path.isfile(local_sh):
        print("[错误] 未找到 miner_guard.sh")
        sys.exit(1)

    remote_path = "/root/miner_guard.sh"
    cron_d_file = "/etc/cron.d/miner_guard"
    # User-crontab format: no user field, unlike the /etc/cron.d entry.
    crontab_line = "*/30 * * * * /bin/bash %s >> /var/log/miner_guard.log 2>&1" % remote_path

    print("=" * 60)
    print(" 挖矿病毒守护 - 安装")
    print("=" * 60)
    print(" 服务器: %s" % cfg["host"])
    print(" 脚本路径: %s" % remote_path)
    print(" Cron: 每 30 分钟执行")
    print("=" * 60)

    if not args.yes:
        try:
            answer = input("\n确认安装? 输入 yes 继续: ")
        except EOFError:
            # No interactive stdin: treat as "not confirmed".
            answer = ""
        if answer.strip().lower() != "yes":
            print("已取消")
            return

    try:
        client = _connect(cfg)
    except Exception as exc:
        # Top-level boundary: paramiko and socket errors are all reported
        # the same way and end the run.
        print("[连接失败]", str(exc))
        sys.exit(1)

    try:
        _upload_script(client, local_sh, remote_path)
        if _install_cron_d(client, remote_path, cron_d_file):
            print("\n[成功] 已写入 %s,每 30 分钟执行" % cron_d_file)
        else:
            print("\n[备选] /etc/cron.d 失败")
            if _install_crontab(client, crontab_line, remote_path):
                print("[成功] crontab 已添加")
            else:
                print(" crontab 不可用,尝试 systemd timer...")
                try:
                    _install_systemd_timer(client, remote_path)
                except (OSError, EOFError, paramiko.SSHException) as exc:
                    print(" systemd 失败: %s" % exc)
                    print(" 请 SSH 登录后手动执行: (crontab -l 2>/dev/null; echo '%s') | crontab -" % crontab_line)
                else:
                    print("[成功] systemd timer 已启用,每 30 分钟执行")
    finally:
        # Always release the SSH connection, even if a step raised.
        client.close()

    print("\n日志: /var/log/miner_guard.log")
    print("=" * 60)
# Script entry point: run the installer only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()