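"""Auth-less smoke test for the soul-api admin routes.

Scans the Go router source for ``/api/admin`` and ``/api/db`` route
registrations, requests each one without credentials, and reports every
route that does not answer 401/403.
"""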
import re
|
||
from dataclasses import dataclass
|
||
|
||
import requests
|
||
|
||
|
||
# Absolute Windows-style path of the Go router source that extract_routes()
# scans for route registrations.
# NOTE(review): this is a raw string, so every `\\` pair stays two backslash
# characters and the path contains doubled separators — confirm that is intended.
ROUTER_GO = r"e:\\Gongsi\\Mycontent\\soul-api\\internal\\router\\router.go"
|
||
|
||
|
||
@dataclass
class Check:
    """One probe outcome recorded for the smoke-test report.

    ``status`` is the HTTP status code of the response; ``main`` stores 0
    when the request raised instead of producing a response.
    """

    method: str  # HTTP verb used for the request, e.g. "GET"
    path: str  # request path, relative to the API base URL
    status: int  # HTTP status code (0 when no response was received)
    preview: str  # leading part of the response body, or the exception text
|
||
|
||
|
||
def _read_text(path: str) -> str:
    """Return the whole content of the file at *path* as text.

    The file is decoded as UTF-8; bytes that cannot be decoded are dropped
    (``errors="ignore"``) instead of raising.
    """
    with open(path, encoding="utf-8", errors="ignore") as source:
        content = source.read()
    return content
|
||
|
||
|
||
def extract_routes(text: "str | None" = None) -> list[tuple[str, str]]:
    """Extract admin/db route registrations from the Go router source.

    Args:
        text: Go source to scan. When ``None`` (the default) the file at
            ``ROUTER_GO`` is read, which is the original behaviour.

    Returns:
        ``[(method, full_path_template), ...]`` in first-seen order with
        duplicates removed. ``full_path_template`` keeps route placeholders
        such as ``:id`` untouched.
    """
    if text is None:
        text = _read_text(ROUTER_GO)

    # One (pattern, URL prefix) pair per router group variable. The leading
    # ``\b`` keeps a group name from matching inside a longer identifier
    # (e.g. ``db`` inside ``mdb.GET(...)``).
    group_patterns = (
        # /api/admin login / auth check / logout, registered directly on ``api``
        (r'\bapi\.(GET|POST|PUT|DELETE)\("(/admin(?:/[^"]*)?)",\s*handler\.[A-Za-z0-9_]+', "/api"),
        # /api/admin group
        (r'\badmin\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', "/api/admin"),
        # /api/db group
        (r'\bdb\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', "/api/db"),
    )

    routes: list[tuple[str, str]] = []
    for pattern, prefix in group_patterns:
        routes.extend(
            (m.group(1), f"{prefix}{m.group(2)}") for m in re.finditer(pattern, text)
        )

    # De-duplicate while keeping first-seen order (dicts preserve insertion order).
    return list(dict.fromkeys(routes))
|
||
|
||
|
||
def replace_path_params(path: str) -> str:
    """Substitute every ``:name`` route placeholder in *path* with ``"1"``.

    The whole placeholder name is consumed, so ``:id`` still becomes ``1``
    while ``:idx`` or ``:userId`` also become ``1`` (a plain
    ``str.replace(":id", "1")`` would turn ``:idx`` into ``1x`` and leave
    ``:userId`` unresolved). A colon that is not followed by an identifier,
    such as the one in ``http://`` or in ``:8080``, is left alone.

    Args:
        path: Route template, e.g. ``/api/admin/users/:id``.

    Returns:
        The path with each placeholder replaced by ``1``.
    """
    return re.sub(r":[A-Za-z_][A-Za-z0-9_]*", "1", path)
|
||
|
||
|
||
def main() -> None:
    """Probe every extracted route without credentials and print a report.

    Environment variables read:
        SOUL_API_BASE: base URL of the API (default ``http://localhost:8080``).
        SOUL_ADMIN_USERNAME / SOUL_ADMIN_PASSWORD: credentials for the single
            login check (defaults ``admin`` / ``admin123``).

    Steps:
        1. POST the credentials to ``/api/admin`` once and record a failure
           when the login does not answer 200 with ``success`` true and a
           token, or when the request itself raises.
        2. Request every other route without a token. Any status other than
           401/403 is recorded as unexpected; a request that raises is
           recorded as a failure with status 0.
        3. Print both lists, each limited to its first 30 entries.
    """
    import os

    api_base = (os.environ.get("SOUL_API_BASE") or "http://localhost:8080").rstrip("/")

    routes = extract_routes()
    print(f"Found routes: {len(routes)}")

    failures: list[Check] = []
    unexpected: list[Check] = []

    headers = {"Content-Type": "application/json"}
    login_path = "/api/admin"

    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as session:
        # Lets https endpoints with a self-signed certificate be probed too.
        session.verify = False

        # First verify that the login endpoint works: only POST /api/admin
        # gets one check that carries credentials.
        admin_username = os.environ.get("SOUL_ADMIN_USERNAME", "admin")
        admin_password = os.environ.get("SOUL_ADMIN_PASSWORD", "admin123")
        login_failure = None
        try:
            r_login = session.post(
                f"{api_base}{login_path}",
                json={"username": admin_username, "password": admin_password},
                headers=headers,
                timeout=10,
            )
        except requests.RequestException as exc:
            # An unreachable server must not abort the run before the report.
            login_failure = Check("POST", login_path, 0, f"EXC: {exc}")
        else:
            try:
                login_data = r_login.json()
            except ValueError:  # body is not valid JSON
                login_data = None
            login_ok = (
                r_login.status_code == 200
                and isinstance(login_data, dict)  # a JSON list/str has no .get
                and login_data.get("success") is True
                and bool(login_data.get("token"))
            )
            if not login_ok:
                login_failure = Check(
                    "POST", login_path, r_login.status_code, (r_login.text or "")[:200]
                )

        if login_failure is not None:
            failures.append(login_failure)
            print("LOGIN_CHECK_FAILED,后续路由鉴权探测可能不准确。")

        # A Session replays cookies it received. Drop whatever the login
        # response may have set so the probes below really are unauthenticated
        # and cannot trigger write operations.
        session.cookies.clear()

        for method, path_template in routes:
            path = replace_path_params(path_template)

            # The login endpoint was already verified above; skip it here.
            if method == "POST" and path == login_path:
                continue

            # POST/PUT send an empty JSON body; per the original notes the
            # AdminAuth middleware normally rejects the request before it
            # matters. No token is attached to any probe.
            json_payload = {} if method in ("POST", "PUT") else None

            try:
                resp = session.request(
                    method,
                    f"{api_base}{path}",
                    headers=headers,
                    json=json_payload,
                    timeout=10,
                )
            except Exception as e:  # best effort: record the error and keep probing
                failures.append(Check(method, path, 0, f"EXC: {e}"))
                continue

            status = resp.status_code
            preview = (resp.text or "")[:200].replace("\n", " ")

            # Non-login endpoints are expected to be blocked by auth => 401 or 403.
            if status not in (401, 403):
                unexpected.append(Check(method, path, status, preview))

    print("\n=== AUTHLESS_SMOKE_RESULT ===")
    _print_checks("Failures(0/404/500 等异常/网络异常):", failures)
    _print_checks("Unexpected (非 401/403):", unexpected)


def _print_checks(title: str, checks: "list[Check]", limit: int = 30) -> None:
    """Print *title* with the number of *checks*, then at most *limit* entries."""
    print(title, len(checks))
    for it in checks[:limit]:
        print(f"- {it.method} {it.path} -> {it.status}, preview={it.preview}")
    if len(checks) > limit:
        print("... truncated")
|
||
|
||
|
||
# Run the smoke test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||
|