Files
suanli-juzhen/02_账号密码管理/scripts/extract_ip_users_rfm.py
卡若 048cc32afc 🎯 初始提交:分布式算力矩阵 v1.0
- 6 大模块:扫描/账号管理/节点部署/暴力破解/算力调度/监控运维
- SKILL 总控 + 子模块 SKILL
- 排除大文件(>5MB)与敏感凭证

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-15 22:46:54 +08:00

872 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
分布式矩阵IP用户提取 v2.0 — RFM + 地区 + 手机/QQ
======================================================
从本地 MongoDB 全部含 IP 字段的集合中提取用户数据,
统一字段 + IP地理定位(省/市) + RFM评估 + 手机/QQ提取
写入 KR.分布式矩阵IP 集合。
v2.0 新增:
- region国家|省|市)通过 GeoLite2-City IP定位
- phone手机号来自 verifiedMobile 或邮箱/用户名中的手机号模式
- qqQQ号来自 @qq.com 邮箱或用户名中的纯数字QQ模式
- 实时速率统计 + ETA预估
用法: python3 extract_ip_users_rfm.py
"""
import ipaddress
import re
import sys
import time
from datetime import datetime, timezone

import pymongo
# ========== 配置 ==========
MONGO_URI = "mongodb://admin:admin123@localhost:27017/?authSource=admin"
TARGET_DB = "KR"
TARGET_COL = "分布式矩阵IP"
GEOIP_DB = "/tmp/GeoLite2-City.mmdb"
IPV4_REGEX = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
PHONE_REGEX = re.compile(r'^1[3-9]\d{9}$')
QQ_REGEX = re.compile(r'^[1-9]\d{4,10}$') # 5-11位数字
REFERENCE_DATE = datetime(2026, 2, 15, tzinfo=timezone.utc)
# ========== IP地理定位 ==========
class IPLocator:
    """Best-effort IP geolocation backed by a GeoLite2-City database.

    If the ``geoip2`` package cannot be imported, the database cannot be
    opened, or the self-test fails, the instance remains usable: ``reader``
    is None and every lookup returns empty strings.
    """

    def __init__(self, db_path):
        """Open the database at *db_path* and run a one-off self-test.

        Failures are reported on stdout rather than raised.
        """
        self.reader = None
        opened = None
        try:
            import geoip2.database
            opened = geoip2.database.Reader(db_path)
            self.reader = opened
            self._test()
            print(f" GeoLite2-City 加载成功: {db_path}")
        except Exception as e:
            print(f" GeoLite2-City 加载失败: {e}")
            self.reader = None
            # Do not leak the file handle when the self-test is what failed.
            if opened is not None:
                opened.close()

    def _test(self):
        """Raise RuntimeError unless 8.8.8.8 resolves to country code 'US'.

        An explicit raise is used instead of ``assert`` so the check still
        runs when Python is started with ``-O``.
        """
        r = self.reader.city('8.8.8.8')
        if r.country.iso_code != 'US':
            raise RuntimeError(
                f"self-test failed: 8.8.8.8 resolved to {r.country.iso_code!r}"
            )

    def lookup(self, ip_str):
        """Return ``(country, province, city, region_str)`` for *ip_str*.

        Names prefer the 'zh-CN' localisation and fall back to 'en', then to
        an empty string. ``region_str`` joins the non-empty parts with '|'.
        Returns four empty strings when no reader is loaded, *ip_str* is
        empty, or the lookup raises an ordinary exception.
        """
        if not self.reader or not ip_str:
            return ("", "", "", "")
        try:
            r = self.reader.city(ip_str)
            subdivision = r.subdivisions.most_specific
            country = r.country.names.get('zh-CN', r.country.names.get('en', ''))
            province = subdivision.names.get('zh-CN', subdivision.names.get('en', ''))
            city = r.city.names.get('zh-CN', r.city.names.get('en', ''))
            parts = [p for p in [country, province, city] if p]
            region_str = "|".join(parts)
            return (country, province, city, region_str)
        except Exception:
            # Deliberately best-effort, but let KeyboardInterrupt/SystemExit
            # propagate so a long run can still be stopped.
            return ("", "", "", "")

    def close(self):
        """Close the underlying reader if one was opened."""
        if self.reader:
            self.reader.close()
# ========== 手机/QQ提取 ==========
def extract_phone(doc, email=""):
"""从文档字段中提取手机号"""
# 1. verifiedMobile 字段
mobile = str(doc.get("verifiedMobile", "") or "").strip()
if PHONE_REGEX.match(mobile):
return mobile
# 2. 从email提取如 13800001234@xxx.com
if email:
prefix = email.split("@")[0]
if PHONE_REGEX.match(prefix):
return prefix
# 3. 从username提取
username = str(doc.get("username", "") or doc.get("nickname", "") or "").strip()
if PHONE_REGEX.match(username):
return username
return ""
def extract_qq(email="", username=""):
"""从邮箱或用户名提取QQ号"""
# 1. 从 @qq.com 邮箱提取
if email and "@qq.com" in email.lower():
prefix = email.split("@")[0]
if QQ_REGEX.match(prefix):
return prefix
# 2. 从username提取纯数字QQ
if username and QQ_REGEX.match(username):
# 排除太短的可能是uid和太长的手机号
if 5 <= len(username) <= 11 and not PHONE_REGEX.match(username):
return username
return ""
# ========== RFM 工具函数 ==========
def is_valid_public_ip(ip_str):
    """Return True if *ip_str* is a well-formed dotted-quad IPv4 address.

    Surrounding whitespace is ignored. Each of the four octets must be one to
    three ASCII digits with a value of at most 255, and the first octet must
    be neither 0 nor 255. Despite the name, private ranges such as 10.x or
    192.168.x are NOT rejected here.
    """
    if not ip_str or not isinstance(ip_str, str):
        return False
    octets = ip_str.strip().split('.')
    if len(octets) != 4:
        return False
    values = []
    for octet in octets:
        if not 1 <= len(octet) <= 3:
            return False
        if any(ch not in "0123456789" for ch in octet):
            return False
        values.append(int(octet))
    # A shape-only check would let values such as 999 through.
    if any(value > 255 for value in values):
        return False
    return values[0] not in (0, 255)
def is_public_ip(ip_str):
    """Return True if *ip_str* is a globally routable unicast IPv4 address.

    Beyond loopback, RFC 1918, 0.x and 255.x, this also rejects link-local
    (169.254/16), shared address space (100.64/10), multicast and other
    reserved ranges. Anything that is not a dotted quad of decimal octets in
    0-255 returns False.
    """
    if not ip_str or not isinstance(ip_str, str):
        return False
    octets = ip_str.strip().split('.')
    if len(octets) != 4:
        return False
    packed = 0
    for octet in octets:
        if not octet or any(ch not in "0123456789" for ch in octet):
            return False
        value = int(octet)
        if value > 255:
            return False
        packed = (packed << 8) | value
    # Build from the integer form so zero-padded octets are accepted;
    # newer Python versions reject them when parsing the string form.
    address = ipaddress.IPv4Address(packed)
    # is_global does not exclude multicast, so test that explicitly.
    return address.is_global and not address.is_multicast
def unix_to_datetime(ts):
    """Convert a Unix timestamp to a timezone-aware UTC datetime.

    *ts* may be anything ``int()`` accepts. Values above 1e12 are treated as
    milliseconds. Returns None for falsy input, for values that cannot be
    converted, for timestamps before 946684800 (2000-01-01 UTC), and for
    timestamps outside the range the platform can represent.
    """
    if not ts:
        return None
    try:
        seconds = int(ts)
        if seconds > 1e12:
            seconds = seconds / 1000
        if seconds < 946684800:
            return None
        return datetime.fromtimestamp(seconds, tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError):
        # OverflowError comes from int(float('inf')) and from timestamps too
        # large for the platform; it is not a subclass of ValueError.
        return None
def parse_date_string(s):
    """Parse *s* against a fixed list of date formats and return a UTC datetime.

    The formats are tried in order: ``%d-%b-%y``, ``%Y-%m-%d``,
    ``%Y-%m-%dT%H:%M:%S`` and ``%Y-%m-%d %H:%M:%S``. Surrounding whitespace
    is ignored. Returns None when *s* is empty, is not a string, or matches
    none of the formats.
    """
    if not s or not isinstance(s, str):
        return None
    text = s.strip()
    known_formats = ('%d-%b-%y', '%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S')
    for pattern in known_formats:
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            continue
        # The parsed value is naive; label it as UTC.
        return parsed.replace(tzinfo=timezone.utc)
    return None
def days_since(dt, reference=None):
    """Return the whole days elapsed between *dt* and a reference date.

    *reference* defaults to the module-level REFERENCE_DATE. A naive *dt* is
    treated as UTC. Dates after the reference yield 0. Returns the sentinel
    99999 when *dt* is falsy or is not a datetime-like value.
    """
    if not dt:
        return 99999
    if reference is None:
        reference = REFERENCE_DATE
    try:
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return max(0, (reference - dt).days)
    except (AttributeError, TypeError, OverflowError):
        # Non-datetime input (strings, plain dates, numbers) lands here.
        return 99999
def score_r(days):
    """Map days since last activity to a recency score from 5 (recent) to 1.

    Upper bounds are inclusive: <=30 -> 5, <=180 -> 4, <=365 -> 3,
    <=730 -> 2, anything else -> 1.
    """
    for upper_bound, points in ((30, 5), (180, 4), (365, 3), (730, 2)):
        if days <= upper_bound:
            return points
    return 1
def score_f(count):
    """Map an activity count to a frequency score from 5 (most) to 1.

    Lower bounds are inclusive: >=100 -> 5, >=50 -> 4, >=10 -> 3,
    >=1 -> 2, anything else -> 1.
    """
    for lower_bound, points in ((100, 5), (50, 4), (10, 3), (1, 2)):
        if count >= lower_bound:
            return points
    return 1
def score_m(value):
    """Map a monetary-style value to a score from 5 (highest) to 1.

    Lower bounds are inclusive: >=1000 -> 5, >=100 -> 4, >=10 -> 3,
    >=1 -> 2, anything else -> 1.
    """
    for lower_bound, points in ((1000, 5), (100, 4), (10, 3), (1, 2)):
        if value >= lower_bound:
            return points
    return 1
def calc_value_level(rfm):
    """Translate a combined RFM total into one of five value-level labels.

    Lower bounds are inclusive and checked from the highest down:
    13, 10, 7 and 4; totals below 4 get the lowest label.
    """
    tiers = (
        (13, "高价值用户"),
        (10, "中高价值用户"),
        (7, "中价值用户"),
        (4, "低价值用户"),
    )
    for lower_bound, label in tiers:
        if rfm >= lower_bound:
            return label
    return "流失用户"
def calc_user_type(r, f, m):
    """Classify a user into one of eight segments from the three RFM scores.

    Each score is reduced to a flag (score >= 3) and the flag triple selects
    the segment label.
    """
    threshold = 3
    flags = (bool(r >= threshold), bool(f >= threshold), bool(m >= threshold))
    segments = {
        (True, True, True): "重要价值用户",
        (True, True, False): "重要保持用户",
        (True, False, True): "重要发展用户",
        (False, True, True): "重要挽留用户",
        (True, False, False): "一般价值用户",
        (False, True, False): "一般发展用户",
        (False, False, True): "一般保持用户",
        (False, False, False): "一般挽留用户",
    }
    return segments[flags]
# ========== 实时进度统计 ==========
class Progress:
    """Console progress line showing percentage, throughput and an ETA.

    *total* is the expected number of items and may be an estimate. A total
    of zero, or a count that overshoots the total, is tolerated.
    """

    def __init__(self, total, label=""):
        """Start the clock for a run of *total* items labelled *label*."""
        self.total = total
        self.label = label
        self.count = 0
        self.start = time.time()
        self.last_print = 0

    def update(self, n=1):
        """Record *n* more processed items and redraw the line when due.

        The line is redrawn at most every two seconds, plus once at the
        moment the count first reaches the total.
        """
        previous = self.count
        self.count += n
        now = time.time()
        # Redraw only on the crossing itself: if the total was underestimated,
        # redrawing on every later call would flood stdout.
        reached_total = previous < self.total <= self.count
        if now - self.last_print < 2 and not reached_total:
            return
        elapsed = now - self.start
        rate = self.count / elapsed if elapsed > 0 else 0
        outstanding = max(0, self.total - self.count)
        remaining = outstanding / rate if rate > 0 else 0
        # Guard against total == 0 and clamp overshoot to 100%.
        if self.total > 0:
            pct = min(100.0, self.count / self.total * 100)
        else:
            pct = 100.0
        filled = int(pct / 5)
        # 20-cell bar. The glyphs were empty strings, so nothing was drawn.
        bar = "█" * filled + "░" * (20 - filled)
        sys.stdout.write(
            f"\r {self.label} {bar} {pct:5.1f}% "
            f"| {self.count:,}/{self.total:,} "
            f"| {rate:,.0f}/s "
            f"| 剩余 {remaining:.0f}s "
        )
        sys.stdout.flush()
        self.last_print = now

    def done(self):
        """Print the final item count, elapsed time and average rate."""
        elapsed = time.time() - self.start
        rate = self.count / elapsed if elapsed > 0 else 0
        print(f"\n 完成: {self.count:,} 条 | 耗时 {elapsed:.1f}s | 速率 {rate:,.0f}/s")
# ========== 各集合提取函数 ==========
def extract_mumayi(db_client, locator):
"""木蚂蚁 (115K)"""
col = db_client["KR_KR"]["木蚂蚁munayi_com"]
total = col.estimated_document_count()
print(f" [1/8] KR_KR.木蚂蚁munayi_com ({total:,} 条)")
prog = Progress(total, "木蚂蚁")
docs = []
for doc in col.find({}, batch_size=5000):
prog.update()
regip = str(doc.get("regip", "")).strip()
lastip = str(doc.get("lastip", "")).strip() if doc.get("lastip") else ""
has_regip = is_valid_public_ip(regip)
has_lastip = is_valid_public_ip(lastip)
if not has_regip and not has_lastip:
continue
main_ip = lastip if has_lastip else regip
country, province, city, region = locator.lookup(main_ip)
reg_dt = unix_to_datetime(doc.get("regdate"))
last_dt = unix_to_datetime(doc.get("lastactivity")) or unix_to_datetime(doc.get("lastvisit")) or reg_dt
email = str(doc.get("email", ""))
username = str(doc.get("username", ""))
posts = int(doc.get("posts", 0) or 0)
credits_ = int(doc.get("credits", 0) or 0)
r = score_r(days_since(last_dt))
f = score_f(posts)
m = score_m(credits_)
rfm = r + f + m
docs.append({
"username": username,
"email": email,
"password": str(doc.get("password", "")),
"salt": "",
"region": region,
"country": country,
"province": province,
"city": city,
"phone": extract_phone(doc, email),
"qq": extract_qq(email, username),
"ip": main_ip,
"ip_reg": regip if has_regip else "",
"ip_last": lastip if has_lastip else "",
"ip_public": is_public_ip(main_ip),
"source_db": "KR_KR",
"source_col": "木蚂蚁munayi_com",
"reg_time": reg_dt,
"last_active_time": last_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": "",
"extra": {"uid": doc.get("uid"), "posts": posts, "credits": credits_, "password_type": "MD5"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_fangchan(db_client, locator):
"""房产网 (119K)"""
col = db_client["KR_KR"]["房产网"]
total = col.estimated_document_count()
print(f" [2/8] KR_KR.房产网 ({total:,} 条)")
prog = Progress(total, "房产网")
docs = []
for doc in col.find({}, batch_size=5000):
prog.update()
regip = str(doc.get("regip", "")).strip()
if not is_valid_public_ip(regip):
continue
country, province, city, region = locator.lookup(regip)
reg_dt = unix_to_datetime(doc.get("regdate"))
login_dt = unix_to_datetime(doc.get("lastlogintime"))
last_dt = login_dt or reg_dt
email = str(doc.get("email", ""))
username = str(doc.get("username", ""))
groupid = doc.get("groupid")
group_val = int(groupid) if groupid else 0
r = score_r(days_since(last_dt))
f = score_f(1 if login_dt else 0)
m = score_m(group_val * 10)
rfm = r + f + m
docs.append({
"username": username,
"email": email,
"password": str(doc.get("password", "")),
"salt": str(doc.get("salt", "")),
"region": region,
"country": country,
"province": province,
"city": city,
"phone": extract_phone(doc, email),
"qq": extract_qq(email, username),
"ip": regip,
"ip_reg": regip,
"ip_last": "",
"ip_public": is_public_ip(regip),
"source_db": "KR_KR",
"source_col": "房产网",
"reg_time": reg_dt,
"last_active_time": last_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": "",
"extra": {"uid": doc.get("uid"), "groupid": groupid, "password_type": "MD5+salt"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_lkdie_forum(db_client, locator):
"""老坑爹论坛 (89K)"""
col = db_client["KR_卡若私域"]["老坑爹论坛www.lkdie.com 会员"]
total = col.estimated_document_count()
print(f" [3/8] KR_卡若私域.老坑爹论坛 ({total:,} 条)")
prog = Progress(total, "老坑爹论坛")
docs = []
for doc in col.find({}, batch_size=5000):
prog.update()
regip = str(doc.get("regip", "")).strip()
if not is_valid_public_ip(regip):
continue
country, province, city, region = locator.lookup(regip)
reg_dt = unix_to_datetime(doc.get("regdate"))
login_dt = unix_to_datetime(doc.get("lastlogintime"))
last_dt = login_dt or reg_dt
email = str(doc.get("email", ""))
username = str(doc.get("username", ""))
r = score_r(days_since(last_dt))
f = score_f(1 if login_dt else 0)
m = score_m(0)
rfm = r + f + m
docs.append({
"username": username,
"email": email,
"password": str(doc.get("password", "")),
"salt": str(doc.get("salt", "")),
"region": region,
"country": country,
"province": province,
"city": city,
"phone": extract_phone(doc, email),
"qq": extract_qq(email, username),
"ip": regip,
"ip_reg": regip,
"ip_last": "",
"ip_public": is_public_ip(regip),
"source_db": "KR_卡若私域",
"source_col": "老坑爹论坛www.lkdie.com",
"reg_time": reg_dt,
"last_active_time": last_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": "",
"extra": {"uid": doc.get("uid"), "password_type": "MD5+salt"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_lkdie_shop(db_client, locator):
"""老坑爹商店 (662)"""
col = db_client["KR_卡若私域"]["老坑爹商店 shop.lkdie.com"]
total = col.estimated_document_count()
print(f" [4/8] KR_卡若私域.老坑爹商店 ({total:,} 条)")
prog = Progress(total, "老坑爹商店")
docs = []
for doc in col.find({}, batch_size=1000):
prog.update()
login_ip = str(doc.get("loginIp", "")).strip()
created_ip = str(doc.get("createdIp", "")).strip()
has_login = is_valid_public_ip(login_ip)
has_created = is_valid_public_ip(created_ip)
if not has_login and not has_created:
continue
main_ip = login_ip if has_login else created_ip
country, province, city, region = locator.lookup(main_ip)
login_dt = unix_to_datetime(doc.get("loginTime"))
created_dt = unix_to_datetime(doc.get("createdTime"))
last_dt = login_dt or created_dt
email = str(doc.get("email", ""))
nickname = str(doc.get("nickname", ""))
point = int(doc.get("point", 0) or 0)
coin = int(doc.get("coin", 0) or 0)
roles_str = str(doc.get("roles", ""))
role_score = 5 if "SUPER_ADMIN" in roles_str else 4 if "ADMIN" in roles_str else 3 if "TEACHER" in roles_str else 1
r = score_r(days_since(last_dt))
f = score_f(role_score)
m = score_m(point + coin)
rfm = r + f + m
# 手机号verifiedMobile
phone = extract_phone(doc, email)
docs.append({
"username": nickname,
"email": email,
"password": str(doc.get("password", "")),
"salt": str(doc.get("salt", "")),
"region": region,
"country": country,
"province": province,
"city": city,
"phone": phone,
"qq": extract_qq(email, nickname),
"ip": main_ip,
"ip_reg": created_ip if has_created else "",
"ip_last": login_ip if has_login else "",
"ip_public": is_public_ip(main_ip),
"source_db": "KR_卡若私域",
"source_col": "老坑爹商店shop.lkdie.com",
"reg_time": created_dt,
"last_active_time": last_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": roles_str.replace("|", ",").strip(","),
"extra": {"id": doc.get("id"), "point": point, "coin": coin, "title": doc.get("title"), "password_type": "Base64+salt"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_quwanzhi(db_client, locator):
"""黑科技付款邮箱 (5K)"""
col = db_client["KR_卡若私域"]["黑科技www.quwanzhi.com 付款邮箱"]
total = col.estimated_document_count()
print(f" [5/8] KR_卡若私域.黑科技付款邮箱 ({total:,} 条)")
prog = Progress(total, "黑科技")
docs = []
for doc in col.find({}, batch_size=5000):
prog.update()
ip = str(doc.get("ip", "")).strip()
if not is_valid_public_ip(ip):
continue
country, province, city, region = locator.lookup(ip)
addtime = doc.get("addtime")
if isinstance(addtime, datetime):
add_dt = addtime.replace(tzinfo=timezone.utc) if addtime.tzinfo is None else addtime
elif isinstance(addtime, str):
add_dt = parse_date_string(addtime)
else:
add_dt = None
money = float(doc.get("money", 0) or 0)
num = int(doc.get("num", 1) or 1)
input_val = str(doc.get("input", ""))
r = score_r(days_since(add_dt))
f = score_f(num)
m = score_m(money)
rfm = r + f + m
docs.append({
"username": "",
"email": input_val,
"password": "",
"salt": "",
"region": region,
"country": country,
"province": province,
"city": city,
"phone": extract_phone({}, input_val),
"qq": extract_qq(input_val, ""),
"ip": ip,
"ip_reg": "",
"ip_last": ip,
"ip_public": is_public_ip(ip),
"source_db": "KR_卡若私域",
"source_col": "黑科技quwanzhi.com",
"reg_time": None,
"last_active_time": add_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": "",
"extra": {"trade_no": doc.get("trade_no"), "pay_type": doc.get("type"), "product": doc.get("name"), "money": money, "password_type": ""},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_xiaomi(db_client, locator):
"""小米 (828万最大集合)"""
col = db_client["KR_商城"]["小米 xiaomi_com"]
total = col.estimated_document_count()
print(f" [6/8] KR_商城.小米xiaomi_com ({total:,} 条) — 预估 5-8 分钟")
prog = Progress(total, "小米")
docs = []
for doc in col.find({"ip": {"$regex": r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"}}, batch_size=10000):
prog.update()
ip = str(doc.get("ip", "")).strip()
if not is_valid_public_ip(ip):
continue
country, province, city, region = locator.lookup(ip)
pwd_raw = str(doc.get("password", ""))
if ":" in pwd_raw:
pwd_hash, pwd_salt = pwd_raw.split(":", 1)
else:
pwd_hash, pwd_salt = pwd_raw, ""
email = str(doc.get("email", ""))
username = str(doc.get("username", ""))
docs.append({
"username": username,
"email": email,
"password": pwd_hash,
"salt": pwd_salt,
"region": region,
"country": country,
"province": province,
"city": city,
"phone": extract_phone(doc, email),
"qq": extract_qq(email, username),
"ip": ip,
"ip_reg": ip,
"ip_last": "",
"ip_public": is_public_ip(ip),
"source_db": "KR_商城",
"source_col": "小米xiaomi_com",
"reg_time": None,
"last_active_time": None,
"R_score": 1, "F_score": 1, "M_score": 1,
"RFM_total": 3,
"value_level": "流失用户",
"user_type": "一般挽留用户",
"roles": "",
"extra": {"xiaomi_id": doc.get("id"), "password_type": "Hash+salt"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_kataka_profile(db_client, locator):
"""卡塔卡银行用户档案 (106K)"""
col = db_client["KR_国外"]["卡塔卡银行_用户档案"]
total = col.estimated_document_count()
print(f" [7/8] KR_国外.卡塔卡银行_用户档案 ({total:,} 条)")
prog = Progress(total, "卡塔卡银行")
docs = []
for doc in col.find({}, batch_size=5000):
prog.update()
ip = str(doc.get("LAST_LOGIN_IP", "")).strip()
if not is_valid_public_ip(ip):
continue
country, province, city, region = locator.lookup(ip)
login_dt = parse_date_string(str(doc.get("LAST_LOGIN_TIME") or ""))
username = str(doc.get("USERNAME", ""))
r = score_r(days_since(login_dt))
f = score_f(1)
m = score_m(10)
rfm = r + f + m
docs.append({
"username": username,
"email": "",
"password": "",
"salt": "",
"region": region,
"country": country,
"province": province,
"city": city,
"phone": "",
"qq": "",
"ip": ip,
"ip_reg": "",
"ip_last": ip,
"ip_public": is_public_ip(ip),
"source_db": "KR_国外",
"source_col": "卡塔卡银行_用户档案",
"reg_time": None,
"last_active_time": login_dt,
"R_score": r, "F_score": f, "M_score": m,
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, f, m),
"roles": "",
"extra": {"national_id": doc.get("NATIONAL_ID"), "base_no": doc.get("BASE_NO"), "user_status": doc.get("USER_STATUS"), "password_type": "加密凭证"},
"extracted_at": datetime.now(timezone.utc),
})
prog.done()
return docs
def extract_kataka_audit(db_client, locator):
"""卡塔卡银行审计 (28)"""
col = db_client["KR_国外"]["卡塔卡银行_审计主表"]
total = col.estimated_document_count()
print(f" [8/8] KR_国外.卡塔卡银行_审计主表 ({total:,} 条)")
docs = []
for doc in col.find({}):
ip = str(doc.get("LOGON_IP", "")).strip()
req_ip = str(doc.get("REQUEST_IP", "")).strip()
has_logon = is_valid_public_ip(ip)
has_req = is_valid_public_ip(req_ip)
if not has_logon and not has_req:
continue
main_ip = ip if has_logon else req_ip
country, province, city, region = locator.lookup(main_ip)
audit_dt = parse_date_string(str(doc.get("AUDIT_DATE") or ""))
r = score_r(days_since(audit_dt))
rfm = r + 1 + score_m(10)
docs.append({
"username": "",
"email": "",
"password": "",
"salt": "",
"region": region,
"country": country,
"province": province,
"city": city,
"phone": "",
"qq": "",
"ip": main_ip,
"ip_reg": "",
"ip_last": main_ip,
"ip_public": is_public_ip(main_ip),
"source_db": "KR_国外",
"source_col": "卡塔卡银行_审计主表",
"reg_time": None,
"last_active_time": audit_dt,
"R_score": r, "F_score": 1, "M_score": score_m(10),
"RFM_total": rfm,
"value_level": calc_value_level(rfm),
"user_type": calc_user_type(r, 1, score_m(10)),
"roles": "",
"extra": {"base_no": doc.get("BASE_NO"), "txn_code": doc.get("TXN_CODE"), "status": doc.get("STATUS"), "request_ip": req_ip, "password_type": ""},
"extracted_at": datetime.now(timezone.utc),
})
print(f" 完成: {len(docs)}")
return docs
# ========== 主流程 ==========
def main():
total_start = time.time()
print("=" * 70)
print("分布式矩阵IP 用户提取 v2.0 (RFM + 地区 + 手机/QQ)")
print("=" * 70)
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"目标集合: {TARGET_DB}.{TARGET_COL}")
print()
# 初始化
print("初始化...")
locator = IPLocator(GEOIP_DB)
client = pymongo.MongoClient(MONGO_URI)
try:
client.admin.command("ping")
print(" MongoDB 连接成功")
except Exception as e:
print(f" MongoDB 连接失败: {e}")
return
print(f"\n{'='*70}")
print("阶段一提取数据8个集合")
print(f"{'='*70}\n")
all_docs = []
all_docs.extend(extract_mumayi(client, locator))
all_docs.extend(extract_fangchan(client, locator))
all_docs.extend(extract_lkdie_forum(client, locator))
all_docs.extend(extract_lkdie_shop(client, locator))
all_docs.extend(extract_quwanzhi(client, locator))
all_docs.extend(extract_kataka_profile(client, locator))
all_docs.extend(extract_kataka_audit(client, locator))
all_docs.extend(extract_xiaomi(client, locator))
locator.close()
# 统计
pub = sum(1 for d in all_docs if d.get("ip_public"))
has_region = sum(1 for d in all_docs if d.get("province"))
has_city = sum(1 for d in all_docs if d.get("city"))
has_phone = sum(1 for d in all_docs if d.get("phone"))
has_qq = sum(1 for d in all_docs if d.get("qq"))
print(f"\n{'='*70}")
print(f"提取汇总")
print(f"{'='*70}")
print(f" 总记录: {len(all_docs):>10,}")
print(f" 公网IP: {pub:>10,}")
print(f" 有省份: {has_region:>10,} ({has_region/len(all_docs)*100:.1f}%)")
print(f" 有城市: {has_city:>10,} ({has_city/len(all_docs)*100:.1f}%)")
print(f" 有手机号: {has_phone:>10,}")
print(f" 有QQ号: {has_qq:>10,}")
# 写入
print(f"\n{'='*70}")
print(f"阶段二:写入 {TARGET_DB}.{TARGET_COL}")
print(f"{'='*70}\n")
target_col = client[TARGET_DB][TARGET_COL]
old = target_col.estimated_document_count()
if old > 0:
print(f" 清除旧数据 ({old:,} 条)...")
target_col.drop()
batch_size = 10000
write_prog = Progress(len(all_docs), "写入")
for i in range(0, len(all_docs), batch_size):
batch = all_docs[i:i + batch_size]
target_col.insert_many(batch)
write_prog.update(len(batch))
write_prog.done()
# 索引
print("\n 创建索引...")
target_col.create_index("ip")
target_col.create_index("ip_public")
target_col.create_index("source_db")
target_col.create_index("source_col")
target_col.create_index("RFM_total")
target_col.create_index("value_level")
target_col.create_index("username")
target_col.create_index("email")
target_col.create_index("region")
target_col.create_index("province")
target_col.create_index("city")
target_col.create_index("phone")
target_col.create_index("qq")
target_col.create_index([("ip", 1), ("source_col", 1)])
print(" 14个索引创建完成")
# 最终统计
print(f"\n{'='*70}")
print(f"最终统计")
print(f"{'='*70}")
pipeline = [
{"$group": {
"_id": "$source_col",
"count": {"$sum": 1},
"public_ips": {"$sum": {"$cond": ["$ip_public", 1, 0]}},
"has_region": {"$sum": {"$cond": [{"$ne": ["$province", ""]}, 1, 0]}},
"has_city": {"$sum": {"$cond": [{"$ne": ["$city", ""]}, 1, 0]}},
"has_phone": {"$sum": {"$cond": [{"$ne": ["$phone", ""]}, 1, 0]}},
"has_qq": {"$sum": {"$cond": [{"$ne": ["$qq", ""]}, 1, 0]}},
"avg_rfm": {"$avg": "$RFM_total"},
}},
{"$sort": {"count": -1}},
]
print(f"\n {'来源':<25s} {'数量':>10s} {'公网IP':>10s} {'有省份':>8s} {'有城市':>8s} {'有手机':>8s} {'有QQ':>8s} {'RFM':>5s}")
print(f" {''*25} {''*10} {''*10} {''*8} {''*8} {''*8} {''*8} {''*5}")
for row in target_col.aggregate(pipeline):
print(f" {row['_id']:<25s} {row['count']:>10,} {row['public_ips']:>10,} "
f"{row['has_region']:>8,} {row['has_city']:>8,} "
f"{row['has_phone']:>8,} {row['has_qq']:>8,} {row['avg_rfm']:>5.1f}")
# 地区TOP
print("\n 地区TOP10:")
for row in target_col.aggregate([
{"$match": {"province": {"$ne": ""}}},
{"$group": {"_id": "$province", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10},
]):
print(f" {row['_id']}: {row['count']:,}")
total_elapsed = time.time() - total_start
final_count = target_col.estimated_document_count()
print(f"\n{'='*70}")
print(f"全部完成!")
print(f" 最终文档数: {final_count:,}")
print(f" 总耗时: {total_elapsed:.0f}s ({total_elapsed/60:.1f}分钟)")
print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*70}")
client.close()
if __name__ == "__main__":
main()