Files
soul/addons/Universal_Payment_Module copy/1_核心设计_通用协议/安全与合规.md

384 lines
8.1 KiB
Markdown
Raw Normal View History

# 支付安全与合规指南 (Security & Compliance) v4.0
> 支付系统安全最佳实践,保护你的资金和用户数据
## 🔐 密钥安全
### 1. 密钥存储原则
```
❌ 错误做法:
- 将密钥硬编码在代码中
- 将密钥提交到 Git 仓库
- 通过即时通讯工具传输密钥
- 使用弱密码作为 API Key
✅ 正确做法:
- 使用环境变量存储密钥
- 使用专业密钥管理服务 (AWS KMS, HashiCorp Vault)
- 定期轮换密钥
- 最小权限原则
```
### 2. .gitignore 必须包含
```gitignore
# 支付密钥相关
.env
.env.local
.env.*.local
*.pem
*.key
cert/
config/payment.yml
secrets/
```
### 3. 密钥轮换
- 定期更换 API 密钥 (建议每 90 天)
- 发现泄露立即作废并重新生成
- 保留旧密钥短暂过渡期
---
## 🔒 通信安全
### 1. HTTPS 强制
```nginx
# Nginx 配置示例
server {
listen 80;
server_name your-domain.com;
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com;
ssl_certificate /path/to/fullchain.pem;
ssl_certificate_key /path/to/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
}
```
### 2. 证书管理
- 使用受信任的 CA 签发证书
- 定期检查证书有效期
- 推荐 Let's Encrypt 自动续期
---
## ✅ 签名验证
### 1. 支付宝签名验证
```python
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
import base64
def verify_alipay_sign(params: dict, sign: str, public_key: str) -> bool:
"""验证支付宝签名"""
# 1. 参数排序
sorted_params = sorted([(k, v) for k, v in params.items() if k != 'sign' and v])
# 2. 拼接待签名字符串
sign_str = '&'.join([f'{k}={v}' for k, v in sorted_params])
# 3. RSA2 验签
key = RSA.import_key(f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----")
verifier = PKCS1_v1_5.new(key)
hash_obj = SHA256.new(sign_str.encode('utf-8'))
try:
verifier.verify(hash_obj, base64.b64decode(sign))
return True
except (ValueError, TypeError):
return False
```
### 2. 微信签名验证
```python
import hashlib
def verify_wechat_sign(params: dict, sign: str, api_key: str) -> bool:
"""验证微信支付签名"""
# 1. 参数排序
sorted_params = sorted([(k, v) for k, v in params.items() if k != 'sign' and v])
# 2. 拼接待签名字符串
sign_str = '&'.join([f'{k}={v}' for k, v in sorted_params])
sign_str += f'&key={api_key}'
# 3. MD5 签名
calculated_sign = hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
return calculated_sign == sign
```
---
## 💰 金额校验
### 1. 回调金额必须验证
```python
def handle_payment_notify(trade_sn: str, paid_amount: int):
"""处理支付回调时必须验证金额"""
trade = get_trade_by_sn(trade_sn)
# 金额必须严格匹配
if paid_amount != trade.cash_amount:
log.error(f"金额不匹配! 订单:{trade.cash_amount}, 回调:{paid_amount}")
raise AmountMismatchError()
# 继续处理...
```
### 2. 防止金额篡改
```python
# 前端传入的金额仅用于展示,实际金额从后端订单读取
def checkout(order_sn: str, gateway: str):
order = get_order(order_sn)
# 金额从数据库读取,不信任前端
amount = order.pay_amount
return create_trade(order_sn, amount, gateway)
```
---
## 🛡️ 回调安全
### 1. IP 白名单
```python
# 支付平台回调 IP 白名单
PAYMENT_IP_WHITELIST = {
'alipay': [
'110.75.0.0/16',
'203.209.0.0/16'
],
'wechat': [
'101.226.0.0/16',
'140.207.0.0/16'
]
}
def verify_callback_ip(gateway: str, client_ip: str) -> bool:
"""验证回调来源 IP"""
import ipaddress
whitelist = PAYMENT_IP_WHITELIST.get(gateway, [])
client = ipaddress.ip_address(client_ip)
for cidr in whitelist:
if client in ipaddress.ip_network(cidr):
return True
return False
```
### 2. 防重放攻击
```python
import time
def check_notify_timestamp(timestamp: int) -> bool:
"""检查回调时间戳,防止重放攻击"""
now = int(time.time())
# 允许 5 分钟的时间差
if abs(now - timestamp) > 300:
log.warning(f"回调时间戳异常: {timestamp}")
return False
return True
```
### 3. 幂等性处理
```python
def process_notify_idempotent(trade_sn: str, notify_data: dict):
"""幂等性处理回调"""
# 使用分布式锁
lock_key = f"payment_notify:{trade_sn}"
with redis_lock(lock_key, timeout=10):
trade = get_trade_by_sn(trade_sn)
# 已处理过,直接返回成功
if trade.status == 'paid':
return success_response()
# 处理支付成功逻辑
update_trade_to_paid(trade, notify_data)
return success_response()
```
---
## 📝 日志审计
### 1. 必须记录的日志
```python
import logging
payment_logger = logging.getLogger('payment')
# 创建交易日志
payment_logger.info(f"创建交易 | trade_sn={trade_sn} | order_sn={order_sn} | amount={amount} | gateway={gateway}")
# 回调日志
payment_logger.info(f"收到回调 | gateway={gateway} | trade_sn={trade_sn} | raw_data={raw_data[:500]}")
# 签名验证日志
payment_logger.info(f"签名验证 | trade_sn={trade_sn} | result={verify_result}")
# 状态变更日志
payment_logger.info(f"状态变更 | trade_sn={trade_sn} | from={old_status} | to={new_status}")
# 退款日志
payment_logger.info(f"发起退款 | refund_sn={refund_sn} | trade_sn={trade_sn} | amount={amount}")
```
### 2. 敏感信息脱敏
```python
def mask_sensitive(data: dict) -> dict:
"""敏感信息脱敏"""
sensitive_keys = ['card_no', 'id_card', 'phone', 'bank_account']
masked = data.copy()
for key in sensitive_keys:
if key in masked:
value = str(masked[key])
if len(value) > 4:
masked[key] = value[:2] + '*' * (len(value) - 4) + value[-2:]
return masked
```
---
## 🚨 异常处理
### 1. 支付异常分类
```python
class PaymentError(Exception):
"""支付基础异常"""
pass
class SignatureError(PaymentError):
"""签名验证失败"""
pass
class AmountMismatchError(PaymentError):
"""金额不匹配"""
pass
class OrderExpiredError(PaymentError):
"""订单已过期"""
pass
class DuplicatePaymentError(PaymentError):
"""重复支付"""
pass
class RefundError(PaymentError):
"""退款失败"""
pass
```
### 2. 统一异常处理
```python
@app.exception_handler(PaymentError)
async def payment_exception_handler(request, exc):
return JSONResponse(
status_code=400,
content={
"code": 400,
"message": str(exc),
"data": None
}
)
```
---
## ✅ 合规要求
### 1. PCI DSS 合规 (信用卡)
- 不存储完整卡号、CVV、PIN
- 使用 Stripe/PayPal 等符合 PCI DSS 的支付网关
- 定期安全评估
### 2. GDPR 合规 (欧盟用户)
- 明确告知用户数据用途
- 提供数据删除功能
- 用户同意授权
### 3. 中国支付合规
- 接入持牌支付机构
- 实名认证
- 交易限额管理
---
## 📋 安全检查清单
```markdown
## 上线前安全检查
### 密钥管理
- [ ] 所有密钥通过环境变量配置
- [ ] 密钥未提交到代码仓库
- [ ] 生产环境密钥与测试环境隔离
### 通信安全
- [ ] 启用 HTTPS
- [ ] 证书有效且受信任
- [ ] 启用 HSTS
### 签名验证
- [ ] 所有回调验签
- [ ] 验签失败拒绝处理
### 金额校验
- [ ] 回调金额与订单金额比对
- [ ] 金额从后端读取
### 日志审计
- [ ] 关键操作有日志
- [ ] 敏感信息脱敏
### 异常处理
- [ ] 异常不泄露敏感信息
- [ ] 有统一异常处理
### 回调安全
- [ ] IP 白名单验证 (可选)
- [ ] 幂等性处理
- [ ] 防重放攻击
```