Files
soul/lib/modules/distribution/websocket.ts

315 lines
8.1 KiB
TypeScript
Raw Normal View History

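/**
 * WebSocket real-time push service.
 *
 * Holds an in-memory, per-user queue of notification messages and a client
 * helper that retrieves them by polling an HTTP endpoint.
 */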
// Message type definitions: every kind of notification this module can queue.
export type WebSocketMessageType =
| 'binding_expiring' // binding is about to expire
| 'binding_expired' // binding has expired
| 'binding_converted' // binding converted (the user paid)
| 'withdrawal_approved' // withdrawal approved
| 'withdrawal_completed' // withdrawal completed
| 'withdrawal_rejected' // withdrawal rejected
| 'earnings_added' // earnings increased
| 'system_notice'; // system notice
// Message structure: the envelope stored in the per-user queue.
export interface WebSocketMessage {
type: WebSocketMessageType;
userId: string; // target user ID
// Type-specific payload; the push* helpers in this file fill it in.
data: Record<string, unknown>;
// Filled in by pushMessage with new Date().toISOString().
timestamp: string;
// Filled in by pushMessage with generateMessageId().
messageId: string;
}
// Binding-expiry reminder data.
// Field names match the parameters pushBindingExpiringReminder copies into `data`.
export interface BindingExpiringData {
bindingId: string;
visitorNickname?: string;
visitorPhone?: string;
daysRemaining: number;
expireTime: string;
}
// Withdrawal status update data.
// Field names match the parameters pushWithdrawalUpdate copies into `data`.
export interface WithdrawalUpdateData {
withdrawalId: string;
amount: number;
// pushWithdrawalUpdate passes 'approved', 'completed' or 'rejected' here.
status: string;
paymentNo?: string;
rejectReason?: string;
}
// Earnings-added data.
// Field names match the parameters pushEarningsAdded copies into `data`.
export interface EarningsAddedData {
orderId: string;
orderAmount: number;
commission: number;
visitorNickname?: string;
}
/**
 * WebSocket message queue.
 *
 * A process-local Map keyed by user ID; each value is that user's queued
 * messages in insertion order (oldest first).
 *
 * NOTE(review): the original comment mentions Redis / another message queue,
 * presumably as the intended replacement for this in-memory Map. Confirm.
 */
const messageQueue = new Map<string, WebSocketMessage[]>();
/**
 * Builds a message ID of the form `msg_<epoch-ms>_<suffix>`.
 *
 * The suffix is always nine base-36 characters derived from `Math.random()`,
 * so IDs are suitable for de-duplication but are not cryptographically secure.
 *
 * @returns The generated message ID.
 */
function generateMessageId(): string {
  // `toString(36)` yields "0.<digits>"; drop the "0." prefix with `slice`
  // (`substr` is a deprecated legacy method). Short fractions such as
  // 0.5 -> "0.i" are right-padded so the suffix length never varies.
  const suffix = Math.random().toString(36).slice(2, 11).padEnd(9, '0');
  return `msg_${Date.now()}_${suffix}`;
}
/**
 * Queues a message for its target user.
 *
 * The message is stamped with a generated `messageId` and the current time as
 * an ISO-8601 `timestamp`, appended to the user's queue, and logged. Once a
 * queue holds more than 100 entries its oldest entry is dropped.
 *
 * @param message - The message without `messageId` and `timestamp`.
 */
export function pushMessage(message: Omit<WebSocketMessage, 'messageId' | 'timestamp'>): void {
  const maxQueuedPerUser = 100;

  const messageId = generateMessageId();
  const timestamp = new Date().toISOString();
  const envelope: WebSocketMessage = { ...message, messageId, timestamp };

  let queue = messageQueue.get(message.userId);
  if (!queue) {
    queue = [];
    messageQueue.set(message.userId, queue);
  }

  queue.push(envelope);
  // Keep only the most recent 100 messages.
  if (queue.length > maxQueuedPerUser) {
    queue.shift();
  }

  console.log('[WebSocket] 消息已入队:', {
    type: envelope.type,
    userId: envelope.userId,
    messageId: envelope.messageId,
  });
}
/**
 * Returns the messages queued for a user.
 *
 * @param userId - User whose queue is read.
 * @param since - Optional timestamp string; when given (and non-empty), only
 *   messages whose `timestamp` compares greater than it as a string are returned.
 * @returns The matching messages, or an empty array when nothing is queued.
 */
export function getMessages(userId: string, since?: string): WebSocketMessage[] {
  const queued = messageQueue.get(userId) || [];
  return since ? queued.filter((entry) => entry.timestamp > since) : queued;
}
/**
 * Removes the given messages from a user's queue.
 *
 * Unknown message IDs are ignored. A user with nothing queued is left
 * untouched, and a queue that becomes empty is removed from the store so the
 * Map does not accumulate empty entries.
 *
 * @param userId - User whose queue is modified.
 * @param messageIds - IDs of the messages to remove.
 */
export function clearMessages(userId: string, messageIds: string[]): void {
  const queued = messageQueue.get(userId);
  if (!queued) {
    // Nothing to clear; do not create an empty entry for this user.
    return;
  }

  // Set lookup keeps the filter linear in the queue length.
  const idsToRemove = new Set(messageIds);
  const remaining = queued.filter((entry) => !idsToRemove.has(entry.messageId));

  if (remaining.length === 0) {
    messageQueue.delete(userId);
  } else {
    messageQueue.set(userId, remaining);
  }
}
/**
 * Queues a `binding_expiring` message for `params.userId`.
 *
 * The payload carries the binding fields plus a display `message` that names
 * the visitor by nickname, falling back to phone number and then to '未知'.
 *
 * @param params - Target user, binding details and days remaining.
 */
export function pushBindingExpiringReminder(params: {
  userId: string;
  bindingId: string;
  visitorNickname?: string;
  visitorPhone?: string;
  daysRemaining: number;
  expireTime: string;
}): void {
  const { userId, bindingId, visitorNickname, visitorPhone, daysRemaining, expireTime } = params;
  const visitorLabel = visitorNickname || visitorPhone || '未知';

  pushMessage({
    type: 'binding_expiring',
    userId,
    data: {
      bindingId,
      visitorNickname,
      visitorPhone,
      daysRemaining,
      expireTime,
      message: `用户 ${visitorLabel} 的绑定将在 ${daysRemaining} 天后过期`,
    },
  });
}
/**
 * Queues a `binding_expired` message for `params.userId`.
 *
 * The display `message` names the visitor by nickname, falling back to phone
 * number and then to '未知'.
 *
 * @param params - Target user and the expired binding's details.
 */
export function pushBindingExpiredNotice(params: {
  userId: string;
  bindingId: string;
  visitorNickname?: string;
  visitorPhone?: string;
}): void {
  const { userId, bindingId, visitorNickname, visitorPhone } = params;
  const visitorLabel = visitorNickname || visitorPhone || '未知';

  pushMessage({
    type: 'binding_expired',
    userId,
    data: {
      bindingId,
      visitorNickname,
      visitorPhone,
      message: `用户 ${visitorLabel} 的绑定已过期`,
    },
  });
}
/**
 * Queues a `binding_converted` message for `params.userId`.
 *
 * The display `message` shows the order amount as given and the commission
 * formatted to two decimals; a missing nickname is shown as '未知'.
 *
 * @param params - Target user, binding, order and commission details.
 */
export function pushBindingConvertedNotice(params: {
  userId: string;
  bindingId: string;
  orderId: string;
  orderAmount: number;
  commission: number;
  visitorNickname?: string;
}): void {
  const { userId, bindingId, orderId, orderAmount, commission, visitorNickname } = params;
  const visitorLabel = visitorNickname || '未知';
  const commissionText = commission.toFixed(2);

  pushMessage({
    type: 'binding_converted',
    userId,
    data: {
      bindingId,
      orderId,
      orderAmount,
      commission,
      visitorNickname,
      message: `恭喜!用户 ${visitorLabel} 已付款 ¥${orderAmount},您获得佣金 ¥${commissionText}`,
    },
  });
}
/**
 * Queues a withdrawal status message for `params.userId`.
 *
 * `params.status` selects both the message type (`withdrawal_approved`,
 * `withdrawal_completed` or `withdrawal_rejected`) and the display text.
 * Amounts are formatted to two decimals.
 *
 * @param params - Target user, withdrawal details and new status.
 *   `paymentNo` is only shown for `completed`; `rejectReason` only for
 *   `rejected`, where a missing reason is shown as '未说明'.
 */
export function pushWithdrawalUpdate(params: {
  userId: string;
  withdrawalId: string;
  amount: number;
  status: 'approved' | 'completed' | 'rejected';
  paymentNo?: string;
  rejectReason?: string;
}): void {
  const amountText = `¥${params.amount.toFixed(2)}`;

  let type: WebSocketMessageType;
  let message: string;
  if (params.status === 'approved') {
    type = 'withdrawal_approved';
    message = `您的提现申请 ${amountText} 已通过审核,正在打款中...`;
  } else if (params.status === 'completed') {
    type = 'withdrawal_completed';
    // paymentNo is optional: leave the serial-number clause out instead of
    // rendering the literal text "undefined".
    const serialClause = params.paymentNo ? `,流水号: ${params.paymentNo}` : '';
    message = `您的提现 ${amountText} 已成功到账${serialClause}`;
  } else {
    // As before, any other status is reported as a rejection.
    type = 'withdrawal_rejected';
    message = `您的提现申请 ${amountText} 已被拒绝,原因: ${params.rejectReason || '未说明'}`;
  }

  pushMessage({
    type,
    userId: params.userId,
    data: {
      withdrawalId: params.withdrawalId,
      amount: params.amount,
      status: params.status,
      paymentNo: params.paymentNo,
      rejectReason: params.rejectReason,
      message,
    },
  });
}
/**
 * Queues an `earnings_added` message for `params.userId`.
 *
 * The display `message` shows the commission formatted to two decimals.
 *
 * @param params - Target user, order and commission details.
 */
export function pushEarningsAdded(params: {
  userId: string;
  orderId: string;
  orderAmount: number;
  commission: number;
  visitorNickname?: string;
}): void {
  const { userId, orderId, orderAmount, commission, visitorNickname } = params;
  const commissionText = commission.toFixed(2);

  pushMessage({
    type: 'earnings_added',
    userId,
    data: {
      orderId,
      orderAmount,
      commission,
      visitorNickname,
      message: `收益 +¥${commissionText}`,
    },
  });
}
/**
 * Queues a `system_notice` message for `params.userId`.
 *
 * @param params - Target user plus the notice's title, content and optional link.
 */
export function pushSystemNotice(params: {
  userId: string;
  title: string;
  content: string;
  link?: string;
}): void {
  const { userId, title, content, link } = params;
  const notice = { title, content, link };

  pushMessage({
    type: 'system_notice',
    userId,
    data: notice,
  });
}
/**
 * Creates a message client for one user.
 *
 * Despite the name, this does not open a WebSocket: while connected it polls
 * `/api/distribution/messages` every 3 seconds, asking only for messages newer
 * than the latest timestamp it has seen (initially the creation time).
 *
 * @param userId - User whose messages are fetched; URL-encoded into the query.
 * @param onMessage - Called once per received message, in response order.
 * @returns An object with:
 *   - `connect()`: starts polling and fetches once immediately; a no-op when
 *     already connected.
 *   - `disconnect()`: stops polling; a response still in flight is discarded.
 *   - `isConnected()`: reports whether polling is active.
 */
export function createWebSocketClient(userId: string, onMessage: (message: WebSocketMessage) => void) {
  const pollIntervalMs = 3000;

  let lastTimestamp = new Date().toISOString();
  let isRunning = false;
  // True while a request is outstanding, so slow responses cannot overlap with
  // the next tick and deliver the same messages twice.
  let pollInFlight = false;
  // ReturnType keeps this correct in both browser and Node typings.
  let intervalId: ReturnType<typeof setInterval> | null = null;

  const fetchMessages = async (): Promise<void> => {
    if (!isRunning || pollInFlight) return;
    pollInFlight = true;
    try {
      const query = `userId=${encodeURIComponent(userId)}&since=${encodeURIComponent(lastTimestamp)}`;
      const response = await fetch(`/api/distribution/messages?${query}`);
      if (!response.ok) return;

      const data = (await response.json()) as { success?: boolean; messages?: WebSocketMessage[] };
      // disconnect() may have been called while the request was in flight.
      if (!isRunning) return;

      if (data.success && Array.isArray(data.messages)) {
        for (const message of data.messages) {
          onMessage(message);
          if (message.timestamp > lastTimestamp) {
            lastTimestamp = message.timestamp;
          }
        }
      }
    } catch (error) {
      console.error('[WebSocketClient] 获取消息失败:', error);
    } finally {
      pollInFlight = false;
    }
  };

  return {
    connect: () => {
      // A second connect() must not start (and leak) another interval.
      if (isRunning) return;
      isRunning = true;
      // Poll every 3 seconds.
      intervalId = setInterval(() => {
        void fetchMessages();
      }, pollIntervalMs);
      // Fetch once right away; fetchMessages handles its own errors.
      void fetchMessages();
      console.log('[WebSocketClient] 已连接,用户:', userId);
    },
    disconnect: () => {
      isRunning = false;
      if (intervalId !== null) {
        clearInterval(intervalId);
        intervalId = null;
      }
      console.log('[WebSocketClient] 已断开连接');
    },
    isConnected: () => isRunning,
  };
}