🔄 卡若AI 同步 2026-03-15 19:00 | 更新:卡木、总索引与入口、运营中枢工作台 | 排除 >20MB: 11 个

This commit is contained in:
2026-03-15 19:00:52 +08:00
parent a50185f474
commit 8da96520d2
14 changed files with 1609 additions and 3 deletions

View File

@@ -0,0 +1,115 @@
---
name: 全网API自动注册
description: 自动注册全网各类 AI/开发 API 免费账号,提取 API Key/Token 并统一管理
triggers: API注册、自动注册、批量注册、API Key、注册账号、免费API、API池、key池、自动开号
owner: 木根
group: 木(卡木)
version: "1.0"
updated: "2026-03-15"
---
# 全网API自动注册
## 能做什么Capabilities
- 自动注册 OpenAI / Cursor / Gemini / Groq / DeepSeek / Mistral / Together AI 等平台的免费账号
- 支持临时邮箱tempmail.plus / Cloudflare Worker catch-all / 自有域名 IMAP
- 支持纯 API 注册(高速)和浏览器自动化注册(通用性强)两种模式
- 自动获取邮箱验证码IMAP 轮询 / API 轮询 / 正则提取 6 位 OTP
- 自动处理 CAPTCHACloudflare Turnstile / hCaptcha checkbox 点击)
- Token / API Key 提取并存入本地 SQLite + JSON 双存储
- API Key 池管理与轮换(内置 FastAPI 管理接口)
- 并发批量注册(支持 1~50 并发)
## 怎么用Usage
触发词:`API注册``自动注册``批量注册``API Key``免费API``key池``自动开号`
### 快速开始
```bash
cd 卡若AI/03_卡木/木根_逆向分析/全网API自动注册/脚本
# 1. 安装依赖
pip install -r requirements.txt
# 2. 配置(复制模板并编辑)
cp config.example.yaml config.yaml
# 编辑 config.yaml填写邮箱域名、IMAP 信息等
# 3. 注册指定平台
python auto_register.py --provider openai --count 5
python auto_register.py --provider cursor --count 3
python auto_register.py --provider gemini --count 10
# 4. 查看已注册的 Key 池
python auto_register.py --list
# 5. 启动 Key 管理 API 服务
python auto_register.py --serve --port 8899
```
### 支持的平台
| 平台 | 模式 | 免费额度 | 注册方式 |
|:---|:---|:---|:---|
| OpenAI (Codex) | API 模式 | $5 免费额度 | OAuth PKCE + OTP |
| Cursor | 浏览器模式 | 2 周免费试用 | 表单 + Turnstile + OTP |
| Google Gemini | API 模式 | 15 RPM/project | Google Cloud Console API Key |
| Groq | 浏览器模式 | 免费 tier | 注册 + API Key 生成 |
| DeepSeek | API 模式 | 免费额度 | 注册 + API Key |
| Mistral | API 模式 | 免费 tier | 注册 + API Key |
| Together AI | API 模式 | $5 免费额度 | 注册 + API Key |
| Cohere | API 模式 | 免费 tier | 注册 + API Key |
## 执行步骤Steps
### 新增平台支持
1.`providers/` 目录创建 `平台名.py`,继承 `BaseProvider` 基类
2. 实现 `register()` 方法:注册流程
3. 实现 `extract_key()` 方法:提取 API Key/Token
4.`config.yaml` 添加平台配置
5. 运行 `python auto_register.py --provider 平台名 --count N`
### 注册流程(通用框架)
```
生成临时邮箱 → 打开注册页面/调用注册API
→ 填写表单/提交邮箱
→ 处理 CAPTCHA如有
→ 等待并提取邮箱验证码
→ 提交验证码
→ 创建账号(填写资料)
→ 提取 API Key / Token
→ 存入本地数据库
→ 下一个...
```
## 相关文件Files
- 主程序:`脚本/auto_register.py`
- 配置模板:`脚本/config.example.yaml`
- 依赖:`脚本/requirements.txt`
- Provider 基类:`providers/base_provider.py`
- OpenAI Provider`providers/openai_provider.py`
- Cursor Provider`providers/cursor_provider.py`
- Gemini Provider`providers/gemini_provider.py`
- 邮箱服务:`脚本/email_service.py`
- Key 管理 API`脚本/key_manager_api.py`
## 依赖Dependencies
- 前置技能M02 网站逆向分析(分析注册流程时联动)
- 外部工具Python 3.10+、Chrome/Chromium浏览器模式需要
- Python 包curl_cffi、DrissionPage、aiosqlite、fastapi、uvicorn、pyyaml
## 参考项目References
| 项目 | 地址 | Stars | 核心技术 |
|:---|:---|:---|:---|
| openai-auto-register | github.com/Ethan-W20/openai-auto-register | 184⭐ | Go/Python 纯 API + IMAP |
| gpt-auto-register | github.com/7836246/gpt-auto-register | 182⭐ | Selenium + Cloudflare Worker |
| cursor-auto-register | github.com/ddCat-main/cursor-auto-register | - | DrissionPage + FastAPI |
| Hydra-gemini | github.com/LikithMeruvu/Hydra-gemini | - | Gemini Key 池聚合 |
| openai-gemini-api-key-rotator | github.com/p32929/openai-gemini-api-key-rotator | - | Node.js Key 轮换代理 |

View File

@@ -0,0 +1,10 @@
from .base_provider import (
BaseProvider,
AccountResult,
AccountStorage,
EmailService,
random_name,
random_password,
random_birthday,
create_pkce_pair,
)

View File

@@ -0,0 +1,343 @@
"""
全网API自动注册 — Provider 基类
所有平台的注册逻辑都继承此基类,实现统一的注册接口。
"""
import os
import re
import json
import time
import random
import string
import hashlib
import secrets
import sqlite3
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from faker import Faker
log = logging.getLogger("auto_register")
fake = Faker(["en_US", "zh_CN", "ja_JP"])
@dataclass
class AccountResult:
"""注册结果统一数据结构"""
provider: str
email: str
password: str = ""
api_key: str = ""
access_token: str = ""
refresh_token: str = ""
account_id: str = ""
name: str = ""
extra: dict = field(default_factory=dict)
registered_at: str = ""
status: str = "active"
def __post_init__(self):
if not self.registered_at:
self.registered_at = datetime.now(timezone.utc).isoformat()
class AccountStorage:
"""SQLite + JSON 双存储"""
def __init__(self, db_path: str = "accounts.db", json_dir: str = "tokens/"):
self.db_path = db_path
self.json_dir = Path(json_dir)
self.json_dir.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider TEXT NOT NULL,
email TEXT NOT NULL,
password TEXT DEFAULT '',
api_key TEXT DEFAULT '',
access_token TEXT DEFAULT '',
refresh_token TEXT DEFAULT '',
account_id TEXT DEFAULT '',
name TEXT DEFAULT '',
extra TEXT DEFAULT '{}',
registered_at TEXT NOT NULL,
status TEXT DEFAULT 'active',
UNIQUE(provider, email)
)
""")
conn.commit()
def save(self, result: AccountResult):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO accounts
(provider, email, password, api_key, access_token, refresh_token,
account_id, name, extra, registered_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
result.provider, result.email, result.password,
result.api_key, result.access_token, result.refresh_token,
result.account_id, result.name, json.dumps(result.extra),
result.registered_at, result.status,
))
conn.commit()
json_file = self.json_dir / f"{result.provider}_{result.email.replace('@', '_at_')}.json"
json_file.write_text(json.dumps(asdict(result), indent=2, ensure_ascii=False))
log.info(f"[存储] {result.provider} / {result.email} → DB + {json_file.name}")
def list_accounts(self, provider: str = None) -> list[dict]:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
if provider:
rows = conn.execute(
"SELECT * FROM accounts WHERE provider = ? ORDER BY id DESC", (provider,)
).fetchall()
else:
rows = conn.execute("SELECT * FROM accounts ORDER BY id DESC").fetchall()
return [dict(r) for r in rows]
def get_random_key(self, provider: str) -> Optional[str]:
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT api_key FROM accounts WHERE provider = ? AND status = 'active' AND api_key != '' ORDER BY RANDOM() LIMIT 1",
(provider,)
).fetchone()
return row[0] if row else None
def count(self, provider: str = None) -> int:
with sqlite3.connect(self.db_path) as conn:
if provider:
return conn.execute(
"SELECT COUNT(*) FROM accounts WHERE provider = ?", (provider,)
).fetchone()[0]
return conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
class EmailService:
"""临时邮箱服务(支持 tempmail.plus / Cloudflare Worker / 域名 IMAP"""
RE_OTP = re.compile(r"(?<!\d)(\d{6})(?!\d)")
RE_CODE_PRECISE = re.compile(r"(?:code\s+is|verification code|验证码)[:\s]*(\d{6})", re.IGNORECASE)
def __init__(self, config: dict):
self.config = config
self.email_type = config.get("type", "tempmail")
def generate_email(self) -> tuple[str, dict]:
"""生成临时邮箱,返回 (email_address, context_for_receiving)"""
if self.email_type == "tempmail":
return self._gen_tempmail()
elif self.email_type == "cloudflare_worker":
return self._gen_cf_worker()
elif self.email_type == "domain_imap":
return self._gen_domain_imap()
raise ValueError(f"不支持的邮箱类型: {self.email_type}")
def _gen_tempmail(self) -> tuple[str, dict]:
cfg = self.config.get("tempmail", {})
prefix = cfg.get("username", "test")
random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
email = f"{prefix}{random_suffix}@tempmail.plus"
return email, {"type": "tempmail", "pin": cfg.get("pin", "")}
def _gen_cf_worker(self) -> tuple[str, dict]:
import httpx
cfg = self.config.get("cloudflare_worker", {})
prefix = "".join(random.choices(string.ascii_lowercase + string.digits, k=10))
domain = cfg["domain"]
url = cfg["url"]
resp = httpx.post(f"{url}/api/new_address", json={"name": prefix}, timeout=15)
data = resp.json()
return data.get("address", f"{prefix}@{domain}"), {
"type": "cf_worker", "jwt": data.get("jwt", ""), "url": url
}
def _gen_domain_imap(self) -> tuple[str, dict]:
cfg = self.config.get("domain_imap", {})
domains = cfg.get("domains", ["example.com"])
domain = random.choice(domains)
prefix = "".join(random.choices(string.ascii_lowercase + string.digits, k=12))
email = f"{prefix}@{domain}"
return email, {"type": "domain_imap", "email": email}
def wait_for_code(self, email: str, context: dict, timeout: int = 120) -> Optional[str]:
"""等待并提取 6 位 OTP 验证码"""
email_type = context.get("type", "tempmail")
if email_type == "tempmail":
return self._poll_tempmail(email, context, timeout)
elif email_type == "cf_worker":
return self._poll_cf_worker(email, context, timeout)
elif email_type == "domain_imap":
return self._poll_imap(email, context, timeout)
return None
def _poll_tempmail(self, email: str, context: dict, timeout: int) -> Optional[str]:
import httpx
addr = email.split("@")[0]
pin = context.get("pin", "")
start = time.time()
while time.time() - start < timeout:
try:
url = f"https://tempmail.plus/api/mails?email={addr}&limit=20&epin={pin}"
resp = httpx.get(url, timeout=10)
mails = resp.json().get("mail_list", [])
for mail in mails:
code = self._extract_code(mail.get("subject", ""), mail.get("text", ""))
if code:
return code
except Exception as e:
log.warning(f"[邮箱] tempmail 轮询异常: {e}")
time.sleep(3)
return None
def _poll_cf_worker(self, email: str, context: dict, timeout: int) -> Optional[str]:
import httpx
jwt = context.get("jwt", "")
url = context.get("url", "")
start = time.time()
while time.time() - start < timeout:
try:
resp = httpx.get(
f"{url}/api/mails?limit=20&offset=0",
headers={"Authorization": f"Bearer {jwt}"},
timeout=10,
)
mails = resp.json().get("results", [])
for mail in mails:
raw = mail.get("raw", mail.get("text", ""))
code = self._extract_code(mail.get("subject", ""), raw)
if code:
return code
except Exception as e:
log.warning(f"[邮箱] CF Worker 轮询异常: {e}")
time.sleep(3)
return None
def _poll_imap(self, email: str, context: dict, timeout: int) -> Optional[str]:
import imaplib
import email as email_lib
cfg = self.config.get("domain_imap", {})
start = time.time()
while time.time() - start < timeout:
try:
imap = imaplib.IMAP4_SSL(cfg["imap_host"], cfg.get("imap_port", 993))
imap.login(cfg["imap_user"], cfg["imap_pass"])
imap.select("INBOX")
_, msg_nums = imap.search(None, f'(TO "{email}" UNSEEN)')
for num in msg_nums[0].split():
_, msg_data = imap.fetch(num, "(RFC822)")
msg = email_lib.message_from_bytes(msg_data[0][1])
subject = str(msg.get("Subject", ""))
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode(errors="ignore")
break
else:
body = msg.get_payload(decode=True).decode(errors="ignore")
code = self._extract_code(subject, body)
if code:
imap.logout()
return code
imap.logout()
except Exception as e:
log.warning(f"[邮箱] IMAP 轮询异常: {e}")
time.sleep(3)
return None
def _extract_code(self, subject: str, body: str) -> Optional[str]:
"""从邮件主题和正文中提取 6 位 OTP"""
precise = self.RE_CODE_PRECISE.search(subject)
if precise:
return precise.group(1)
precise = self.RE_CODE_PRECISE.search(body)
if precise:
return precise.group(1)
match = self.RE_OTP.search(subject)
if match:
return match.group(1)
return None
def random_name() -> tuple[str, str]:
first = fake.first_name()
last = fake.last_name()
return first, last
def random_password(length: int = 16) -> str:
chars = string.ascii_letters + string.digits + "!@#$%"
pwd = [
random.choice(string.ascii_uppercase),
random.choice(string.ascii_lowercase),
random.choice(string.digits),
random.choice("!@#$%"),
]
pwd += random.choices(chars, k=length - 4)
random.shuffle(pwd)
return "".join(pwd)
def random_birthday(min_age: int = 20, max_age: int = 40) -> str:
year = datetime.now().year - random.randint(min_age, max_age)
month = random.randint(1, 12)
day = random.randint(1, 28)
return f"{year}-{month:02d}-{day:02d}"
def create_pkce_pair() -> tuple[str, str]:
"""PKCE code_verifier + code_challenge (S256)"""
import base64
verifier = secrets.token_urlsafe(48)
digest = hashlib.sha256(verifier.encode("ascii")).digest()
challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
return verifier, challenge
class BaseProvider(ABC):
"""所有平台 Provider 的抽象基类"""
PROVIDER_NAME: str = "base"
def __init__(self, config: dict, email_service: EmailService, storage: AccountStorage):
self.config = config
self.email_service = email_service
self.storage = storage
@abstractmethod
def register(self) -> Optional[AccountResult]:
"""执行一次注册,返回 AccountResult 或 None"""
...
def register_batch(self, count: int, max_workers: int = 5) -> list[AccountResult]:
"""批量注册"""
from concurrent.futures import ThreadPoolExecutor, as_completed
results = []
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = [pool.submit(self._safe_register) for _ in range(count)]
for f in as_completed(futures):
r = f.result()
if r:
results.append(r)
log.info(f"[{self.PROVIDER_NAME}] 批量注册完成: {len(results)}/{count} 成功")
return results
def _safe_register(self) -> Optional[AccountResult]:
try:
result = self.register()
if result:
self.storage.save(result)
return result
except Exception as e:
log.error(f"[{self.PROVIDER_NAME}] 注册异常: {e}")
return None

View File

@@ -0,0 +1,187 @@
"""
Cursor 自动注册 Provider
浏览器模式 — 使用 DrissionPage 自动填表 + Turnstile 绕过。
参考: github.com/ddCat-main/cursor-auto-register
"""
import time
import random
import logging
from typing import Optional
from .base_provider import (
BaseProvider, AccountResult, EmailService, AccountStorage,
random_name, random_password,
)
log = logging.getLogger("auto_register")
SIGNUP_URL = "https://authenticator.cursor.sh/sign-up"
SETTINGS_URL = "https://www.cursor.com/settings"
def _handle_turnstile(tab):
"""处理 Cloudflare Turnstile CAPTCHA"""
try:
cf_el = tab.ele("@id=cf-turnstile", timeout=3)
if cf_el:
shadow = cf_el.child().shadow_root
iframe = shadow.ele("tag:iframe")
body = iframe.ele("tag:body")
input_el = body.sr("tag:input")
if input_el:
input_el.click()
log.info(" [Turnstile] 已点击验证")
time.sleep(2)
except Exception:
pass
def _type_slowly(tab, selector: str, text: str, delay: float = 0.05):
"""模拟真人逐字输入"""
el = tab.ele(selector, timeout=10)
if el:
el.click()
time.sleep(0.2)
for char in text:
el.input(char)
time.sleep(delay + random.uniform(0, 0.03))
class CursorProvider(BaseProvider):
PROVIDER_NAME = "cursor"
def __init__(self, config: dict, email_service: EmailService, storage: AccountStorage):
super().__init__(config, email_service, storage)
self.provider_config = config.get("providers", {}).get("cursor", {})
self.browser_config = config.get("browser", {})
def _create_browser(self):
from DrissionPage import ChromiumPage, ChromiumOptions
co = ChromiumOptions()
if self.browser_config.get("headless", True):
co.set_argument("--headless=new")
co.set_argument("--disable-blink-features=AutomationControlled")
co.set_argument("--disable-features=AutomationControlled")
co.set_argument("--no-sandbox")
co.set_argument("--disable-gpu")
co.set_argument("--window-size=1920,1080")
co.set_pref("webgl.vendor", "NVIDIA Corporation")
co.set_pref("webgl.renderer", "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060)")
if self.browser_config.get("chrome_path"):
co.set_browser_path(self.browser_config["chrome_path"])
proxy = self.config.get("proxy", {})
if proxy.get("enabled"):
co.set_proxy(f"{proxy['type']}://{proxy['host']}:{proxy['port']}")
return ChromiumPage(co)
def register(self) -> Optional[AccountResult]:
email, email_ctx = self.email_service.generate_email()
password = random_password()
first, last = random_name()
log.info(f"[Cursor] 开始注册: {email}")
browser = None
try:
browser = self._create_browser()
tab = browser.latest_tab
# Step 1: 打开注册页面
signup_url = self.provider_config.get("signup_url", SIGNUP_URL)
tab.get(signup_url)
time.sleep(2)
tab.run_js("try { turnstile.reset() } catch(e) {}")
log.info(f" [1/6] 打开注册页面")
# Step 2: 填写基本信息
_type_slowly(tab, '@name=first_name', first)
_type_slowly(tab, '@name=last_name', last)
_type_slowly(tab, '@name=email', email)
time.sleep(0.5)
submit = tab.ele("@type=submit", timeout=5)
if submit:
submit.click()
log.info(f" [2/6] 填写基本信息: {first} {last}")
time.sleep(2)
# Step 3: 处理 Turnstile
_handle_turnstile(tab)
log.info(f" [3/6] Turnstile 处理")
time.sleep(2)
# Step 4: 填写密码
pwd_el = tab.ele('@name=password', timeout=10)
if pwd_el:
_type_slowly(tab, '@name=password', password)
time.sleep(0.3)
submit = tab.ele("@type=submit", timeout=5)
if submit:
submit.click()
log.info(f" [4/6] 密码已填写")
time.sleep(2)
_handle_turnstile(tab)
# Step 5: 获取并输入验证码
otp_el = tab.ele("@data-index=0", timeout=15)
if otp_el:
code = self.email_service.wait_for_code(
email, email_ctx,
timeout=self.config.get("registration", {}).get("otp_timeout", 120),
)
if not code:
log.error(f" [5/6] 验证码获取超时")
return None
for i, digit in enumerate(code):
el = tab.ele(f"@data-index={i}", timeout=5)
if el:
el.input(digit)
time.sleep(0.1)
log.info(f" [5/6] 验证码已输入: {code}")
time.sleep(3)
else:
log.warning(f" [5/6] 未检测到验证码输入框,可能已自动跳过")
# Step 6: 提取 Token
settings_url = self.provider_config.get("settings_url", SETTINGS_URL)
tab.get(settings_url)
time.sleep(3)
token = ""
user = ""
for cookie in tab.cookies():
if cookie.get("name") == "WorkosCursorSessionToken":
value = cookie["value"]
parts = value.split("%3A%3A")
if len(parts) >= 2:
user = parts[0]
token = parts[1]
break
if token:
log.info(f" [6/6] Token 提取成功! user={user[:16]}...")
return AccountResult(
provider="cursor",
email=email,
password=password,
api_key="",
access_token=token,
refresh_token="",
account_id=user,
name=f"{first} {last}",
extra={"cookie_value": f"{user}%3A%3A{token}"},
)
else:
log.error(f" [6/6] Token 提取失败")
return None
except Exception as e:
log.error(f"[Cursor] 注册异常: {e}")
return None
finally:
if browser:
try:
browser.quit()
except Exception:
pass

View File

@@ -0,0 +1,56 @@
"""
Google Gemini API Key 自动注册 Provider
浏览器模式 — Google Cloud Console 创建项目 + 启用 Gemini API + 生成 Key。
"""
import time
import random
import logging
from typing import Optional
from .base_provider import (
BaseProvider, AccountResult, EmailService, AccountStorage,
random_name, random_password,
)
log = logging.getLogger("auto_register")
GOOGLE_SIGNUP_URL = "https://accounts.google.com/signup/v2/createaccount"
AI_STUDIO_URL = "https://aistudio.google.com/app/apikey"
class GeminiProvider(BaseProvider):
"""
Gemini 注册流程:
1. 注册 Google 账号(需手机号或直接用已有 Google 邮箱)
2. 访问 AI Studio → 创建 API Key
3. 提取 Key
注意: Google 注册强制手机验证,纯自动化难度极高。
推荐策略: 手动创建多个 Google 账号 → 本工具自动轮换多 Key。
"""
PROVIDER_NAME = "gemini"
def __init__(self, config: dict, email_service: EmailService, storage: AccountStorage):
super().__init__(config, email_service, storage)
self.provider_config = config.get("providers", {}).get("gemini", {})
def register(self) -> Optional[AccountResult]:
log.warning(
"[Gemini] Google 注册需要手机号验证,全自动注册受限。\n"
" 推荐: 手动注册 Google 账号 → 用 add_key 命令添加已有 Key。\n"
" 或: 使用 Hydra-gemini 项目聚合多 Key 做轮换池。"
)
return None
def add_existing_key(self, api_key: str, email: str = "manual@gemini") -> AccountResult:
"""手动添加已有的 Gemini API Key"""
result = AccountResult(
provider="gemini",
email=email,
api_key=api_key,
name="manual_add",
)
self.storage.save(result)
log.info(f"[Gemini] 手动添加 Key: {api_key[:16]}...")
return result

View File

@@ -0,0 +1,215 @@
"""
通用浏览器注册 Provider
适用于 Groq / DeepSeek / Mistral / Together / Cohere 等标准注册流程的平台。
流程: 打开注册页 → 填邮箱 → 填密码 → 验证邮箱 → 登录 → 提取 API Key
"""
import time
import random
import logging
from typing import Optional
from .base_provider import (
BaseProvider, AccountResult, EmailService, AccountStorage,
random_name, random_password,
)
log = logging.getLogger("auto_register")
PROVIDER_CONFIGS = {
"groq": {
"signup_url": "https://console.groq.com/login",
"api_key_url": "https://console.groq.com/keys",
"api_key_page_selector": "button:has-text('Create API Key')",
},
"deepseek": {
"signup_url": "https://platform.deepseek.com/sign_up",
"api_key_url": "https://platform.deepseek.com/api_keys",
"api_key_page_selector": "button:has-text('Create')",
},
"mistral": {
"signup_url": "https://console.mistral.ai/register",
"api_key_url": "https://console.mistral.ai/api-keys",
"api_key_page_selector": "button:has-text('Create')",
},
"together": {
"signup_url": "https://api.together.xyz/signup",
"api_key_url": "https://api.together.xyz/settings/api-keys",
"api_key_page_selector": "",
},
"cohere": {
"signup_url": "https://dashboard.cohere.com/welcome/register",
"api_key_url": "https://dashboard.cohere.com/api-keys",
"api_key_page_selector": "",
},
}
class GenericBrowserProvider(BaseProvider):
"""
通用浏览器自动注册。
大多数平台的注册流程相似,差异在 URL 和页面元素选择器上。
"""
PROVIDER_NAME = "generic"
def __init__(self, provider_name: str, config: dict, email_service: EmailService, storage: AccountStorage):
super().__init__(config, email_service, storage)
self.PROVIDER_NAME = provider_name
self.provider_config = config.get("providers", {}).get(provider_name, {})
self.browser_config = config.get("browser", {})
self.defaults = PROVIDER_CONFIGS.get(provider_name, {})
def _create_browser(self):
from DrissionPage import ChromiumPage, ChromiumOptions
co = ChromiumOptions()
if self.browser_config.get("headless", True):
co.set_argument("--headless=new")
co.set_argument("--disable-blink-features=AutomationControlled")
co.set_argument("--no-sandbox")
co.set_argument("--window-size=1920,1080")
if self.browser_config.get("chrome_path"):
co.set_browser_path(self.browser_config["chrome_path"])
proxy = self.config.get("proxy", {})
if proxy.get("enabled"):
co.set_proxy(f"{proxy['type']}://{proxy['host']}:{proxy['port']}")
return ChromiumPage(co)
def register(self) -> Optional[AccountResult]:
email, email_ctx = self.email_service.generate_email()
password = random_password()
first, last = random_name()
name = f"{first} {last}"
log.info(f"[{self.PROVIDER_NAME}] 开始注册: {email}")
browser = None
try:
browser = self._create_browser()
tab = browser.latest_tab
signup_url = self.provider_config.get("signup_url", self.defaults.get("signup_url", ""))
if not signup_url:
log.error(f"[{self.PROVIDER_NAME}] 未配置注册 URL")
return None
# Step 1: 打开注册页
tab.get(signup_url)
time.sleep(3)
log.info(f" [1/5] 注册页面已加载")
# Step 2: 尝试填写注册表单(通用选择器)
email_selectors = [
'@type=email', '@name=email', '@placeholder=Email',
'@autocomplete=email', 'input[type="email"]',
]
for sel in email_selectors:
el = tab.ele(sel, timeout=2)
if el:
el.clear()
el.input(email)
break
name_el = tab.ele('@name=name', timeout=2) or tab.ele('@placeholder=Name', timeout=1)
if name_el:
name_el.clear()
name_el.input(name)
pwd_selectors = ['@type=password', '@name=password']
for sel in pwd_selectors:
el = tab.ele(sel, timeout=2)
if el:
el.clear()
el.input(password)
break
submit = tab.ele("@type=submit", timeout=3)
if submit:
submit.click()
time.sleep(3)
log.info(f" [2/5] 表单已提交")
# Step 3: 等待验证码
code = self.email_service.wait_for_code(
email, email_ctx,
timeout=self.config.get("registration", {}).get("otp_timeout", 120),
)
if code:
otp_inputs = tab.eles("@data-index", timeout=5)
if otp_inputs:
for i, digit in enumerate(code):
if i < len(otp_inputs):
otp_inputs[i].input(digit)
time.sleep(0.1)
else:
code_input = (
tab.ele('@name=code', timeout=3)
or tab.ele('@name=otp', timeout=2)
or tab.ele('@placeholder=Code', timeout=2)
)
if code_input:
code_input.clear()
code_input.input(code)
submit = tab.ele("@type=submit", timeout=3)
if submit:
submit.click()
log.info(f" [3/5] 验证码已提交: {code}")
else:
log.warning(f" [3/5] 未收到验证码,尝试继续...")
time.sleep(3)
# Step 4: 导航到 API Key 页面
api_key_url = self.provider_config.get(
"api_key_url", self.defaults.get("api_key_url", "")
)
if api_key_url:
tab.get(api_key_url)
time.sleep(3)
# Step 5: 尝试提取 API Key
api_key = self._try_extract_key(tab)
if api_key:
log.info(f" [5/5] API Key 提取成功: {api_key[:16]}...")
return AccountResult(
provider=self.PROVIDER_NAME,
email=email,
password=password,
api_key=api_key,
name=name,
)
log.warning(f" [5/5] API Key 提取失败,请检查注册流程")
return AccountResult(
provider=self.PROVIDER_NAME,
email=email,
password=password,
api_key="",
name=name,
status="registered_no_key",
)
except Exception as e:
log.error(f"[{self.PROVIDER_NAME}] 注册异常: {e}")
return None
finally:
if browser:
try:
browser.quit()
except Exception:
pass
def _try_extract_key(self, tab) -> Optional[str]:
"""尝试从页面提取 API Key"""
import re
page_text = tab.html or ""
patterns = [
r'(sk-[a-zA-Z0-9]{32,})', # OpenAI 风格 sk-xxx
r'(gsk_[a-zA-Z0-9]{32,})', # Groq 风格 gsk_xxx
r'(xai-[a-zA-Z0-9]{32,})', # xAI 风格
r'(dsk-[a-zA-Z0-9]{32,})', # DeepSeek 风格
r'(AIzaSy[a-zA-Z0-9_-]{33})', # Google API Key
r'([a-zA-Z0-9]{32,64})', # 通用长 token
]
for pattern in patterns:
match = re.search(pattern, page_text)
if match:
return match.group(1)
return None

View File

@@ -0,0 +1,253 @@
"""
OpenAI (Codex) 自动注册 Provider
纯 API 模式 — 模拟 OAuth 2.0 + PKCE 注册流程,不需要浏览器。
参考: github.com/Ethan-W20/openai-auto-register
"""
import json
import time
import secrets
import urllib.parse
import base64
import logging
from typing import Optional
from curl_cffi import requests as cffi_requests
from .base_provider import (
BaseProvider, AccountResult, EmailService, AccountStorage,
random_name, random_password, random_birthday, create_pkce_pair,
)
log = logging.getLogger("auto_register")
OAI_AUTH_URL = "https://auth.openai.com/oauth/authorize"
OAI_SENTINEL_URL = "https://sentinel.openai.com/backend-api/sentinel/req"
OAI_SIGNUP_URL = "https://auth.openai.com/api/accounts/authorize/continue"
OAI_SEND_OTP_URL = "https://auth.openai.com/api/accounts/passwordless/send-otp"
OAI_VERIFY_OTP_URL = "https://auth.openai.com/api/accounts/email-otp/validate"
OAI_CREATE_URL = "https://auth.openai.com/api/accounts/create_account"
OAI_WORKSPACE_URL = "https://auth.openai.com/api/accounts/workspace/select"
OAI_TOKEN_URL = "https://auth.openai.com/oauth/token"
OAI_CLIENT_ID = "DRivsnm2Mu42T3KOpqdtwB3NYviHYzwD"
LOCAL_REDIRECT_URI = "http://localhost:1455/auth/callback"
BROWSER_IMPERSONATES = [
"chrome120", "chrome124", "chrome131",
"safari17_0", "safari18_0",
]
class OpenAIProvider(BaseProvider):
PROVIDER_NAME = "openai"
def __init__(self, config: dict, email_service: EmailService, storage: AccountStorage):
super().__init__(config, email_service, storage)
self.provider_config = config.get("providers", {}).get("openai", {})
def _create_session(self) -> cffi_requests.Session:
impersonate = secrets.choice(BROWSER_IMPERSONATES)
session = cffi_requests.Session(impersonate=impersonate)
session.headers.update({
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
})
proxy = self.config.get("proxy", {})
if proxy.get("enabled"):
proxy_url = f"{proxy['type']}://"
if proxy.get("username"):
proxy_url += f"{proxy['username']}:{proxy['password']}@"
proxy_url += f"{proxy['host']}:{proxy['port']}"
session.proxies = {"http": proxy_url, "https": proxy_url}
return session
def register(self) -> Optional[AccountResult]:
email, email_ctx = self.email_service.generate_email()
log.info(f"[OpenAI] 开始注册: {email}")
session = self._create_session()
try:
# Step 1: OAuth 授权
verifier, challenge = create_pkce_pair()
state = secrets.token_urlsafe(16)
query = urllib.parse.urlencode({
"client_id": self.provider_config.get("client_id", OAI_CLIENT_ID),
"response_type": "code",
"redirect_uri": self.provider_config.get("redirect_uri", LOCAL_REDIRECT_URI),
"scope": "openid email profile offline_access",
"state": state,
"code_challenge": challenge,
"code_challenge_method": "S256",
"prompt": "login",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
})
auth_url = f"{OAI_AUTH_URL}?{query}"
resp = session.get(auth_url, allow_redirects=True)
log.info(f" [1/9] OAuth 授权: {resp.status_code}")
device_id = ""
for cookie in session.cookies:
if cookie.name == "oai-did":
device_id = cookie.value
break
# Step 2: Sentinel 反bot token
sentinel_body = {"p": "", "id": device_id, "flow": "authorize_continue"}
resp = session.post(
OAI_SENTINEL_URL, json=sentinel_body,
headers={
"Origin": "https://sentinel.openai.com",
"Referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html",
},
)
sentinel_token = resp.json()["token"]
sentinel_header = json.dumps({
"p": "", "t": "", "c": sentinel_token,
"id": device_id, "flow": "authorize_continue",
})
log.info(f" [2/9] Sentinel token 获取成功")
# Step 3: 提交邮箱
resp = session.post(
OAI_SIGNUP_URL,
json={"username": {"value": email, "kind": "email"}, "screen_hint": "signup"},
headers={
"Referer": "https://auth.openai.com/create-account",
"openai-sentinel-token": sentinel_header,
},
)
step3_data = resp.json()
page_type = step3_data.get("page", {}).get("type", "")
is_existing = page_type == "email_otp_verification"
log.info(f" [3/9] 提交邮箱: page_type={page_type}")
# Step 4: 发送 OTP
if not is_existing:
resp = session.post(
OAI_SEND_OTP_URL, json={},
headers={"Referer": "https://auth.openai.com/create-account/password"},
)
if resp.status_code != 200:
log.error(f" [4/9] 发送 OTP 失败: {resp.status_code}")
return None
log.info(f" [4/9] OTP 已发送")
# Step 5: 等待验证码
code = self.email_service.wait_for_code(
email, email_ctx,
timeout=self.config.get("registration", {}).get("otp_timeout", 120),
)
if not code:
log.error(f" [5/9] 验证码获取超时")
return None
log.info(f" [5/9] 验证码: {code}")
# Step 6: 验证 OTP
resp = session.post(
OAI_VERIFY_OTP_URL, json={"code": code},
headers={"Referer": "https://auth.openai.com/email-verification"},
)
if resp.status_code != 200:
log.error(f" [6/9] OTP 验证失败: {resp.status_code}")
return None
log.info(f" [6/9] OTP 验证通过")
# Step 7: 创建账号
first, last = random_name()
name = f"{first} {last}"
birthday = random_birthday()
if not is_existing:
resp = session.post(
OAI_CREATE_URL,
json={"name": name, "birthdate": birthday},
headers={"Referer": "https://auth.openai.com/about-you"},
)
log.info(f" [7/9] 创建账号: {name}")
else:
log.info(f" [7/9] 跳过(已存在)")
# Step 8: 选择 Workspace
auth_cookie = ""
for cookie in session.cookies:
if cookie.name == "oai-client-auth-session":
auth_cookie = cookie.value
break
workspace_id = None
if auth_cookie:
try:
b64 = auth_cookie.split(".")[0]
padding = "=" * ((4 - len(b64) % 4) % 4)
data = json.loads(base64.b64decode(b64 + padding))
workspaces = data.get("workspaces", [])
if workspaces:
workspace_id = workspaces[0]["id"]
except Exception:
pass
if workspace_id:
resp = session.post(
OAI_WORKSPACE_URL, json={"workspace_id": workspace_id},
headers={"Referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent"},
)
continue_url = resp.json().get("continue_url", "")
log.info(f" [8/9] Workspace 选择完成")
else:
log.warning(f" [8/9] 未找到 workspace_id尝试继续...")
continue_url = ""
# Step 9: 跟随重定向 → 获取 code → 兑换 Token
if continue_url:
resp = session.get(continue_url, allow_redirects=False)
redirect_chain = [resp]
while resp.status_code in (301, 302, 303, 307, 308) and len(redirect_chain) < 12:
loc = resp.headers.get("Location", "")
if loc.startswith("http://localhost"):
callback_url = loc
break
resp = session.get(loc, allow_redirects=False)
redirect_chain.append(resp)
else:
callback_url = resp.headers.get("Location", resp.url)
parsed = urllib.parse.urlparse(callback_url)
qs = urllib.parse.parse_qs(parsed.query)
auth_code = qs.get("code", [""])[0]
if auth_code:
resp = session.post(OAI_TOKEN_URL, data={
"grant_type": "authorization_code",
"client_id": self.provider_config.get("client_id", OAI_CLIENT_ID),
"code": auth_code,
"redirect_uri": self.provider_config.get("redirect_uri", LOCAL_REDIRECT_URI),
"code_verifier": verifier,
})
token_data = resp.json()
log.info(f" [9/9] Token 兑换成功!")
return AccountResult(
provider="openai",
email=email,
password="",
api_key="",
access_token=token_data.get("access_token", ""),
refresh_token=token_data.get("refresh_token", ""),
account_id="",
name=name,
extra={
"id_token": token_data.get("id_token", ""),
"workspace_id": workspace_id or "",
},
)
log.error(f" [9/9] Token 兑换失败:无法获取 authorization code")
return None
except Exception as e:
log.error(f"[OpenAI] 注册异常: {e}")
return None
finally:
session.close()

View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
全网API自动注册 — 主程序入口
支持: OpenAI / Cursor / Gemini / Groq / DeepSeek / Mistral / Together / Cohere
用法:
python auto_register.py --provider openai --count 5
python auto_register.py --provider cursor --count 3
python auto_register.py --list
python auto_register.py --list --provider openai
python auto_register.py --serve --port 8899
python auto_register.py --add-key gemini YOUR_API_KEY
"""
import os
import sys
import argparse
import logging
import yaml
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from providers.base_provider import EmailService, AccountStorage
from providers.openai_provider import OpenAIProvider
from providers.cursor_provider import CursorProvider
from providers.gemini_provider import GeminiProvider
from providers.generic_browser_provider import GenericBrowserProvider
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("auto_register")
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_CONFIG = SCRIPT_DIR / "config.yaml"
def load_config(path: str = None) -> dict:
config_path = Path(path) if path else DEFAULT_CONFIG
if not config_path.exists():
example = SCRIPT_DIR / "config.example.yaml"
if example.exists():
log.warning(f"未找到 config.yaml使用 config.example.yaml")
config_path = example
else:
log.error("未找到配置文件")
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def get_provider(name: str, config: dict, email_svc: EmailService, storage: AccountStorage):
providers = {
"openai": lambda: OpenAIProvider(config, email_svc, storage),
"cursor": lambda: CursorProvider(config, email_svc, storage),
"gemini": lambda: GeminiProvider(config, email_svc, storage),
"groq": lambda: GenericBrowserProvider("groq", config, email_svc, storage),
"deepseek": lambda: GenericBrowserProvider("deepseek", config, email_svc, storage),
"mistral": lambda: GenericBrowserProvider("mistral", config, email_svc, storage),
"together": lambda: GenericBrowserProvider("together", config, email_svc, storage),
"cohere": lambda: GenericBrowserProvider("cohere", config, email_svc, storage),
}
factory = providers.get(name)
if not factory:
log.error(f"不支持的平台: {name}。支持: {', '.join(providers.keys())}")
sys.exit(1)
return factory()
def cmd_register(args, config):
storage_cfg = config.get("storage", {})
storage = AccountStorage(
db_path=storage_cfg.get("db_path", "accounts.db"),
json_dir=storage_cfg.get("json_dir", "tokens/"),
)
email_svc = EmailService(config.get("email", {}))
provider = get_provider(args.provider, config, email_svc, storage)
reg_cfg = config.get("registration", {})
max_workers = reg_cfg.get("max_workers", 5)
log.info(f"开始批量注册: 平台={args.provider}, 数量={args.count}, 并发={max_workers}")
results = provider.register_batch(args.count, max_workers=max_workers)
print(f"\n{'='*60}")
print(f"注册完成: {len(results)}/{args.count} 成功")
for r in results:
key_display = r.api_key[:20] if r.api_key else r.access_token[:20] if r.access_token else "N/A"
print(f" [{r.provider}] {r.email}{key_display}...")
print(f"{'='*60}")
def cmd_list(args, config):
storage_cfg = config.get("storage", {})
storage = AccountStorage(
db_path=storage_cfg.get("db_path", "accounts.db"),
json_dir=storage_cfg.get("json_dir", "tokens/"),
)
accounts = storage.list_accounts(args.provider if args.provider != "all" else None)
if not accounts:
print("暂无注册账号")
return
print(f"\n{'='*80}")
print(f"{'平台':<12} {'邮箱':<30} {'Key/Token':<25} {'状态':<10} {'时间'}")
print(f"{'-'*80}")
for a in accounts:
key = a.get("api_key") or a.get("access_token") or ""
key_short = key[:20] + "..." if key else "N/A"
print(f"{a['provider']:<12} {a['email']:<30} {key_short:<25} {a['status']:<10} {a['registered_at'][:16]}")
print(f"{'='*80}")
print(f"{len(accounts)} 个账号")
def cmd_add_key(args, config):
storage_cfg = config.get("storage", {})
storage = AccountStorage(
db_path=storage_cfg.get("db_path", "accounts.db"),
json_dir=storage_cfg.get("json_dir", "tokens/"),
)
email_svc = EmailService(config.get("email", {}))
if args.provider == "gemini":
p = GeminiProvider(config, email_svc, storage)
p.add_existing_key(args.key, email=f"manual_{args.provider}")
else:
from providers.base_provider import AccountResult
result = AccountResult(
provider=args.provider,
email=f"manual_{args.provider}",
api_key=args.key,
name="manual_add",
)
storage.save(result)
print(f"已添加 {args.provider} Key: {args.key[:20]}...")
def cmd_serve(args, config):
"""启动 Key 管理 API 服务"""
from key_manager_api import create_app
import uvicorn
api_cfg = config.get("api_server", {})
host = api_cfg.get("host", "0.0.0.0")
port = args.port or api_cfg.get("port", 8899)
app = create_app(config)
log.info(f"启动 Key 管理 API: http://{host}:{port}")
uvicorn.run(app, host=host, port=port)
def main():
parser = argparse.ArgumentParser(description="全网API自动注册工具")
parser.add_argument("--config", "-c", default=None, help="配置文件路径")
sub = parser.add_subparsers(dest="command")
reg_parser = sub.add_parser("register", aliases=["reg"], help="注册新账号")
reg_parser.add_argument("--provider", "-p", required=True, help="平台名")
reg_parser.add_argument("--count", "-n", type=int, default=1, help="注册数量")
list_parser = sub.add_parser("list", aliases=["ls"], help="列出已注册账号")
list_parser.add_argument("--provider", "-p", default="all", help="筛选平台")
add_parser = sub.add_parser("add-key", help="手动添加已有 Key")
add_parser.add_argument("--provider", "-p", required=True, help="平台名")
add_parser.add_argument("--key", "-k", required=True, help="API Key")
serve_parser = sub.add_parser("serve", help="启动 Key 管理 API 服务")
serve_parser.add_argument("--port", type=int, default=None, help="端口号")
# 兼容旧式参数
parser.add_argument("--provider", "-p", default=None, help="平台名")
parser.add_argument("--count", "-n", type=int, default=1, help="注册数量")
parser.add_argument("--list", action="store_true", help="列出账号")
parser.add_argument("--serve", action="store_true", help="启动 API 服务")
parser.add_argument("--port", type=int, default=None, help="端口号")
parser.add_argument("--add-key", nargs=2, metavar=("PROVIDER", "KEY"), help="手动添加 Key")
args = parser.parse_args()
config = load_config(args.config)
if args.command in ("register", "reg"):
cmd_register(args, config)
elif args.command in ("list", "ls"):
cmd_list(args, config)
elif args.command == "add-key":
cmd_add_key(args, config)
elif args.command == "serve":
cmd_serve(args, config)
elif args.list:
class FakeArgs:
provider = args.provider or "all"
cmd_list(FakeArgs(), config)
elif args.serve:
cmd_serve(args, config)
elif args.add_key:
class FakeArgs:
provider = args.add_key[0]
key = args.add_key[1]
cmd_add_key(FakeArgs(), config)
elif args.provider:
class FakeArgs:
provider = args.provider
count = args.count
cmd_register(FakeArgs(), config)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,90 @@
# 全网API自动注册 配置文件
# 复制为 config.yaml 并填写实际值
email:
# 邮箱类型: tempmail / cloudflare_worker / domain_imap
type: "tempmail"
# --- tempmail.plus 配置 ---
tempmail:
username: "your_prefix" # tempmail.plus 前缀
pin: "" # PIN 码(可选)
# --- Cloudflare Worker 临时邮箱(推荐自建) ---
cloudflare_worker:
url: "https://your-worker.workers.dev"
domain: "your-domain.com"
admin_password: ""
# --- 自有域名 IMAP最稳定 ---
domain_imap:
domains:
- "your-domain.com"
imap_host: "mail.your-domain.com"
imap_port: 993
imap_user: "catch-all@your-domain.com"
imap_pass: "your-password"
use_ssl: true
proxy:
enabled: false
type: "http" # http / socks5
host: ""
port: ""
username: ""
password: ""
browser:
headless: true
user_agent: "" # 留空则随机生成
chrome_path: "" # 留空则自动检测
registration:
max_workers: 5 # 并发注册数
retry_max: 3 # 单次注册最大重试
interval_min: 3 # 批量间隔最小秒数
interval_max: 8 # 批量间隔最大秒数
otp_timeout: 120 # 等待验证码超时秒数
storage:
db_path: "accounts.db" # SQLite 数据库路径
json_dir: "tokens/" # JSON Token 存储目录
api_server:
host: "0.0.0.0"
port: 8899
# 各平台特定配置
providers:
openai:
enabled: true
client_id: "DRivsnm2Mu42T3KOpqdtwB3NYviHYzwD"
redirect_uri: "http://localhost:1455/auth/callback"
cursor:
enabled: true
signup_url: "https://authenticator.cursor.sh/sign-up"
settings_url: "https://www.cursor.com/settings"
gemini:
enabled: true
groq:
enabled: true
signup_url: "https://console.groq.com/login"
deepseek:
enabled: true
signup_url: "https://platform.deepseek.com/sign_up"
mistral:
enabled: true
signup_url: "https://console.mistral.ai/register"
together:
enabled: true
signup_url: "https://api.together.xyz/signup"
cohere:
enabled: true
signup_url: "https://dashboard.cohere.com/welcome/register"

View File

@@ -0,0 +1,113 @@
"""
Key 管理 API 服务
提供 RESTful 接口管理已注册的 API Key 池,支持轮换获取 Key。
"""
import sys
from pathlib import Path
from datetime import datetime
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from providers.base_provider import AccountStorage, AccountResult
class AddKeyRequest(BaseModel):
provider: str
email: str = ""
api_key: str = ""
access_token: str = ""
password: str = ""
def create_app(config: dict) -> FastAPI:
storage_cfg = config.get("storage", {})
storage = AccountStorage(
db_path=storage_cfg.get("db_path", "accounts.db"),
json_dir=storage_cfg.get("json_dir", "tokens/"),
)
app = FastAPI(title="全网API Key管理", version="1.0")
@app.get("/health")
def health():
return {"status": "ok", "time": datetime.now().isoformat()}
@app.get("/stats")
def stats():
"""各平台账号统计"""
all_accounts = storage.list_accounts()
by_provider = {}
for a in all_accounts:
p = a["provider"]
if p not in by_provider:
by_provider[p] = {"total": 0, "active": 0, "with_key": 0}
by_provider[p]["total"] += 1
if a["status"] == "active":
by_provider[p]["active"] += 1
if a.get("api_key"):
by_provider[p]["with_key"] += 1
return {"total": len(all_accounts), "by_provider": by_provider}
@app.get("/accounts")
def list_accounts(provider: str = Query(None)):
"""列出账号(可按平台筛选)"""
accounts = storage.list_accounts(provider)
for a in accounts:
if a.get("api_key"):
a["api_key_preview"] = a["api_key"][:16] + "..."
if a.get("access_token"):
a["access_token_preview"] = a["access_token"][:16] + "..."
return {"count": len(accounts), "accounts": accounts}
@app.get("/key/random")
def random_key(provider: str = Query(...)):
"""随机获取一个可用的 API Key用于轮换池"""
key = storage.get_random_key(provider)
if not key:
raise HTTPException(status_code=404, detail=f"No active key for {provider}")
return {"provider": provider, "api_key": key}
@app.get("/key/next")
def next_key(provider: str = Query(...)):
"""轮换获取下一个 KeyRound-Robin"""
accounts = storage.list_accounts(provider)
active = [a for a in accounts if a["status"] == "active" and a.get("api_key")]
if not active:
raise HTTPException(status_code=404, detail=f"No active key for {provider}")
import time
idx = int(time.time()) % len(active)
return {"provider": provider, "api_key": active[idx]["api_key"], "email": active[idx]["email"]}
@app.post("/key/add")
def add_key(req: AddKeyRequest):
"""手动添加 Key"""
result = AccountResult(
provider=req.provider,
email=req.email or f"manual_{req.provider}",
api_key=req.api_key,
access_token=req.access_token,
password=req.password,
)
storage.save(result)
return {"status": "ok", "message": f"Added key for {req.provider}"}
@app.get("/openai-compatible/v1/models")
def openai_models():
"""OpenAI 兼容接口 — 模型列表"""
return {
"object": "list",
"data": [
{"id": "gpt-4", "object": "model"},
{"id": "gpt-4o", "object": "model"},
{"id": "gpt-3.5-turbo", "object": "model"},
{"id": "gemini-pro", "object": "model"},
{"id": "llama-3.1-70b", "object": "model"},
]
}
return app

View File

@@ -0,0 +1,8 @@
curl_cffi>=0.7.0
DrissionPage>=4.0.0
aiosqlite>=0.20.0
fastapi>=0.115.0
uvicorn>=0.32.0
pyyaml>=6.0
httpx>=0.27.0
faker>=30.0.0

View File

@@ -1,7 +1,7 @@
# 卡若AI 技能注册表Skill Registry
> **一张表查所有技能**。任何 AI 拿到这张表,就能按关键词找到对应技能的 SKILL.md 路径并执行。
> 71 技能 | 14 成员 | 5 负责人
> 72 技能 | 14 成员 | 5 负责人
> 版本5.5 | 更新2026-03-13
>
> **技能配置、安装、删除、掌管人登记** → 见 **`运营中枢/工作台/01_技能控制台.md`**。
@@ -110,6 +110,7 @@
| M01g | 快手发布 | 木叶 | **快手发布、发布到快手、快手登录、快手上传、kuaishou发布** | `03_卡木/木叶_视频内容/快手发布/SKILL.md` | 逆向 cp.kuaishou.com API 视频发布 |
| M01h | 多平台分发 | 木叶 | **多平台分发、一键分发、全平台发布、批量分发、视频分发** | `03_卡木/木叶_视频内容/多平台分发/SKILL.md` | 一键分发到5平台抖音/B站/视频号/小红书/快手Cookie统一管理 |
| M02 | 网站逆向分析 | 木根 | 逆向分析、模拟登录 | `03_卡木/木根_逆向分析/网站逆向分析/SKILL.md` | 网站 API 分析、SDK 生成 |
| M02a | **全网API自动注册** | 木根 | **API注册、自动注册、批量注册、API Key、注册账号、免费API、API池、key池、自动开号** | `03_卡木/木根_逆向分析/全网API自动注册/SKILL.md` | OpenAI/Cursor/Gemini/Groq 等全网 API 自动注册+Key 池管理 |
| M03 | 项目生成 | 木果 | 生成项目、五行模板 | `03_卡木/木果_项目模板/项目生成/SKILL.md` | 按五行模板生成新项目 |
| M04 | 开发模板 | 木果 | 创建项目、初始化模板 | `03_卡木/木果_项目模板/开发模板/SKILL.md` | 前后端项目模板库 |
| M05 | 个人档案生成器 | 木果 | 个人档案、档案生成 | `03_卡木/木果_项目模板/个人档案生成器/SKILL.md` | 自动生成个人介绍档案 |
@@ -175,7 +176,7 @@
|:--|:---|:--|:--|
| 金 | 卡资 | 2 | 21 |
| 水 | 卡人 | 3 | 13 |
| 木 | 卡木 | 3 | 13 |
| 木 | 卡木 | 3 | 14 |
| 火 | 卡火 | 4 | 16 |
| 土 | 卡土 | 4 | 8 |
| **合计** | **5** | **14** | **71** |
| **合计** | **5** | **14** | **72** |

View File

@@ -359,3 +359,4 @@
| 2026-03-15 07:18:57 | 🔄 卡若AI 同步 2026-03-15 07:18 | 更新:火炬、火种知识模型、运营中枢参考资料、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-15 12:31:12 | 🔄 卡若AI 同步 2026-03-15 12:31 | 更新:金仓、运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-15 15:48:32 | 🔄 卡若AI 同步 2026-03-15 15:48 | 更新:运营中枢工作台 | 排除 >20MB: 11 个 |
| 2026-03-15 16:00:52 | 🔄 卡若AI 同步 2026-03-15 16:00 | 更新:运营中枢工作台 | 排除 >20MB: 11 个 |

View File

@@ -362,3 +362,4 @@
| 2026-03-15 07:18:57 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-15 07:18 | 更新:火炬、火种知识模型、运营中枢参考资料、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-15 12:31:12 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-15 12:31 | 更新:金仓、运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-15 15:48:32 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-15 15:48 | 更新:运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |
| 2026-03-15 16:00:52 | 成功 | 成功 | 🔄 卡若AI 同步 2026-03-15 16:00 | 更新:运营中枢工作台 | 排除 >20MB: 11 个 | [仓库](http://open.quwanzhi.com:3000/fnvtk/karuo-ai) [百科](http://open.quwanzhi.com:3000/fnvtk/karuo-ai/wiki) |