#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 挖矿病毒守护 - 安装脚本 将 miner_guard.sh 上传到服务器,并配置每 30 分钟执行一次。 使用: python miner_guard_install.py [--yes] """ import os import sys import io try: import paramiko except ImportError: print("错误: pip install paramiko") sys.exit(1) DEFAULT_SSH_PORT = int(os.environ.get("DEPLOY_SSH_PORT", "22022")) def get_cfg(): host = os.environ.get("DEPLOY_HOST") if not host: try: import importlib.util spec = importlib.util.spec_from_file_location( "master", os.path.join(os.path.dirname(__file__), "master.py") ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) c = mod.get_cfg() return { "host": c["host"], "user": c.get("user", "root"), "password": c.get("password", ""), "ssh_key": c.get("ssh_key", ""), "port": int(os.environ.get("DEPLOY_SSH_PORT", str(DEFAULT_SSH_PORT))), } except Exception: pass return { "host": host or "", "user": os.environ.get("DEPLOY_USER", "root"), "password": os.environ.get("DEPLOY_PASSWORD", ""), "ssh_key": os.environ.get("DEPLOY_SSH_KEY", ""), "port": int(os.environ.get("DEPLOY_SSH_PORT", str(DEFAULT_SSH_PORT))), } def main(): import argparse p = argparse.ArgumentParser(description="安装挖矿守护到服务器") p.add_argument("--yes", "-y", action="store_true", help="跳过确认") args = p.parse_args() cfg = get_cfg() if not cfg.get("host") or (not cfg.get("password") and not cfg.get("ssh_key")): print("[错误] 需配置 DEPLOY_HOST 和 DEPLOY_PASSWORD") sys.exit(1) script_dir = os.path.dirname(os.path.abspath(__file__)) local_sh = os.path.join(script_dir, "miner_guard.sh") if not os.path.isfile(local_sh): print("[错误] 未找到 miner_guard.sh") sys.exit(1) remote_path = "/root/miner_guard.sh" cron_line = "*/30 * * * * /bin/bash %s >> /var/log/miner_guard.log 2>&1" % remote_path print("=" * 60) print(" 挖矿病毒守护 - 安装") print("=" * 60) print(" 服务器: %s" % cfg["host"]) print(" 脚本路径: %s" % remote_path) print(" Cron: 每 30 分钟执行") print("=" * 60) if not args.yes: print("\n确认安装? 输入 yes 继续: ", end="") if input().strip().lower() != "yes": print("已取消") return client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if cfg.get("ssh_key") and os.path.isfile(cfg["ssh_key"]): client.connect(cfg["host"], port=cfg["port"], username=cfg["user"], key_filename=cfg["ssh_key"], timeout=15) else: client.connect(cfg["host"], port=cfg["port"], username=cfg["user"], password=cfg["password"], timeout=15) except Exception as e: print("[连接失败]", str(e)) sys.exit(1) sftp = client.open_sftp() with open(local_sh, "rb") as f: sftp.putfo(f, remote_path) sftp.chmod(remote_path, 0o755) sftp.close() # 通过 SSH 执行写入 /etc/cron.d/(root 有权限) cron_line = "*/30 * * * * root /bin/bash %s >> /var/log/miner_guard.log 2>&1" % remote_path cron_body = "SHELL=/bin/bash\nPATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n%s\n" % cron_line cron_d_file = "/etc/cron.d/miner_guard" # 用 base64 避免 shell 转义问题 import base64 b64 = base64.b64encode(cron_body.encode()).decode() stdin, stdout, stderr = client.exec_command( "echo %s | base64 -d | tee %s > /dev/null && chmod 644 %s" % (b64, cron_d_file, cron_d_file), timeout=10 ) err = stderr.read().decode("utf-8", errors="replace").strip() if err: print("\n[备选] /etc/cron.d 失败") # 尝试 crontab crontab_line = "*/30 * * * * /bin/bash %s >> /var/log/miner_guard.log 2>&1\n" % remote_path tmp_cron = "/tmp/miner_guard_cron_%s" % os.getpid() sftp = client.open_sftp() try: with sftp.file(tmp_cron, "w") as f: f.write(crontab_line) finally: sftp.close() sin, sout, serr = client.exec_command("crontab %s 2>&1; rm -f %s; crontab -l 2>/dev/null" % (tmp_cron, tmp_cron), timeout=10) out = sout.read().decode("utf-8", errors="replace") err_msg = serr.read().decode("utf-8", errors="replace").strip() if "miner_guard" in out: print("[成功] crontab 已添加") else: # 尝试 systemd timer print(" crontab 不可用,尝试 systemd timer...") svc = "[Unit]\nDescription=Miner Guard\nAfter=network.target\n[Service]\nType=oneshot\nExecStart=/bin/bash %s\n[Install]\nWantedBy=multi-user.target\n" % remote_path tmr = "[Unit]\nDescription=Miner Guard 30min\n[Timer]\nOnBootSec=1min\nOnUnitActiveSec=30min\nPersistent=true\n[Install]\nWantedBy=timers.target\n" try: sftp = client.open_sftp() sftp.putfo(io.BytesIO(svc.encode()), "/etc/systemd/system/miner-guard.service") sftp.putfo(io.BytesIO((tmr.encode())), "/etc/systemd/system/miner-guard.timer") sftp.close() sin, sout, serr = client.exec_command("systemctl daemon-reload && systemctl enable miner-guard.timer && systemctl start miner-guard.timer 2>&1", timeout=15) r = sout.read().decode("utf-8", errors="replace") + serr.read().decode("utf-8", errors="replace") if "Failed" not in r and "denied" not in r: print("[成功] systemd timer 已启用,每 30 分钟执行") else: raise IOError(r) except Exception as ex: print(" systemd 失败: %s" % ex) print(" 请 SSH 登录后手动执行: (crontab -l 2>/dev/null; echo '%s') | crontab -" % cron_line.strip()) else: print("\n[成功] 已写入 %s,每 30 分钟执行" % cron_d_file) client.close() print("\n日志: /var/log/miner_guard.log") print("=" * 60) if __name__ == "__main__": main()