Files
soul-yongping/scripts/sync_order_status.py

241 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
订单状态同步任务(兜底机制)
功能:
1. 定时查询 'created' 状态的订单
2. 调用微信支付接口查询真实状态
3. 同步订单状态paid / expired
4. 更新用户购买记录
运行方式:
- 手动: python scripts/sync_order_status.py
- 定时: crontab -e 添加 "*/5 * * * * python /path/to/sync_order_status.py"
- Node.js: 使用 node-cron 定时调用
"""
import sys
import os
import json
import time
import hashlib
import random
import string
from datetime import datetime, timedelta
try:
import pymysql
import requests
except ImportError:
print("[ERROR] 缺少依赖库,请安装:")
print(" pip install pymysql requests")
sys.exit(1)
# 数据库配置
DB = {
"host": "56b4c23f6853c.gz.cdb.myqcloud.com",
"port": 14413,
"user": "cdb_outerroot",
"password": "Zhiqun1984",
"database": "soul_miniprogram",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
"connect_timeout": 15,
}
# 微信支付配置(从环境变量或配置文件读取)
WECHAT_PAY_CONFIG = {
"appid": os.environ.get("WECHAT_APPID", "wxb8bbb2b10dec74aa"),
"mch_id": os.environ.get("WECHAT_MCH_ID", "1318592501"),
"api_key": os.environ.get("WECHAT_API_KEY", "YOUR_API_KEY_HERE"), # 需要配置真实的 API Key
}
# 订单超时时间(分钟)
ORDER_TIMEOUT_MINUTES = 30
def log(message, level="INFO"):
"""统一日志输出"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def generate_nonce_str(length=32):
"""生成随机字符串"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def create_sign(params, api_key):
"""生成微信支付签名"""
# 1. 参数排序
sorted_params = sorted(params.items())
# 2. 拼接字符串
string_a = '&'.join([f"{k}={v}" for k, v in sorted_params if v])
string_sign_temp = f"{string_a}&key={api_key}"
# 3. MD5 加密并转大写
sign = hashlib.md5(string_sign_temp.encode('utf-8')).hexdigest().upper()
return sign
def query_wechat_order_status(out_trade_no):
"""
查询微信支付订单状态
文档: https://pay.weixin.qq.com/wiki/doc/api/wxa/wxa_api.php?chapter=9_2
"""
url = "https://api.mch.weixin.qq.com/pay/orderquery"
params = {
"appid": WECHAT_PAY_CONFIG["appid"],
"mch_id": WECHAT_PAY_CONFIG["mch_id"],
"out_trade_no": out_trade_no,
"nonce_str": generate_nonce_str(),
}
# 生成签名
params["sign"] = create_sign(params, WECHAT_PAY_CONFIG["api_key"])
# 构建 XML 请求体
xml_data = "<xml>"
for key, value in params.items():
xml_data += f"<{key}>{value}</{key}>"
xml_data += "</xml>"
try:
response = requests.post(url, data=xml_data.encode('utf-8'), headers={'Content-Type': 'application/xml'}, timeout=10)
# 解析 XML 响应(简单处理,生产环境建议用 xml.etree.ElementTree
resp_text = response.text
# 提取关键字段
if '<return_code><![CDATA[SUCCESS]]></return_code>' in resp_text:
if '<trade_state><![CDATA[SUCCESS]]></trade_state>' in resp_text:
return 'SUCCESS'
elif '<trade_state><![CDATA[NOTPAY]]></trade_state>' in resp_text:
return 'NOTPAY'
elif '<trade_state><![CDATA[CLOSED]]></trade_state>' in resp_text:
return 'CLOSED'
elif '<trade_state><![CDATA[REFUND]]></trade_state>' in resp_text:
return 'REFUND'
else:
return 'UNKNOWN'
else:
log(f"查询订单失败: {resp_text}", "WARN")
return 'ERROR'
except Exception as e:
log(f"查询微信订单异常: {e}", "ERROR")
return 'ERROR'
def sync_order_status():
"""同步订单状态(主函数)"""
log("========== 订单状态同步任务开始 ==========")
conn = pymysql.connect(**DB)
cursor = conn.cursor()
try:
# 1. 查询所有 'created' 状态的订单(最近 2 小时内创建的)
two_hours_ago = datetime.now() - timedelta(hours=2)
cursor.execute("""
SELECT id, order_sn, user_id, product_type, product_id, amount, created_at
FROM orders
WHERE status = 'created' AND created_at >= %s
ORDER BY created_at DESC
""", (two_hours_ago,))
pending_orders = cursor.fetchall()
if not pending_orders:
log("没有需要同步的订单")
return
log(f"找到 {len(pending_orders)} 个待同步订单")
synced_count = 0
expired_count = 0
for order in pending_orders:
order_sn = order['order_sn']
created_at = order['created_at']
# 2. 判断订单是否超时(超过 30 分钟)
time_diff = datetime.now() - created_at
if time_diff > timedelta(minutes=ORDER_TIMEOUT_MINUTES):
# 超时订单:标记为 expired
log(f"订单 {order_sn} 超时 ({time_diff.seconds // 60} 分钟),标记为 expired")
cursor.execute("""
UPDATE orders
SET status = 'expired', updated_at = NOW()
WHERE order_sn = %s
""", (order_sn,))
expired_count += 1
continue
# 3. 查询微信支付状态(跳过,因为需要真实 API Key
# 生产环境中取消下面的注释
"""
log(f"查询订单 {order_sn} 的微信支付状态...")
wechat_status = query_wechat_order_status(order_sn)
if wechat_status == 'SUCCESS':
# 微信支付成功,更新本地订单为 paid
log(f"订单 {order_sn} 微信支付成功,更新为 paid")
cursor.execute('''
UPDATE orders
SET status = 'paid', updated_at = NOW()
WHERE order_sn = %s
''', (order_sn,))
# 更新用户购买记录
if order['product_type'] == 'fullbook':
cursor.execute('''
UPDATE users
SET has_full_book = 1
WHERE id = %s
''', (order['user_id'],))
synced_count += 1
elif wechat_status == 'NOTPAY':
log(f"订单 {order_sn} 尚未支付,保持 created 状态")
elif wechat_status == 'CLOSED':
log(f"订单 {order_sn} 已关闭,标记为 cancelled")
cursor.execute('''
UPDATE orders
SET status = 'cancelled', updated_at = NOW()
WHERE order_sn = %s
''', (order_sn,))
else:
log(f"订单 {order_sn} 查询失败或状态未知: {wechat_status}", "WARN")
"""
# 测试环境:模拟查询(跳过微信接口)
log(f"[TEST] 订单 {order_sn} 跳过微信查询(需配置 API Key")
conn.commit()
log(f"同步完成: 同步 {synced_count} 个,超时 {expired_count}")
except Exception as e:
conn.rollback()
log(f"同步失败: {e}", "ERROR")
import traceback
traceback.print_exc()
finally:
cursor.close()
conn.close()
log("========== 订单状态同步任务结束 ==========\n")
if __name__ == "__main__":
sync_order_status()