# Files
# soul-yongping/scripts/test/web/admin_routes_smoke_authless.py
#
# 135 lines
# 4.2 KiB
# Python
# Raw Normal View History

import re
from dataclasses import dataclass
from pathlib import Path
import requests
# Fourth ancestor directory (parents[3]) of this script's resolved path;
# used as the base for locating other files in the repository.
PROJECT_ROOT = Path(__file__).resolve().parents[3]
# Go router source file that extract_routes() reads to discover routes.
ROUTER_GO = PROJECT_ROOT / "soul-api" / "internal" / "router" / "router.go"
@dataclass
class Check:
    """Outcome of a single HTTP probe, kept for the final report."""

    # HTTP verb that was sent (e.g. "GET", "POST").
    method: str
    # Request path that was probed.
    path: str
    # HTTP status code of the response; main() stores 0 when the request
    # raised instead of returning a response.
    status: int
    # Short excerpt of the response body, or an "EXC: ..." message.
    preview: str
def _read_text(path: Path) -> str:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
def extract_routes() -> list[tuple[str, str]]:
    """Scan the Go router source and list the routes it registers.

    Returns ``[(method, full_path_template), ...]`` where
    ``full_path_template`` keeps placeholders such as ``:id`` untouched.
    Duplicates are removed while keeping first-seen order; matches are
    collected pattern by pattern in the order listed below.
    """
    source = _read_text(ROUTER_GO)

    # Each entry: (registration regex, URL prefix prepended to the captured path).
    route_specs = (
        # /api/admin login / auth check / logout, registered on the `api` group.
        (r'api\.(GET|POST|PUT|DELETE)\("(/admin(?:/[^"]*)?)",\s*handler\.[A-Za-z0-9_]+', "/api"),
        # Routes registered on the `admin` group (served under /api/admin).
        (r'admin\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', "/api/admin"),
        # Routes registered on the `db` group (served under /api/db).
        (r'db\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', "/api/db"),
    )

    found = [
        (hit.group(1), prefix + hit.group(2))
        for pattern, prefix in route_specs
        for hit in re.finditer(pattern, source)
    ]

    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication.
    return list(dict.fromkeys(found))
def replace_path_params(path: str) -> str:
    """Replace every ``:name`` path placeholder in *path* with the literal ``1``.

    The previous implementation only substituted the exact text ``:id``, so
    templates with other placeholders (``:key``, ``:userId``) were requested
    verbatim and a placeholder merely starting with ``:id`` (``:idx``) was
    mangled into ``1x``. A placeholder is a colon followed by an identifier;
    requiring a leading letter or underscore keeps things like a ``:8080``
    port untouched.

    Args:
        path: Route template, e.g. ``/api/admin/users/:id``.

    Returns:
        The path with each placeholder replaced by ``1``; unchanged when it
        contains no placeholder.
    """
    return re.sub(r":[A-Za-z_][A-Za-z0-9_]*", "1", path)
def main() -> None:
    """Probe every discovered admin/db route WITHOUT credentials and report.

    Steps:
      1. Read the target base URL from ``SOUL_API_BASE`` (default
         ``http://localhost:8080``).
      2. Verify once, with credentials from ``SOUL_ADMIN_USERNAME`` /
         ``SOUL_ADMIN_PASSWORD``, that ``POST /api/admin`` (login) works.
      3. Request every other route returned by ``extract_routes()`` with no
         token; anything other than 401/403 is recorded as unexpected.
      4. Print a summary of failures and unexpected responses.

    NOTE: if a route turns out to be unprotected, the probe really executes
    it (including DELETE on id 1), so run this only against a disposable
    environment.
    """
    import os

    api_base = (os.environ.get("SOUL_API_BASE") or "http://localhost:8080").rstrip("/")
    session = requests.Session()
    session.verify = False  # allow probing https endpoints with self-signed certificates

    routes = extract_routes()
    print(f"Found routes: {len(routes)}")

    failures: list[Check] = []
    unexpected: list[Check] = []
    headers = {"Content-Type": "application/json"}

    # First make sure the login endpoint itself works (the only request that
    # carries credentials).
    admin_username = os.environ.get("SOUL_ADMIN_USERNAME", "admin")
    admin_password = os.environ.get("SOUL_ADMIN_PASSWORD", "admin123")
    login_url = f"{api_base}/api/admin"
    try:
        r_login = session.post(
            login_url,
            json={"username": admin_username, "password": admin_password},
            headers=headers,
            timeout=10,
        )
    except requests.RequestException as e:
        # An unreachable server must not abort the run with a traceback;
        # record it like any other failed probe and keep going.
        failures.append(Check("POST", "/api/admin", 0, f"EXC: {e}"))
        print("LOGIN_CHECK_FAILED后续路由鉴权探测可能不准确。")
    else:
        try:
            login_data = r_login.json()
        except ValueError:
            login_data = None
        login_ok = (
            r_login.status_code == 200
            # Guard against a JSON body that is not an object (list, string, ...).
            and isinstance(login_data, dict)
            and login_data.get("success") is True
            and bool(login_data.get("token"))
        )
        if not login_ok:
            failures.append(Check("POST", "/api/admin", r_login.status_code, (r_login.text or "")[:200]))
            print("LOGIN_CHECK_FAILED后续路由鉴权探测可能不准确。")

    # The Session persists cookies. Drop anything the login response may have
    # set so the probes below are genuinely unauthenticated and cannot
    # trigger real write operations.
    session.cookies.clear()

    for method, path_template in routes:
        path = replace_path_params(path_template)
        if path == "/api/admin" and method == "POST":
            # Login endpoint: already verified above, skip it here.
            continue
        url = f"{api_base}{path}"
        # POST/PUT get an empty JSON body; the auth middleware is expected to
        # reject the request before the body is looked at.
        json_payload = {} if method in ("POST", "PUT") else None
        try:
            resp = session.request(method, url, headers=headers, json=json_payload, timeout=10)
            status = resp.status_code
            preview = (resp.text or "")[:200].replace("\n", " ")
        except Exception as e:
            # Deliberately broad: one failing route must not stop the sweep.
            failures.append(Check(method, path, 0, f"EXC: {e}"))
            continue
        # Non-login endpoints are expected to be blocked with 401 or 403.
        if status not in (401, 403):
            unexpected.append(Check(method, path, status, preview))

    print("\n=== AUTHLESS_SMOKE_RESULT ===")
    report_sections = (
        ("Failures(0/404/500 等异常/网络异常):", failures),
        ("Unexpected (非 401/403):", unexpected),
    )
    for title, checks in report_sections:
        print(title, len(checks))
        for it in checks[:30]:
            print(f"- {it.method} {it.path} -> {it.status}, preview={it.preview}")
        if len(checks) > 30:
            # Tell the reader that the list above is not complete.
            print("... truncated")
# Run the smoke test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()