- 6 大模块:扫描/账号管理/节点部署/暴力破解/算力调度/监控运维 - SKILL 总控 + 子模块 SKILL - 排除大文件(>5MB)与敏感凭证 Co-authored-by: Cursor <cursoragent@cursor.com>
872 lines
30 KiB
Python
872 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
分布式矩阵IP用户提取 v2.0 — RFM + 地区 + 手机/QQ
|
||
======================================================
|
||
从本地 MongoDB 全部含 IP 字段的集合中提取用户数据,
|
||
统一字段 + IP地理定位(省/市) + RFM评估 + 手机/QQ提取,
|
||
写入 KR.分布式矩阵IP 集合。
|
||
|
||
v2.0 新增:
|
||
- region(国家|省|市)通过 GeoLite2-City IP定位
|
||
- phone(手机号)来自 verifiedMobile 或邮箱/用户名中的手机号模式
|
||
- qq(QQ号)来自 @qq.com 邮箱或用户名中的纯数字QQ模式
|
||
- 实时速率统计 + ETA预估
|
||
|
||
用法: python3 extract_ip_users_rfm.py
|
||
"""
|
||
|
||
import pymongo
|
||
import re
|
||
import time
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
|
||
# ========== 配置 ==========
|
||
MONGO_URI = "mongodb://admin:admin123@localhost:27017/?authSource=admin"
|
||
TARGET_DB = "KR"
|
||
TARGET_COL = "分布式矩阵IP"
|
||
GEOIP_DB = "/tmp/GeoLite2-City.mmdb"
|
||
|
||
IPV4_REGEX = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
|
||
PHONE_REGEX = re.compile(r'^1[3-9]\d{9}$')
|
||
QQ_REGEX = re.compile(r'^[1-9]\d{4,10}$') # 5-11位数字
|
||
REFERENCE_DATE = datetime(2026, 2, 15, tzinfo=timezone.utc)
|
||
|
||
# ========== IP地理定位 ==========
|
||
|
||
class IPLocator:
|
||
"""IP地理位置查询(GeoLite2-City)"""
|
||
def __init__(self, db_path):
|
||
try:
|
||
import geoip2.database
|
||
self.reader = geoip2.database.Reader(db_path)
|
||
self._test()
|
||
print(f" GeoLite2-City 加载成功: {db_path}")
|
||
except Exception as e:
|
||
print(f" GeoLite2-City 加载失败: {e}")
|
||
self.reader = None
|
||
|
||
def _test(self):
|
||
r = self.reader.city('8.8.8.8')
|
||
assert r.country.iso_code == 'US'
|
||
|
||
def lookup(self, ip_str):
|
||
"""返回 (country, province, city, region_str)"""
|
||
if not self.reader or not ip_str:
|
||
return ("", "", "", "")
|
||
try:
|
||
r = self.reader.city(ip_str)
|
||
country = r.country.names.get('zh-CN', r.country.names.get('en', ''))
|
||
province = r.subdivisions.most_specific.names.get('zh-CN',
|
||
r.subdivisions.most_specific.names.get('en', ''))
|
||
city = r.city.names.get('zh-CN', r.city.names.get('en', ''))
|
||
# 组合为 region_str:国家|省|市(去空)
|
||
parts = [p for p in [country, province, city] if p]
|
||
region_str = "|".join(parts)
|
||
return (country, province, city, region_str)
|
||
except:
|
||
return ("", "", "", "")
|
||
|
||
def close(self):
|
||
if self.reader:
|
||
self.reader.close()
|
||
|
||
|
||
# ========== 手机/QQ提取 ==========
|
||
|
||
def extract_phone(doc, email=""):
|
||
"""从文档字段中提取手机号"""
|
||
# 1. verifiedMobile 字段
|
||
mobile = str(doc.get("verifiedMobile", "") or "").strip()
|
||
if PHONE_REGEX.match(mobile):
|
||
return mobile
|
||
# 2. 从email提取(如 13800001234@xxx.com)
|
||
if email:
|
||
prefix = email.split("@")[0]
|
||
if PHONE_REGEX.match(prefix):
|
||
return prefix
|
||
# 3. 从username提取
|
||
username = str(doc.get("username", "") or doc.get("nickname", "") or "").strip()
|
||
if PHONE_REGEX.match(username):
|
||
return username
|
||
return ""
|
||
|
||
def extract_qq(email="", username=""):
|
||
"""从邮箱或用户名提取QQ号"""
|
||
# 1. 从 @qq.com 邮箱提取
|
||
if email and "@qq.com" in email.lower():
|
||
prefix = email.split("@")[0]
|
||
if QQ_REGEX.match(prefix):
|
||
return prefix
|
||
# 2. 从username提取纯数字QQ
|
||
if username and QQ_REGEX.match(username):
|
||
# 排除太短的(可能是uid)和太长的(手机号)
|
||
if 5 <= len(username) <= 11 and not PHONE_REGEX.match(username):
|
||
return username
|
||
return ""
|
||
|
||
|
||
# ========== RFM 工具函数 ==========
|
||
|
||
def is_valid_public_ip(ip_str):
|
||
if not ip_str or not isinstance(ip_str, str):
|
||
return False
|
||
ip_str = ip_str.strip()
|
||
if not IPV4_REGEX.match(ip_str):
|
||
return False
|
||
if ip_str.startswith('0.') or ip_str.startswith('255.'):
|
||
return False
|
||
return True
|
||
|
||
def is_public_ip(ip_str):
|
||
if not ip_str:
|
||
return False
|
||
for prefix in ('127.', '10.', '192.168.', '0.', '255.'):
|
||
if ip_str.startswith(prefix):
|
||
return False
|
||
parts = ip_str.split('.')
|
||
if len(parts) == 4 and parts[0] == '172':
|
||
try:
|
||
if 16 <= int(parts[1]) <= 31:
|
||
return False
|
||
except ValueError:
|
||
pass
|
||
return True
|
||
|
||
def unix_to_datetime(ts):
|
||
if not ts:
|
||
return None
|
||
try:
|
||
ts = int(ts)
|
||
if ts > 1e12:
|
||
ts = ts / 1000
|
||
if ts < 946684800:
|
||
return None
|
||
return datetime.fromtimestamp(ts, tz=timezone.utc)
|
||
except (ValueError, TypeError, OSError):
|
||
return None
|
||
|
||
def parse_date_string(s):
|
||
if not s or not isinstance(s, str):
|
||
return None
|
||
for fmt in ['%d-%b-%y', '%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S']:
|
||
try:
|
||
dt = datetime.strptime(s.strip(), fmt)
|
||
return dt.replace(tzinfo=timezone.utc)
|
||
except ValueError:
|
||
continue
|
||
return None
|
||
|
||
def days_since(dt):
|
||
if not dt:
|
||
return 99999
|
||
try:
|
||
if dt.tzinfo is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return max(0, (REFERENCE_DATE - dt).days)
|
||
except:
|
||
return 99999
|
||
|
||
def score_r(days):
|
||
if days <= 30: return 5
|
||
elif days <= 180: return 4
|
||
elif days <= 365: return 3
|
||
elif days <= 730: return 2
|
||
else: return 1
|
||
|
||
def score_f(count):
|
||
if count >= 100: return 5
|
||
elif count >= 50: return 4
|
||
elif count >= 10: return 3
|
||
elif count >= 1: return 2
|
||
else: return 1
|
||
|
||
def score_m(value):
|
||
if value >= 1000: return 5
|
||
elif value >= 100: return 4
|
||
elif value >= 10: return 3
|
||
elif value >= 1: return 2
|
||
else: return 1
|
||
|
||
def calc_value_level(rfm):
|
||
if rfm >= 13: return "高价值用户"
|
||
elif rfm >= 10: return "中高价值用户"
|
||
elif rfm >= 7: return "中价值用户"
|
||
elif rfm >= 4: return "低价值用户"
|
||
else: return "流失用户"
|
||
|
||
def calc_user_type(r, f, m):
|
||
avg = 3
|
||
if r >= avg and f >= avg and m >= avg: return "重要价值用户"
|
||
elif r >= avg and f >= avg: return "重要保持用户"
|
||
elif r >= avg and m >= avg: return "重要发展用户"
|
||
elif f >= avg and m >= avg: return "重要挽留用户"
|
||
elif r >= avg: return "一般价值用户"
|
||
elif f >= avg: return "一般发展用户"
|
||
elif m >= avg: return "一般保持用户"
|
||
else: return "一般挽留用户"
|
||
|
||
|
||
# ========== 实时进度统计 ==========
|
||
|
||
class Progress:
|
||
"""实时速率 + ETA"""
|
||
def __init__(self, total, label=""):
|
||
self.total = total
|
||
self.label = label
|
||
self.count = 0
|
||
self.start = time.time()
|
||
self.last_print = 0
|
||
|
||
def update(self, n=1):
|
||
self.count += n
|
||
now = time.time()
|
||
if now - self.last_print >= 2 or self.count >= self.total: # 每2秒刷新
|
||
elapsed = now - self.start
|
||
rate = self.count / elapsed if elapsed > 0 else 0
|
||
remaining = (self.total - self.count) / rate if rate > 0 else 0
|
||
pct = self.count / self.total * 100
|
||
bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5))
|
||
sys.stdout.write(
|
||
f"\r {self.label} {bar} {pct:5.1f}% "
|
||
f"| {self.count:,}/{self.total:,} "
|
||
f"| {rate:,.0f}/s "
|
||
f"| 剩余 {remaining:.0f}s "
|
||
)
|
||
sys.stdout.flush()
|
||
self.last_print = now
|
||
|
||
def done(self):
|
||
elapsed = time.time() - self.start
|
||
rate = self.count / elapsed if elapsed > 0 else 0
|
||
print(f"\n 完成: {self.count:,} 条 | 耗时 {elapsed:.1f}s | 速率 {rate:,.0f}/s")
|
||
|
||
|
||
# ========== 各集合提取函数 ==========
|
||
|
||
def extract_mumayi(db_client, locator):
|
||
"""木蚂蚁 (115K)"""
|
||
col = db_client["KR_KR"]["木蚂蚁munayi_com"]
|
||
total = col.estimated_document_count()
|
||
print(f" [1/8] KR_KR.木蚂蚁munayi_com ({total:,} 条)")
|
||
prog = Progress(total, "木蚂蚁")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=5000):
|
||
prog.update()
|
||
regip = str(doc.get("regip", "")).strip()
|
||
lastip = str(doc.get("lastip", "")).strip() if doc.get("lastip") else ""
|
||
has_regip = is_valid_public_ip(regip)
|
||
has_lastip = is_valid_public_ip(lastip)
|
||
if not has_regip and not has_lastip:
|
||
continue
|
||
|
||
main_ip = lastip if has_lastip else regip
|
||
country, province, city, region = locator.lookup(main_ip)
|
||
|
||
reg_dt = unix_to_datetime(doc.get("regdate"))
|
||
last_dt = unix_to_datetime(doc.get("lastactivity")) or unix_to_datetime(doc.get("lastvisit")) or reg_dt
|
||
|
||
email = str(doc.get("email", ""))
|
||
username = str(doc.get("username", ""))
|
||
posts = int(doc.get("posts", 0) or 0)
|
||
credits_ = int(doc.get("credits", 0) or 0)
|
||
|
||
r = score_r(days_since(last_dt))
|
||
f = score_f(posts)
|
||
m = score_m(credits_)
|
||
rfm = r + f + m
|
||
|
||
docs.append({
|
||
"username": username,
|
||
"email": email,
|
||
"password": str(doc.get("password", "")),
|
||
"salt": "",
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": extract_phone(doc, email),
|
||
"qq": extract_qq(email, username),
|
||
"ip": main_ip,
|
||
"ip_reg": regip if has_regip else "",
|
||
"ip_last": lastip if has_lastip else "",
|
||
"ip_public": is_public_ip(main_ip),
|
||
"source_db": "KR_KR",
|
||
"source_col": "木蚂蚁munayi_com",
|
||
"reg_time": reg_dt,
|
||
"last_active_time": last_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": "",
|
||
"extra": {"uid": doc.get("uid"), "posts": posts, "credits": credits_, "password_type": "MD5"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_fangchan(db_client, locator):
|
||
"""房产网 (119K)"""
|
||
col = db_client["KR_KR"]["房产网"]
|
||
total = col.estimated_document_count()
|
||
print(f" [2/8] KR_KR.房产网 ({total:,} 条)")
|
||
prog = Progress(total, "房产网")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=5000):
|
||
prog.update()
|
||
regip = str(doc.get("regip", "")).strip()
|
||
if not is_valid_public_ip(regip):
|
||
continue
|
||
|
||
country, province, city, region = locator.lookup(regip)
|
||
reg_dt = unix_to_datetime(doc.get("regdate"))
|
||
login_dt = unix_to_datetime(doc.get("lastlogintime"))
|
||
last_dt = login_dt or reg_dt
|
||
email = str(doc.get("email", ""))
|
||
username = str(doc.get("username", ""))
|
||
groupid = doc.get("groupid")
|
||
group_val = int(groupid) if groupid else 0
|
||
|
||
r = score_r(days_since(last_dt))
|
||
f = score_f(1 if login_dt else 0)
|
||
m = score_m(group_val * 10)
|
||
rfm = r + f + m
|
||
|
||
docs.append({
|
||
"username": username,
|
||
"email": email,
|
||
"password": str(doc.get("password", "")),
|
||
"salt": str(doc.get("salt", "")),
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": extract_phone(doc, email),
|
||
"qq": extract_qq(email, username),
|
||
"ip": regip,
|
||
"ip_reg": regip,
|
||
"ip_last": "",
|
||
"ip_public": is_public_ip(regip),
|
||
"source_db": "KR_KR",
|
||
"source_col": "房产网",
|
||
"reg_time": reg_dt,
|
||
"last_active_time": last_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": "",
|
||
"extra": {"uid": doc.get("uid"), "groupid": groupid, "password_type": "MD5+salt"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_lkdie_forum(db_client, locator):
|
||
"""老坑爹论坛 (89K)"""
|
||
col = db_client["KR_卡若私域"]["老坑爹论坛www.lkdie.com 会员"]
|
||
total = col.estimated_document_count()
|
||
print(f" [3/8] KR_卡若私域.老坑爹论坛 ({total:,} 条)")
|
||
prog = Progress(total, "老坑爹论坛")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=5000):
|
||
prog.update()
|
||
regip = str(doc.get("regip", "")).strip()
|
||
if not is_valid_public_ip(regip):
|
||
continue
|
||
|
||
country, province, city, region = locator.lookup(regip)
|
||
reg_dt = unix_to_datetime(doc.get("regdate"))
|
||
login_dt = unix_to_datetime(doc.get("lastlogintime"))
|
||
last_dt = login_dt or reg_dt
|
||
email = str(doc.get("email", ""))
|
||
username = str(doc.get("username", ""))
|
||
|
||
r = score_r(days_since(last_dt))
|
||
f = score_f(1 if login_dt else 0)
|
||
m = score_m(0)
|
||
rfm = r + f + m
|
||
|
||
docs.append({
|
||
"username": username,
|
||
"email": email,
|
||
"password": str(doc.get("password", "")),
|
||
"salt": str(doc.get("salt", "")),
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": extract_phone(doc, email),
|
||
"qq": extract_qq(email, username),
|
||
"ip": regip,
|
||
"ip_reg": regip,
|
||
"ip_last": "",
|
||
"ip_public": is_public_ip(regip),
|
||
"source_db": "KR_卡若私域",
|
||
"source_col": "老坑爹论坛www.lkdie.com",
|
||
"reg_time": reg_dt,
|
||
"last_active_time": last_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": "",
|
||
"extra": {"uid": doc.get("uid"), "password_type": "MD5+salt"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_lkdie_shop(db_client, locator):
|
||
"""老坑爹商店 (662)"""
|
||
col = db_client["KR_卡若私域"]["老坑爹商店 shop.lkdie.com"]
|
||
total = col.estimated_document_count()
|
||
print(f" [4/8] KR_卡若私域.老坑爹商店 ({total:,} 条)")
|
||
prog = Progress(total, "老坑爹商店")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=1000):
|
||
prog.update()
|
||
login_ip = str(doc.get("loginIp", "")).strip()
|
||
created_ip = str(doc.get("createdIp", "")).strip()
|
||
has_login = is_valid_public_ip(login_ip)
|
||
has_created = is_valid_public_ip(created_ip)
|
||
if not has_login and not has_created:
|
||
continue
|
||
|
||
main_ip = login_ip if has_login else created_ip
|
||
country, province, city, region = locator.lookup(main_ip)
|
||
|
||
login_dt = unix_to_datetime(doc.get("loginTime"))
|
||
created_dt = unix_to_datetime(doc.get("createdTime"))
|
||
last_dt = login_dt or created_dt
|
||
email = str(doc.get("email", ""))
|
||
nickname = str(doc.get("nickname", ""))
|
||
point = int(doc.get("point", 0) or 0)
|
||
coin = int(doc.get("coin", 0) or 0)
|
||
roles_str = str(doc.get("roles", ""))
|
||
role_score = 5 if "SUPER_ADMIN" in roles_str else 4 if "ADMIN" in roles_str else 3 if "TEACHER" in roles_str else 1
|
||
|
||
r = score_r(days_since(last_dt))
|
||
f = score_f(role_score)
|
||
m = score_m(point + coin)
|
||
rfm = r + f + m
|
||
|
||
# 手机号:verifiedMobile
|
||
phone = extract_phone(doc, email)
|
||
|
||
docs.append({
|
||
"username": nickname,
|
||
"email": email,
|
||
"password": str(doc.get("password", "")),
|
||
"salt": str(doc.get("salt", "")),
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": phone,
|
||
"qq": extract_qq(email, nickname),
|
||
"ip": main_ip,
|
||
"ip_reg": created_ip if has_created else "",
|
||
"ip_last": login_ip if has_login else "",
|
||
"ip_public": is_public_ip(main_ip),
|
||
"source_db": "KR_卡若私域",
|
||
"source_col": "老坑爹商店shop.lkdie.com",
|
||
"reg_time": created_dt,
|
||
"last_active_time": last_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": roles_str.replace("|", ",").strip(","),
|
||
"extra": {"id": doc.get("id"), "point": point, "coin": coin, "title": doc.get("title"), "password_type": "Base64+salt"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_quwanzhi(db_client, locator):
|
||
"""黑科技付款邮箱 (5K)"""
|
||
col = db_client["KR_卡若私域"]["黑科技www.quwanzhi.com 付款邮箱"]
|
||
total = col.estimated_document_count()
|
||
print(f" [5/8] KR_卡若私域.黑科技付款邮箱 ({total:,} 条)")
|
||
prog = Progress(total, "黑科技")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=5000):
|
||
prog.update()
|
||
ip = str(doc.get("ip", "")).strip()
|
||
if not is_valid_public_ip(ip):
|
||
continue
|
||
|
||
country, province, city, region = locator.lookup(ip)
|
||
|
||
addtime = doc.get("addtime")
|
||
if isinstance(addtime, datetime):
|
||
add_dt = addtime.replace(tzinfo=timezone.utc) if addtime.tzinfo is None else addtime
|
||
elif isinstance(addtime, str):
|
||
add_dt = parse_date_string(addtime)
|
||
else:
|
||
add_dt = None
|
||
|
||
money = float(doc.get("money", 0) or 0)
|
||
num = int(doc.get("num", 1) or 1)
|
||
input_val = str(doc.get("input", ""))
|
||
|
||
r = score_r(days_since(add_dt))
|
||
f = score_f(num)
|
||
m = score_m(money)
|
||
rfm = r + f + m
|
||
|
||
docs.append({
|
||
"username": "",
|
||
"email": input_val,
|
||
"password": "",
|
||
"salt": "",
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": extract_phone({}, input_val),
|
||
"qq": extract_qq(input_val, ""),
|
||
"ip": ip,
|
||
"ip_reg": "",
|
||
"ip_last": ip,
|
||
"ip_public": is_public_ip(ip),
|
||
"source_db": "KR_卡若私域",
|
||
"source_col": "黑科技quwanzhi.com",
|
||
"reg_time": None,
|
||
"last_active_time": add_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": "",
|
||
"extra": {"trade_no": doc.get("trade_no"), "pay_type": doc.get("type"), "product": doc.get("name"), "money": money, "password_type": "无"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_xiaomi(db_client, locator):
|
||
"""小米 (828万,最大集合)"""
|
||
col = db_client["KR_商城"]["小米 xiaomi_com"]
|
||
total = col.estimated_document_count()
|
||
print(f" [6/8] KR_商城.小米xiaomi_com ({total:,} 条) — 预估 5-8 分钟")
|
||
prog = Progress(total, "小米")
|
||
docs = []
|
||
|
||
for doc in col.find({"ip": {"$regex": r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"}}, batch_size=10000):
|
||
prog.update()
|
||
ip = str(doc.get("ip", "")).strip()
|
||
if not is_valid_public_ip(ip):
|
||
continue
|
||
|
||
country, province, city, region = locator.lookup(ip)
|
||
|
||
pwd_raw = str(doc.get("password", ""))
|
||
if ":" in pwd_raw:
|
||
pwd_hash, pwd_salt = pwd_raw.split(":", 1)
|
||
else:
|
||
pwd_hash, pwd_salt = pwd_raw, ""
|
||
|
||
email = str(doc.get("email", ""))
|
||
username = str(doc.get("username", ""))
|
||
|
||
docs.append({
|
||
"username": username,
|
||
"email": email,
|
||
"password": pwd_hash,
|
||
"salt": pwd_salt,
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": extract_phone(doc, email),
|
||
"qq": extract_qq(email, username),
|
||
"ip": ip,
|
||
"ip_reg": ip,
|
||
"ip_last": "",
|
||
"ip_public": is_public_ip(ip),
|
||
"source_db": "KR_商城",
|
||
"source_col": "小米xiaomi_com",
|
||
"reg_time": None,
|
||
"last_active_time": None,
|
||
"R_score": 1, "F_score": 1, "M_score": 1,
|
||
"RFM_total": 3,
|
||
"value_level": "流失用户",
|
||
"user_type": "一般挽留用户",
|
||
"roles": "",
|
||
"extra": {"xiaomi_id": doc.get("id"), "password_type": "Hash+salt"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_kataka_profile(db_client, locator):
|
||
"""卡塔卡银行用户档案 (106K)"""
|
||
col = db_client["KR_国外"]["卡塔卡银行_用户档案"]
|
||
total = col.estimated_document_count()
|
||
print(f" [7/8] KR_国外.卡塔卡银行_用户档案 ({total:,} 条)")
|
||
prog = Progress(total, "卡塔卡银行")
|
||
docs = []
|
||
|
||
for doc in col.find({}, batch_size=5000):
|
||
prog.update()
|
||
ip = str(doc.get("LAST_LOGIN_IP", "")).strip()
|
||
if not is_valid_public_ip(ip):
|
||
continue
|
||
|
||
country, province, city, region = locator.lookup(ip)
|
||
login_dt = parse_date_string(str(doc.get("LAST_LOGIN_TIME") or ""))
|
||
username = str(doc.get("USERNAME", ""))
|
||
|
||
r = score_r(days_since(login_dt))
|
||
f = score_f(1)
|
||
m = score_m(10)
|
||
rfm = r + f + m
|
||
|
||
docs.append({
|
||
"username": username,
|
||
"email": "",
|
||
"password": "",
|
||
"salt": "",
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": "",
|
||
"qq": "",
|
||
"ip": ip,
|
||
"ip_reg": "",
|
||
"ip_last": ip,
|
||
"ip_public": is_public_ip(ip),
|
||
"source_db": "KR_国外",
|
||
"source_col": "卡塔卡银行_用户档案",
|
||
"reg_time": None,
|
||
"last_active_time": login_dt,
|
||
"R_score": r, "F_score": f, "M_score": m,
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, f, m),
|
||
"roles": "",
|
||
"extra": {"national_id": doc.get("NATIONAL_ID"), "base_no": doc.get("BASE_NO"), "user_status": doc.get("USER_STATUS"), "password_type": "加密凭证"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
prog.done()
|
||
return docs
|
||
|
||
|
||
def extract_kataka_audit(db_client, locator):
|
||
"""卡塔卡银行审计 (28)"""
|
||
col = db_client["KR_国外"]["卡塔卡银行_审计主表"]
|
||
total = col.estimated_document_count()
|
||
print(f" [8/8] KR_国外.卡塔卡银行_审计主表 ({total:,} 条)")
|
||
docs = []
|
||
|
||
for doc in col.find({}):
|
||
ip = str(doc.get("LOGON_IP", "")).strip()
|
||
req_ip = str(doc.get("REQUEST_IP", "")).strip()
|
||
has_logon = is_valid_public_ip(ip)
|
||
has_req = is_valid_public_ip(req_ip)
|
||
if not has_logon and not has_req:
|
||
continue
|
||
|
||
main_ip = ip if has_logon else req_ip
|
||
country, province, city, region = locator.lookup(main_ip)
|
||
audit_dt = parse_date_string(str(doc.get("AUDIT_DATE") or ""))
|
||
|
||
r = score_r(days_since(audit_dt))
|
||
rfm = r + 1 + score_m(10)
|
||
|
||
docs.append({
|
||
"username": "",
|
||
"email": "",
|
||
"password": "",
|
||
"salt": "",
|
||
"region": region,
|
||
"country": country,
|
||
"province": province,
|
||
"city": city,
|
||
"phone": "",
|
||
"qq": "",
|
||
"ip": main_ip,
|
||
"ip_reg": "",
|
||
"ip_last": main_ip,
|
||
"ip_public": is_public_ip(main_ip),
|
||
"source_db": "KR_国外",
|
||
"source_col": "卡塔卡银行_审计主表",
|
||
"reg_time": None,
|
||
"last_active_time": audit_dt,
|
||
"R_score": r, "F_score": 1, "M_score": score_m(10),
|
||
"RFM_total": rfm,
|
||
"value_level": calc_value_level(rfm),
|
||
"user_type": calc_user_type(r, 1, score_m(10)),
|
||
"roles": "",
|
||
"extra": {"base_no": doc.get("BASE_NO"), "txn_code": doc.get("TXN_CODE"), "status": doc.get("STATUS"), "request_ip": req_ip, "password_type": "无"},
|
||
"extracted_at": datetime.now(timezone.utc),
|
||
})
|
||
|
||
print(f" 完成: {len(docs)} 条")
|
||
return docs
|
||
|
||
|
||
# ========== 主流程 ==========
|
||
|
||
def main():
|
||
total_start = time.time()
|
||
print("=" * 70)
|
||
print("分布式矩阵IP 用户提取 v2.0 (RFM + 地区 + 手机/QQ)")
|
||
print("=" * 70)
|
||
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f"目标集合: {TARGET_DB}.{TARGET_COL}")
|
||
print()
|
||
|
||
# 初始化
|
||
print("初始化...")
|
||
locator = IPLocator(GEOIP_DB)
|
||
client = pymongo.MongoClient(MONGO_URI)
|
||
try:
|
||
client.admin.command("ping")
|
||
print(" MongoDB 连接成功")
|
||
except Exception as e:
|
||
print(f" MongoDB 连接失败: {e}")
|
||
return
|
||
|
||
print(f"\n{'='*70}")
|
||
print("阶段一:提取数据(8个集合)")
|
||
print(f"{'='*70}\n")
|
||
|
||
all_docs = []
|
||
all_docs.extend(extract_mumayi(client, locator))
|
||
all_docs.extend(extract_fangchan(client, locator))
|
||
all_docs.extend(extract_lkdie_forum(client, locator))
|
||
all_docs.extend(extract_lkdie_shop(client, locator))
|
||
all_docs.extend(extract_quwanzhi(client, locator))
|
||
all_docs.extend(extract_kataka_profile(client, locator))
|
||
all_docs.extend(extract_kataka_audit(client, locator))
|
||
all_docs.extend(extract_xiaomi(client, locator))
|
||
|
||
locator.close()
|
||
|
||
# 统计
|
||
pub = sum(1 for d in all_docs if d.get("ip_public"))
|
||
has_region = sum(1 for d in all_docs if d.get("province"))
|
||
has_city = sum(1 for d in all_docs if d.get("city"))
|
||
has_phone = sum(1 for d in all_docs if d.get("phone"))
|
||
has_qq = sum(1 for d in all_docs if d.get("qq"))
|
||
|
||
print(f"\n{'='*70}")
|
||
print(f"提取汇总")
|
||
print(f"{'='*70}")
|
||
print(f" 总记录: {len(all_docs):>10,}")
|
||
print(f" 公网IP: {pub:>10,}")
|
||
print(f" 有省份: {has_region:>10,} ({has_region/len(all_docs)*100:.1f}%)")
|
||
print(f" 有城市: {has_city:>10,} ({has_city/len(all_docs)*100:.1f}%)")
|
||
print(f" 有手机号: {has_phone:>10,}")
|
||
print(f" 有QQ号: {has_qq:>10,}")
|
||
|
||
# 写入
|
||
print(f"\n{'='*70}")
|
||
print(f"阶段二:写入 {TARGET_DB}.{TARGET_COL}")
|
||
print(f"{'='*70}\n")
|
||
|
||
target_col = client[TARGET_DB][TARGET_COL]
|
||
old = target_col.estimated_document_count()
|
||
if old > 0:
|
||
print(f" 清除旧数据 ({old:,} 条)...")
|
||
target_col.drop()
|
||
|
||
batch_size = 10000
|
||
write_prog = Progress(len(all_docs), "写入")
|
||
for i in range(0, len(all_docs), batch_size):
|
||
batch = all_docs[i:i + batch_size]
|
||
target_col.insert_many(batch)
|
||
write_prog.update(len(batch))
|
||
write_prog.done()
|
||
|
||
# 索引
|
||
print("\n 创建索引...")
|
||
target_col.create_index("ip")
|
||
target_col.create_index("ip_public")
|
||
target_col.create_index("source_db")
|
||
target_col.create_index("source_col")
|
||
target_col.create_index("RFM_total")
|
||
target_col.create_index("value_level")
|
||
target_col.create_index("username")
|
||
target_col.create_index("email")
|
||
target_col.create_index("region")
|
||
target_col.create_index("province")
|
||
target_col.create_index("city")
|
||
target_col.create_index("phone")
|
||
target_col.create_index("qq")
|
||
target_col.create_index([("ip", 1), ("source_col", 1)])
|
||
print(" 14个索引创建完成")
|
||
|
||
# 最终统计
|
||
print(f"\n{'='*70}")
|
||
print(f"最终统计")
|
||
print(f"{'='*70}")
|
||
|
||
pipeline = [
|
||
{"$group": {
|
||
"_id": "$source_col",
|
||
"count": {"$sum": 1},
|
||
"public_ips": {"$sum": {"$cond": ["$ip_public", 1, 0]}},
|
||
"has_region": {"$sum": {"$cond": [{"$ne": ["$province", ""]}, 1, 0]}},
|
||
"has_city": {"$sum": {"$cond": [{"$ne": ["$city", ""]}, 1, 0]}},
|
||
"has_phone": {"$sum": {"$cond": [{"$ne": ["$phone", ""]}, 1, 0]}},
|
||
"has_qq": {"$sum": {"$cond": [{"$ne": ["$qq", ""]}, 1, 0]}},
|
||
"avg_rfm": {"$avg": "$RFM_total"},
|
||
}},
|
||
{"$sort": {"count": -1}},
|
||
]
|
||
print(f"\n {'来源':<25s} {'数量':>10s} {'公网IP':>10s} {'有省份':>8s} {'有城市':>8s} {'有手机':>8s} {'有QQ':>8s} {'RFM':>5s}")
|
||
print(f" {'─'*25} {'─'*10} {'─'*10} {'─'*8} {'─'*8} {'─'*8} {'─'*8} {'─'*5}")
|
||
for row in target_col.aggregate(pipeline):
|
||
print(f" {row['_id']:<25s} {row['count']:>10,} {row['public_ips']:>10,} "
|
||
f"{row['has_region']:>8,} {row['has_city']:>8,} "
|
||
f"{row['has_phone']:>8,} {row['has_qq']:>8,} {row['avg_rfm']:>5.1f}")
|
||
|
||
# 地区TOP
|
||
print("\n 地区TOP10:")
|
||
for row in target_col.aggregate([
|
||
{"$match": {"province": {"$ne": ""}}},
|
||
{"$group": {"_id": "$province", "count": {"$sum": 1}}},
|
||
{"$sort": {"count": -1}},
|
||
{"$limit": 10},
|
||
]):
|
||
print(f" {row['_id']}: {row['count']:,}")
|
||
|
||
total_elapsed = time.time() - total_start
|
||
final_count = target_col.estimated_document_count()
|
||
print(f"\n{'='*70}")
|
||
print(f"全部完成!")
|
||
print(f" 最终文档数: {final_count:,}")
|
||
print(f" 总耗时: {total_elapsed:.0f}s ({total_elapsed/60:.1f}分钟)")
|
||
print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f"{'='*70}")
|
||
|
||
client.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|