Files
cunkebao_v3/Server/application/job/WorkbenchGroupPushJob.php

653 lines
23 KiB
PHP
Raw Normal View History

2025-08-07 11:56:04 +08:00
<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupPush;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
2025-09-02 11:24:23 +08:00
use app\cunkebao\controller\WorkbenchController;
2025-08-07 11:56:04 +08:00
/**
* 工作台消息群发任务
* Class WorkbenchGroupPushJob
* @package app\job
*/
class WorkbenchGroupPushJob
{
/**
* 最大重试次数
*/
const MAX_RETRY_ATTEMPTS = 3;
/**
 * Queue entry point: logs the start, runs one push cycle and settles the job.
 *
 * @param Job   $job  Queue job handle.
 * @param array $data Job payload; the optional 'jobId' and 'queueLockKey' keys are read.
 * @return bool true when execute() completed without throwing, otherwise the
 *              result of handleJobError().
 */
public function fire(Job $job, $data)
{
    // Both keys are optional in the payload; fall back to an empty string.
    $jobId = isset($data['jobId']) ? $data['jobId'] : '';
    $queueLockKey = isset($data['queueLockKey']) ? $data['queueLockKey'] : '';

    try {
        $this->logJobStart($jobId, $queueLockKey);
        $this->execute();
        $this->handleJobSuccess($job, $queueLockKey);
    } catch (\Exception $e) {
        // Failure path decides between retrying and dropping the job.
        return $this->handleJobError($e, $job, $queueLockKey);
    }

    return true;
}
/**
 * Runs one push cycle over every enabled group-push workbench.
 *
 * For each workbench (status = 1, type = 3, isDel = 0) the method loads its
 * push configuration, asks isPush() whether a push is due, fetches the
 * content to send and hands it to sendMsgToGroup(). Workbenches without a
 * configuration, not due, or without content are skipped.
 *
 * @throws \Exception Any exception raised while processing is logged and rethrown.
 */
public function execute()
{
    try {
        // All matching workbenches are processed; the query is not pinned to
        // a single workbench id.
        $workbenches = Workbench::where(['status' => 1, 'type' => 3, 'isDel' => 0])
            ->order('id desc')
            ->select();

        foreach ($workbenches as $workbench) {
            // Push configuration for this workbench.
            $config = WorkbenchGroupPush::where('workbenchId', $workbench->id)->find();
            if (!$config) {
                continue;
            }

            // Is a push due right now?
            $isPush = $this->isPush($workbench, $config);
            if (empty($isPush)) {
                continue;
            }

            // Content items to push.
            $contentLibrary = $this->getContentLibrary($workbench, $config);
            if (empty($contentLibrary)) {
                continue;
            }

            // Deliver the content.
            $this->sendMsgToGroup($workbench, $config, $contentLibrary);
        }
    } catch (\Exception $e) {
        Log::error("消息群发任务异常: " . $e->getMessage());
        throw $e;
    }
}
/**
 * Sends the given content items for one workbench, to groups or to friends.
 *
 * Builds a WebSocketController from the api.username / api.password
 * environment settings and dispatches on the configured targetType:
 * 1 (the default when unset) goes to sendToGroups(), anything else to
 * sendToFriends().
 *
 * Message types used downstream: 1 text, 3 image, 43 video, 47 animated
 * sticker, 49 mini-program / link / file. Text, image and sticker content is
 * a string; other types carry a structured array.
 *
 * @param mixed $workbench Workbench record.
 * @param mixed $config    Push configuration record.
 * @param mixed $msgConf   Content items to send.
 * @return void
 */
public function sendMsgToGroup($workbench, $config, $msgConf)
{
    $username = Env::get('api.username', '');
    $password = Env::get('api.password', '');

    // Resolve the account id only when at least one credential is configured.
    $toAccountId = '';
    if (!(empty($username) && empty($password))) {
        $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
    }

    // Open the WebSocket controller used for delivery.
    $wsController = new WebSocketController([
        'userName' => $username,
        'password' => $password,
        'accountId' => $toAccountId,
    ]);

    // targetType defaults to 1 (group push) when the config does not set it.
    $targetType = isset($config['targetType']) ? intval($config['targetType']) : 1;

    if ($targetType == 1) {
        // Group push.
        $this->sendToGroups($workbench, $config, $msgConf, $wsController);
        return;
    }

    // Friend push.
    $this->sendToFriends($workbench, $config, $msgConf, $wsController);
}
/**
 * Sends every content item to every configured group and records each send.
 *
 * Exactly one workbench_group_push_item row is written per (content, group)
 * pair, immediately after that group's messages were handed to the socket
 * controller. Rows are written one at a time so that an earlier group's row
 * is never inserted again when a later group is processed.
 *
 * @param mixed $workbench    Workbench record (its id is stored on each row).
 * @param mixed $config       Push configuration; 'groups' holds a JSON list of group ids.
 * @param mixed $msgConf      Content items to send.
 * @param mixed $wsController Controller exposing sendCommunity().
 * @return false|null false when no groups are configured or found, otherwise null.
 */
protected function sendToGroups($workbench, $config, $msgConf, $wsController)
{
    $groups = json_decode($config['groups'] ?? '[]', true);
    if (empty($groups)) {
        return false;
    }

    $groupsData = Db::name('wechat_group')
        ->whereIn('id', $groups)
        ->field('id,wechatAccountId,chatroomId,companyId,ownerWechatId')
        ->select();
    if (empty($groupsData)) {
        return false;
    }

    foreach ($msgConf as $content) {
        foreach ($groupsData as $group) {
            // NOTE(review): the group's row id (not chatroomId) is passed as the
            // chat room target - confirm this is what sendCommunity() expects.
            $sendData = $this->buildSendData($content, $config, $group['wechatAccountId'], $group['id'], 'group');
            if (empty($sendData)) {
                continue;
            }

            // Deliver every message built for this group.
            foreach ($sendData as $send) {
                $wsController->sendCommunity($send);
            }

            // Record the send for this group only.
            Db::name('workbench_group_push_item')->insert([
                'workbenchId' => $workbench['id'],
                'contentId' => $content['id'],
                'groupId' => $group['id'],
                'friendId' => null,
                'targetType' => 1,
                'wechatAccountId' => $group['wechatAccountId'],
                'createTime' => time(),
            ]);
        }
    }
}
/**
 * Sends every content item to friends that have not been pushed to before.
 *
 * Target selection:
 *  - when 'friends' is configured, those friend rows (not deleted) are used;
 *  - otherwise all friends of the accounts on the configured 'devices' are used;
 *    with neither configured nothing is sent.
 * Friends already present in workbench_group_push_item for this workbench
 * (targetType 2, any date) are removed, the remainder is capped by
 * maxPerDay minus the number of distinct friends pushed today, and the result
 * is processed in batches of 500 with a one-second pause between batches.
 *
 * @param mixed $workbench    Workbench record.
 * @param mixed $config       Push configuration ('friends', 'devices', 'maxPerDay').
 * @param mixed $msgConf      Content items to send.
 * @param mixed $wsController Controller exposing sendPersonal().
 * @return false|null false when there is nothing to send, otherwise null.
 */
protected function sendToFriends($workbench, $config, $msgConf, $wsController)
{
    // A missing column is treated as an empty list for both settings.
    $friends = json_decode($config['friends'] ?? '[]', true);
    $devices = json_decode($config['devices'] ?? '[]', true);

    if (empty($friends)) {
        if (empty($devices)) {
            // Without devices there is no way to resolve "all friends".
            Log::warning('好友推送:未选择设备,无法推送全部好友');
            return false;
        }
        // All friends of the accounts on the selected devices.
        $friendsData = Db::table('s2_company_account')
            ->alias('ca')
            ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
            ->join(['s2_wechat_friend' => 'wf'], 'wf.wechatAccountId = wa.id')
            ->where([
                'ca.status' => 0,
                'wf.isDeleted' => 0,
                'wa.deviceAlive' => 1,
                'wa.wechatAlive' => 1,
            ])
            ->whereIn('wa.currentDeviceId', $devices)
            ->field('wf.id,wf.wechatAccountId,wf.wechatId,wf.ownerWechatId')
            ->group('wf.id')
            ->select();
    } else {
        // Only the explicitly selected friends.
        $friendsData = Db::table('s2_wechat_friend')
            ->whereIn('id', $friends)
            ->where('isDeleted', 0)
            ->field('id,wechatAccountId,wechatId,ownerWechatId')
            ->select();
    }
    if (empty($friendsData)) {
        return false;
    }

    // Every friend ever pushed to by this workbench, as a hash set so the
    // filter below is a constant-time lookup per friend.
    $sentFriendIds = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbench->id)
        ->where('targetType', 2)
        ->column('friendId');
    $sentLookup = array_flip(array_filter($sentFriendIds));

    // Distinct friends pushed today, used for the daily quota.
    $today = date('Y-m-d');
    $todayStartTimestamp = strtotime($today . ' 00:00:00');
    $todayEndTimestamp = strtotime($today . ' 23:59:59');
    $todaySentFriendIds = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbench->id)
        ->where('targetType', 2)
        ->whereTime('createTime', 'between', [$todayStartTimestamp, $todayEndTimestamp])
        ->column('friendId');
    $todaySentCount = count(array_unique(array_filter($todaySentFriendIds)));

    // Drop friends that were already pushed to, then re-index.
    $friendsData = array_values(array_filter($friendsData, function ($friend) use ($sentLookup) {
        return !isset($sentLookup[$friend['id']]);
    }));
    if (empty($friendsData)) {
        Log::info('好友推送:所有好友都已推送过');
        return false;
    }

    // Remaining quota for today.
    $remainingCount = intval($config['maxPerDay']) - $todaySentCount;
    if ($remainingCount <= 0) {
        Log::info('好友推送:今日推送人数已达上限');
        return false;
    }
    $friendsData = array_slice($friendsData, 0, $remainingCount);

    // At most 500 friends per batch.
    $batches = array_chunk($friendsData, 500);
    $lastBatchIndex = count($batches) - 1;

    foreach ($msgConf as $content) {
        foreach ($batches as $batchIndex => $batch) {
            $sqlData = [];
            foreach ($batch as $friend) {
                $sendData = $this->buildSendData($content, $config, $friend['wechatAccountId'], $friend['id'], 'friend');
                if (empty($sendData)) {
                    continue;
                }

                foreach ($sendData as $send) {
                    // Structured (type 49) payloads are sent as a JSON string.
                    $sendContent = $send['msgType'] == 49
                        ? json_encode($send['content'], JSON_UNESCAPED_UNICODE)
                        : $send['content'];

                    $wsController->sendPersonal([
                        'wechatFriendId' => $friend['id'],
                        'wechatAccountId' => $friend['wechatAccountId'],
                        'msgType' => $send['msgType'],
                        'content' => $sendContent,
                    ]);
                }

                // One history row per friend that was actually sent to.
                $sqlData[] = [
                    'workbenchId' => $workbench['id'],
                    'contentId' => $content['id'],
                    'groupId' => null,
                    'friendId' => $friend['id'],
                    'targetType' => 2,
                    'wechatAccountId' => $friend['wechatAccountId'],
                    'createTime' => time(),
                ];
            }

            // Persist the batch's history rows in one statement.
            if (!empty($sqlData)) {
                Db::name('workbench_group_push_item')->insertAll($sqlData);
                Log::info("好友推送:第" . ($batchIndex + 1) . "批,推送了" . count($sqlData) . "个好友");
            }

            // Pause between batches so the pushes are not sent all at once.
            if ($batchIndex < $lastBatchIndex) {
                sleep(1);
            }
        }
    }
}
/**
 * Turns one content item into the list of messages to send to one target.
 *
 * A non-empty text body yields a msgType 1 message (optionally rewritten by
 * WorkbenchController::changeLink() when 'promotionSiteId' is configured).
 * Then, depending on contentType:
 *  - 1: one msgType 3 message per entry of the 'resUrls' JSON list;
 *  - 2: one msgType 49 link message built from the first entry of 'urls';
 *  - 3: one msgType 43 message for the first entry of 'resUrls'.
 * A media message is only produced when its resource is actually present.
 *
 * For type 'group' each message also carries 'wechatAccountId' and
 * 'wechatChatroomId' (set to $targetId); for any other type only 'content'
 * and 'msgType' are set.
 *
 * @param array|mixed $content         Content item (content, contentType, resUrls, urls).
 * @param array|mixed $config          Push configuration (promotionSiteId is read).
 * @param mixed       $wechatAccountId Sending account id.
 * @param mixed       $targetId        Target id used as wechatChatroomId for groups.
 * @param string      $type            'group' or anything else for friend messages.
 * @return array List of message arrays; empty when the item yields nothing to send.
 */
protected function buildSendData($content, $config, $wechatAccountId, $targetId, $type = 'group')
{
    // Wraps a payload in the envelope required for the target type.
    $wrap = function ($payload, $msgType) use ($type, $wechatAccountId, $targetId) {
        $message = [
            'content' => $payload,
            'msgType' => $msgType,
        ];
        if ($type == 'group') {
            $message['wechatAccountId'] = $wechatAccountId;
            $message['wechatChatroomId'] = $targetId;
        }
        return $message;
    };

    $sendData = [];

    // Text body.
    if (!empty($content['content'])) {
        $text = $content['content'];

        // Optional JD affiliate link rewrite.
        if (!empty($config['promotionSiteId'])) {
            $workbenchController = new WorkbenchController();
            $jdLink = json_decode($workbenchController->changeLink($text, $config['promotionSiteId']), true);
            // Keep the original text unless the rewrite clearly succeeded.
            if (is_array($jdLink) && isset($jdLink['code'], $jdLink['data']) && $jdLink['code'] == 200) {
                $text = $jdLink['data'];
            }
        }

        $sendData[] = $wrap($text, 1);
    }

    // Media part, by content type.
    $contentType = isset($content['contentType']) ? $content['contentType'] : null;
    switch ($contentType) {
        case 1:
            // Images: one message per URL.
            $imgs = json_decode($content['resUrls'] ?? '[]', true);
            if (is_array($imgs)) {
                foreach ($imgs as $img) {
                    $sendData[] = $wrap($img, 3);
                }
            }
            break;

        case 2:
            // Link card built from the first URL entry.
            $urls = json_decode($content['urls'] ?? '[]', true);
            if (is_array($urls) && !empty($urls[0]) && is_array($urls[0])) {
                $link = $urls[0];
                $sendData[] = $wrap([
                    'desc' => $link['desc'] ?? null,
                    'thumbPath' => $link['image'] ?? null,
                    // The description is used as the card title as well.
                    'title' => $link['desc'] ?? null,
                    'type' => 'link',
                    'url' => $link['url'] ?? null,
                ], 49);
            }
            break;

        case 3:
            // Video: first resource only, and only when one exists.
            $videos = json_decode($content['resUrls'] ?? '[]', true);
            if (is_array($videos) && !empty($videos[0])) {
                $sendData[] = $wrap($videos[0], 43);
            }
            break;
    }

    return $sendData;
}
/**
 * Writes one push-history row per device for the given content item.
 *
 * @param Workbench $workbench      Workbench whose id is stored on each row.
 * @param array     $devices        Rows providing 'deviceId' and 'wechatAccountId'.
 * @param array     $contentLibrary Content item; its 'id' is stored as contentId.
 */
protected function recordSendHistory($workbench, $devices, $contentLibrary)
{
    // All rows of one call share the same creation timestamp.
    $createdAt = time();

    foreach ($devices as $device) {
        Db::name('workbench_group_push_item')->insert([
            'workbenchId' => $workbench->id,
            'deviceId' => $device['deviceId'],
            'contentId' => $contentLibrary['id'],
            'wechatAccountId' => $device['wechatAccountId'],
            'createTime' => $createdAt,
        ]);
    }
}
/**
 * Decides whether a push is due for the workbench right now.
 *
 * The daily window [startTime, endTime] is divided into maxPerDay equal
 * slots. A push is due when fewer than maxPerDay pushes were recorded inside
 * today's window and the slot for the next push has started. Being outside
 * the window only blocks the push when 'pushType' is empty.
 *
 * What is counted depends on targetType: for 2 (friend push) the number of
 * distinct non-empty friendId values, otherwise the number of targetType 1
 * rows.
 *
 * @param Workbench          $workbench Workbench record.
 * @param WorkbenchGroupPush $config    Push configuration.
 * @return bool true when a push should be performed now.
 */
protected function isPush($workbench, $config)
{
    // Today's push window as timestamps.
    $today = date('Y-m-d');
    $startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
    $endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');

    // Outside the window: skip, unless a pushType is set.
    $outsideWindow = $startTimestamp > time() || $endTimestamp < time();
    if ($outsideWindow && empty($config['pushType'])) {
        return false;
    }

    $totalSeconds = $endTimestamp - $startTimestamp;
    if ($totalSeconds <= 0 || empty($config['maxPerDay'])) {
        return false;
    }

    // targetType defaults to 1 (group push) when unset.
    $targetType = isset($config['targetType']) ? intval($config['targetType']) : 1;
    $isFriendPush = $targetType == 2;

    // History rows of this workbench inside today's window.
    $windowItems = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbench->id)
        ->where('targetType', $isFriendPush ? 2 : 1)
        ->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp]);

    if ($isFriendPush) {
        // Friend push: maxPerDay limits the number of distinct friends.
        $count = count(array_unique(array_filter($windowItems->column('friendId'))));
    } else {
        // Group push: maxPerDay is compared against the number of rows.
        $count = $windowItems->count();
    }

    if ($count >= $config['maxPerDay']) {
        return false;
    }

    // Earliest moment the next push is allowed.
    $interval = floor($totalSeconds / $config['maxPerDay']);
    $nextSyncTime = $startTimestamp + $count * $interval;

    return time() >= $nextSyncTime;
}
/**
 * Selects the content items to push next for the workbench.
 *
 * Items come from the configured content libraries, must not be deleted and
 * must have a sendTime no later than one minute from now. Items without a
 * push-history row for this workbench and target type are preferred. When
 * 'isLoop' is 1 and no such item exists, previously sent items are reused.
 * 'pushType' 1 returns up to 10 items, anything else 1 item; 'pushOrder'
 * selects ascending (1) or descending id order as the tie-breaker.
 *
 * @param Workbench          $workbench Workbench record.
 * @param WorkbenchGroupPush $config    Push configuration.
 * @return array|bool false when no libraries are configured, otherwise the selected rows.
 */
protected function getContentLibrary($workbench, $config)
{
    $contentids = json_decode($config['contentLibraries'], true);
    if (empty($contentids)) {
        return false;
    }

    // targetType defaults to 1 (group push) when unset.
    $targetType = isset($config['targetType']) ? intval($config['targetType']) : 1;

    $limit = ($config['pushType'] == 1) ? 10 : 1;

    // Push order.
    $order = ($config['pushOrder'] == 1)
        ? 'ci.sendTime desc, ci.id asc'
        : 'ci.sendTime desc, ci.id desc';

    // Base query; history rows are left-joined per workbench and target type.
    $historyJoin = 'wgpi.contentId = ci.id and wgpi.workbenchId = ' . $workbench->id . ' and wgpi.targetType = ' . $targetType;
    $query = Db::name('content_library')->alias('cl')
        ->join('content_item ci', 'ci.libraryId = cl.id')
        ->join('workbench_group_push_item wgpi', $historyJoin, 'left')
        ->where(['cl.isDel' => 0, 'ci.isDel' => 0])
        ->where('ci.sendTime <= ' . (time() + 60))
        ->whereIn('cl.id', $contentids)
        ->field([
            'ci.id',
            'ci.libraryId',
            'ci.contentType',
            'ci.title',
            'ci.content',
            'ci.resUrls',
            'ci.urls',
            'ci.comment',
            'ci.sendTime',
        ]);

    // Copies must be taken before any further condition is added to $query.
    $query2 = clone $query;
    $query3 = clone $query;

    // Items never sent for this workbench / target type.
    $unsentContent = $query->where('wgpi.id', 'null')
        ->order($order)
        ->limit(0, $limit)
        ->select();

    // Without looping, or when something unsent exists, that is the answer.
    if ($config['isLoop'] != 1 || !empty($unsentContent)) {
        return $unsentContent;
    }

    // Looping: locate the newest and the oldest history row.
    $findHistory = function ($direction) use ($workbench, $targetType) {
        return Db::name('workbench_group_push_item')
            ->where('workbenchId', $workbench->id)
            ->where('targetType', $targetType)
            ->order('id ' . $direction)
            ->find();
    };
    $lastSendData = $findHistory('desc');
    $fastSendData = $findHistory('asc');
    if (empty($lastSendData) || empty($fastSendData)) {
        return [];
    }

    // Sent items with a content id below the most recently sent one.
    $sentContent = $query2->where('wgpi.contentId', '<', $lastSendData['contentId'])
        ->order('wgpi.id ASC')
        ->group('wgpi.contentId')
        ->limit(0, $limit)
        ->select();

    // Nothing there: fall back to the content of the oldest history row.
    if (empty($sentContent)) {
        $sentContent = $query3->where('wgpi.contentId', '=', $fastSendData['contentId'])
            ->order('wgpi.id ASC')
            ->group('wgpi.contentId')
            ->limit(0, $limit)
            ->select();
    }

    return $sentContent;
}
/**
 * Writes an info log entry marking the start of the job.
 *
 * @param string $jobId        Job identifier from the payload.
 * @param string $queueLockKey Queue lock key from the payload.
 */
protected function logJobStart($jobId, $queueLockKey)
{
    $context = [
        'jobId' => $jobId,
        'queueLockKey' => $queueLockKey,
    ];

    Log::info('开始处理工作台消息群发任务: ' . json_encode($context));
}
/**
 * Finishes a successful run: removes the job, clears the lock key and logs.
 *
 * @param Job    $job          Queue job handle.
 * @param string $queueLockKey Cache key that is removed.
 */
protected function handleJobSuccess($job, $queueLockKey)
{
    // Remove the job from the queue first, then drop its lock entry.
    $job->delete();
    Cache::rm($queueLockKey);

    Log::info('工作台消息群发任务执行成功');
}
/**
 * Handles a failed run: logs the error, clears the lock key and either
 * re-queues the job or deletes it once it has been attempted more than
 * MAX_RETRY_ATTEMPTS times.
 *
 * @param \Exception $e            The exception that aborted the run.
 * @param Job        $job          Queue job handle.
 * @param string     $queueLockKey Cache key to remove when non-empty.
 * @return bool Always false.
 */
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
    Log::error('工作台消息群发任务异常:' . $e->getMessage());

    // Free the queue lock so a later run is not blocked.
    if (!empty($queueLockKey)) {
        Cache::rm($queueLockKey);
        Log::info('由于异常释放队列锁: ' . $queueLockKey);
    }

    // Still within the retry budget: put the job back with the configured delay.
    if ($job->attempts() <= self::MAX_RETRY_ATTEMPTS) {
        $job->release(Config::get('queue.failed_delay', 10));
        return false;
    }

    // Budget exhausted: drop the job.
    $job->delete();
    return false;
}
}