import json import re from dataclasses import dataclass from typing import Any import requests ROUTER_GO = r"e:\\Gongsi\\Mycontent\\soul-api\\internal\\router\\router.go" @dataclass class Route: group: str # "admin" | "db" | "root" method: str path: str # path within the group, e.g. "/chapters" or "/admin" full_path: str # full path appended to API_BASE_URL def _read_text(path: str) -> str: with open(path, "r", encoding="utf-8", errors="ignore") as f: return f.read() def extract_admin_and_db_routes() -> list[tuple[str, str]]: """ 返回 [(method, full_path_template), ...] full_path_template 已包含 /api/admin 或 /api/db 前缀,保留 :id 占位符。 """ text = _read_text(ROUTER_GO) routes: list[tuple[str, str]] = [] # 1) /api/admin 登录/鉴权/登出(不是 admin group 内) # api.GET("/admin", ...) / api.POST("/admin", ...) / api.POST("/admin/logout", ...) for m in re.finditer(r'api\.(GET|POST|PUT|DELETE)\("(/admin(?:/[^"]*)?)",\s*handler\.[A-Za-z0-9_]+', text): routes.append((m.group(1), f"/api{m.group(2)}")) # 2) admin group:api.Group("/admin") + admin.(GET|POST|PUT|DELETE)("/xxx", ...) for m in re.finditer(r'admin\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', text): routes.append((m.group(1), f"/api/admin{m.group(2)}")) # 3) db group:api.Group("/db") + db.(GET|POST|PUT|DELETE)("/xxx", ...) for m in re.finditer(r'db\.(GET|POST|PUT|DELETE)\("(/[^"]*)",\s*handler\.[A-Za-z0-9_]+', text): routes.append((m.group(1), f"/api/db{m.group(2)}")) # 去重(同一 handler 可能存在重复注册) seen: set[tuple[str, str]] = set() out: list[tuple[str, str]] = [] for method, p in routes: k = (method, p) if k in seen: continue seen.add(k) out.append((method, p)) return out def replace_path_params(path: str) -> str: # 仅用于 smoke:把 :id 替换成一个固定占位 return path.replace(":id", "1") def request_json( session: requests.Session, method: str, url: str, headers: dict[str, str], payload: Any | None = None, raw_body: str | None = None, ) -> tuple[int, dict[str, Any] | None, str]: try: if raw_body is not None: resp = session.request(method, url, headers=headers, data=raw_body, timeout=10) elif payload is None: resp = session.request(method, url, headers=headers, timeout=10) else: resp = session.request(method, url, headers=headers, data=json.dumps(payload), timeout=10) text = resp.text or "" try: data = resp.json() except Exception: data = None return resp.status_code, data, text[:300] except Exception as e: return 0, None, f"EXC: {e}" def main() -> None: api_base = None # 优先使用本地默认;需要对接测试环境时在 PowerShell 设置 SOUL_API_BASE import os api_base = (os.environ.get("SOUL_API_BASE") or "").rstrip("/") if not api_base: # 默认本机 api_base = "http://localhost:8080" admin_username = os.environ.get("SOUL_ADMIN_USERNAME", "admin") admin_password = os.environ.get("SOUL_ADMIN_PASSWORD", "admin123") session = requests.Session() # 本 smoke 默认不验证 TLS(如果你用的是 https 且是自签证书,能跑通测试) session.verify = False # 登录拿 token login_url = f"{api_base}/api/admin" r = session.post(login_url, json={"username": admin_username, "password": admin_password}, timeout=10) try: login_data = r.json() except Exception: login_data = None if r.status_code != 200 or not (login_data and login_data.get("success") is True and login_data.get("token")): print("LOGIN_FAILED", r.status_code, r.text[:200]) return token = login_data["token"] headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} routes = extract_admin_and_db_routes() print(f"Found routes: {len(routes)}") failures: list[dict[str, Any]] = [] unexpected_success: list[dict[str, Any]] = [] for method, path_template in routes: path = replace_path_params(path_template) url = f"{api_base}{path}" payload = None raw_body = None if method in ("POST", "PUT", "DELETE"): # 安全模式:发送明显非法 JSON,尽量触发 ShouldBindJSON 失败,避免真实写入。 payload = None raw_body = "{invalid_json" status, data, preview = request_json( session, method, url, headers, payload=payload, raw_body=raw_body ) ok = status not in (404, 500) and status != 0 # POST/PUT/DELETE 在安全模式下不应返回 success=true if method in ("POST", "PUT", "DELETE") and data and data.get("success") is True: unexpected_success.append( {"method": method, "path": path, "status": status, "data": data, "preview": preview} ) if not ok: failures.append({"method": method, "path": path, "status": status, "data": data, "preview": preview}) print("\n=== SMOKE_RESULT ===") print("Failures(404/500/EXC):", len(failures)) if failures: for it in failures: print(f"- {it['method']} {it['path']} -> {it['status']}, preview={it.get('preview')}") print("\nUnexpected success on write calls:", len(unexpected_success)) if unexpected_success: for it in unexpected_success: print(f"- {it['method']} {it['path']} -> success=true (status {it['status']}, preview={it.get('preview')})") if __name__ == "__main__": main()