Files
soul-yongping/scripts/test/miniapp/test_article_preview_speed.py

235 lines
8.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章阅读与界面预览 GET 接口响应速度测试
测试范围
- 界面预览configbook/partsbook/all-chaptersbook/chapters-by-part
- 文章阅读book/chapter/:idbook/chapter/by-mid/:mid
用法
SOUL_TEST_ENV=soulapi python scripts/test/miniapp/test_article_preview_speed.py
SOUL_TEST_ENV=soulapi python -m scripts.test.miniapp.test_article_preview_speed
产出控制台报表 + 开发文档/测试报告-文章阅读与界面预览响应速度-YYYYMMDD.md
"""
import json
import sys
import time
from pathlib import Path
import requests
# 加载测试配置
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from config import API_BASE, ENV_LABEL, get_env_banner
# 每接口请求次数(取平均)
ROUNDS = 5
TIMEOUT = 30
def measure_get(url: str, desc: str) -> dict:
"""对 GET 请求测速,返回 {ok, status_code, times_ms, avg_ms, min_ms, max_ms, error}"""
times_ms = []
last_error = None
last_status = None
for _ in range(ROUNDS):
t0 = time.perf_counter()
try:
r = requests.get(url, timeout=TIMEOUT)
last_status = r.status_code
elapsed = (time.perf_counter() - t0) * 1000
times_ms.append(elapsed)
if r.status_code != 200:
last_error = f"HTTP {r.status_code}"
except requests.RequestException as e:
last_error = str(e)
times_ms.append(-1)
if not times_ms:
return {"ok": False, "error": last_error or "无响应", "status_code": last_status}
valid = [t for t in times_ms if t >= 0]
return {
"ok": len(valid) == ROUNDS and (last_status or 200) == 200,
"status_code": last_status,
"times_ms": times_ms,
"avg_ms": sum(valid) / len(valid) if valid else 0,
"min_ms": min(valid) if valid else 0,
"max_ms": max(valid) if valid else 0,
"error": last_error,
}
def main():
print(get_env_banner())
base = API_BASE.rstrip("/")
# 1. 先拉取 parts 和 all-chapters获取 partId、id、mid
parts_url = f"{base}/api/miniprogram/book/parts"
all_chapters_url = f"{base}/api/miniprogram/book/all-chapters"
parts_data = None
all_chapters_data = None
try:
r = requests.get(parts_url, timeout=TIMEOUT)
if r.status_code == 200:
parts_data = r.json()
except Exception:
pass
try:
r = requests.get(all_chapters_url, timeout=TIMEOUT)
if r.status_code == 200:
all_chapters_data = r.json()
except Exception:
pass
part_id = None
chapter_id = None
chapter_mid = None
if parts_data and parts_data.get("success"):
parts = parts_data.get("parts") or []
fixed = parts_data.get("fixedSections") or []
if parts:
part_id = parts[0].get("id")
if fixed:
chapter_mid = fixed[0].get("mid")
chapter_id = fixed[0].get("id")
if (not chapter_id or not chapter_mid) and all_chapters_data and all_chapters_data.get("success"):
arr = all_chapters_data.get("data") or all_chapters_data.get("chapters") or []
if arr:
first = arr[0] if isinstance(arr[0], dict) else {}
chapter_id = chapter_id or first.get("id")
chapter_mid = chapter_mid or first.get("mid")
if not part_id and parts_data and parts_data.get("success"):
parts = parts_data.get("parts") or []
if parts:
part_id = parts[0].get("id")
# 2. 定义测试用例(仅 GET
cases = [
("界面预览-配置", f"{base}/api/miniprogram/config", "GET /api/miniprogram/config"),
("界面预览-目录", f"{base}/api/miniprogram/book/parts", "GET /api/miniprogram/book/parts"),
("界面预览-全书章节", f"{base}/api/miniprogram/book/all-chapters", "GET /api/miniprogram/book/all-chapters"),
]
if part_id:
cases.append(
(
"界面预览-篇章内章节",
f"{base}/api/miniprogram/book/chapters-by-part?partId={part_id}",
f"GET /api/miniprogram/book/chapters-by-part?partId={part_id}",
)
)
if chapter_id:
cases.append(
(
"文章阅读-按id",
f"{base}/api/miniprogram/book/chapter/{chapter_id}",
f"GET /api/miniprogram/book/chapter/:id",
)
)
if chapter_mid:
cases.append(
(
"文章阅读-按mid",
f"{base}/api/miniprogram/book/chapter/by-mid/{chapter_mid}",
f"GET /api/miniprogram/book/chapter/by-mid/:mid",
)
)
# 3. 执行测速
results = []
for name, url, api_desc in cases:
print(f"\n测速: {name} ({api_desc})")
res = measure_get(url, name)
res["name"] = name
res["api"] = api_desc
res["url"] = url
results.append(res)
if res["ok"]:
print(f" [OK] avg={res['avg_ms']:.0f}ms (min={res['min_ms']:.0f}, max={res['max_ms']:.0f})")
else:
print(f" [FAIL] {res.get('error', res.get('status_code', '?'))}")
# 4. 生成报表
from datetime import datetime
date_str = datetime.now().strftime("%Y-%m-%d %H:%M")
date_file = datetime.now().strftime("%Y%m%d")
lines = [
"# 文章阅读与界面预览 GET 接口响应速度测试报告",
"",
f"**测试时间**: {date_str}",
f"**测试环境**: {ENV_LABEL} ({API_BASE})",
f"**每接口请求次数**: {ROUNDS}",
"",
"## 一、测试范围",
"",
"| 分类 | 接口 | 说明 |",
"|------|------|------|",
"| 界面预览 | GET /api/miniprogram/config | 配置(价格、功能开关等) |",
"| 界面预览 | GET /api/miniprogram/book/parts | 目录-篇章列表 |",
"| 界面预览 | GET /api/miniprogram/book/all-chapters | 全书章节列表 |",
"| 界面预览 | GET /api/miniprogram/book/chapters-by-part | 篇章内章节列表 |",
"| 文章阅读 | GET /api/miniprogram/book/chapter/:id | 按业务 id 获取章节内容 |",
"| 文章阅读 | GET /api/miniprogram/book/chapter/by-mid/:mid | 按 mid 获取章节内容 |",
"",
"## 二、响应速度结果",
"",
"| 接口 | 状态 | 平均(ms) | 最小(ms) | 最大(ms) |",
"|------|------|----------|----------|----------|",
]
for r in results:
status = "OK" if r["ok"] else "FAIL"
avg = f"{r['avg_ms']:.0f}" if r["ok"] else "-"
min_ms = f"{r['min_ms']:.0f}" if r["ok"] else "-"
max_ms = f"{r['max_ms']:.0f}" if r["ok"] else "-"
if not r["ok"]:
err = r.get("error", "") or f"HTTP {r.get('status_code', '?')}"
avg = err[:20] if err else "-"
lines.append(f"| {r['api']} | {status} | {avg} | {min_ms} | {max_ms} |")
# 汇总
ok_count = sum(1 for r in results if r["ok"])
total_count = len(results)
if ok_count == total_count:
avg_all = sum(r["avg_ms"] for r in results) / total_count
lines.extend([
"",
"## 三、汇总",
"",
f"- 通过: {ok_count}/{total_count}",
f"- 全部接口平均响应: {avg_all:.0f}ms",
"",
])
else:
lines.extend([
"",
"## 三、汇总",
"",
f"- 通过: {ok_count}/{total_count}",
f"- 失败: {total_count - ok_count} 个接口",
"",
])
report_content = "\n".join(lines)
# 5. 输出到控制台
print("\n" + "=" * 60)
print(report_content)
print("=" * 60)
# 6. 写入文件(项目根/开发文档)
report_dir = Path(__file__).resolve().parent.parent.parent.parent / "开发文档"
report_dir.mkdir(parents=True, exist_ok=True)
report_path = report_dir / f"测试报告-文章阅读与界面预览响应速度-{date_file}.md"
report_path.write_text(report_content, encoding="utf-8")
print(f"\n报表已保存: {report_path}")
return 0 if ok_count == total_count else 1
if __name__ == "__main__":
sys.exit(main())