logJobStart($jobId, $queueLockKey);
            $this->execute();
            $this->handleJobSuccess($job, $queueLockKey);
            return true;
        } catch (\Exception $e) {
            return $this->handleJobError($e, $job, $queueLockKey);
        }
    }

    /**
     * Run one pass of the group-push task over every matching workbench.
     *
     * Loads the workbenches with status = 1, type = 3 and isDel = 0 (highest id first). For each
     * one that has a push configuration, passes the isPush() check and has content available,
     * the content is sent to the configured groups.
     *
     * @throws \Exception Any exception raised while processing is logged and rethrown.
     */
    public function execute()
    {
        try {
            $workbenches = Workbench::where(['status' => 1, 'type' => 3, 'isDel' => 0])
                ->order('id desc')
                ->select();

            foreach ($workbenches as $workbench) {
                // A workbench without a push configuration has nothing to do.
                $config = WorkbenchGroupPush::where('workbenchId', $workbench->id)->find();
                if (!$config) {
                    continue;
                }

                // Skip when the schedule / daily quota does not allow a push right now.
                if (empty($this->isPush($workbench, $config))) {
                    continue;
                }

                // Skip when there is no content left to send.
                $contentLibrary = $this->getContentLibrary($workbench, $config);
                if (empty($contentLibrary)) {
                    continue;
                }

                $this->sendMsgToGroup($workbench, $config, $contentLibrary);
            }
        } catch (\Exception $e) {
            Log::error("消息群发任务异常: " . $e->getMessage());
            throw $e;
        }
    }

    /**
     * Send every content item to every configured WeChat group and record the push.
     *
     * Message types used here: 1 = text, 3 = image, 43 = video, 49 = link card.
     * For text and image messages `content` is a string; for link messages it is an array
     * {type, url, title, thumbPath, desc}.
     *
     * For each content item the messages for all groups are built first, then sent once, and
     * one history row per group is inserted in a single batch. Sending and inserting happen
     * once per content item, after all groups have been collected, so no group receives the
     * same message more than once and no history row is duplicated.
     *
     * @param Workbench|array          $workbench Workbench being processed (only `id` is read).
     * @param WorkbenchGroupPush|array $config    Push configuration (`groups`, `promotionSiteId` are read).
     * @param array|\Traversable       $msgConf   Content items to send.
     * @return bool|null false when no target group could be resolved; otherwise nothing is returned.
     */
    public function sendMsgToGroup($workbench, $config, $msgConf)
    {
        $groupIds = json_decode($config['groups'], true);
        if (empty($groupIds)) {
            return false;
        }

        $groupsData = Db::name('wechat_group')
            ->whereIn('id', $groupIds)
            ->field('id,wechatAccountId,chatroomId,companyId,ownerWechatId')
            ->select();
        if (empty($groupsData)) {
            return false;
        }

        $toAccountId = '';
        $username = Env::get('api.username', '');
        $password = Env::get('api.password', '');
        if (!empty($username) || !empty($password)) {
            $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
        }

        // Open the WebSocket client used to deliver the messages.
        $wsController = new WebSocketController([
            'userName'  => $username,
            'password'  => $password,
            'accountId' => $toAccountId,
        ]);

        foreach ($msgConf as $content) {
            // Convert the promotion link once per content item; the result does not depend
            // on the target group, and converting already-converted text must be avoided.
            if (!empty($content['content']) && !empty($config['promotionSiteId'])) {
                $content['content'] = $this->convertPromotionLink(
                    $content['content'],
                    $config['promotionSiteId']
                );
            }

            $sendData = [];
            $sqlData = [];
            foreach ($groupsData as $group) {
                $sqlData[] = [
                    'workbenchId'     => $workbench['id'],
                    'contentId'       => $content['id'],
                    'groupId'         => $group['id'],
                    'wechatAccountId' => $group['wechatAccountId'],
                    'createTime'      => time(),
                ];

                foreach ($this->buildMessagesForGroup($content, $group) as $message) {
                    $sendData[] = $message;
                }
            }

            // Nothing sendable for this content item: do not record a push.
            if (empty($sendData)) {
                continue;
            }

            foreach ($sendData as $send) {
                $wsController->sendCommunity($send);
            }

            // Record one history row per target group.
            Db::name('workbench_group_push_item')->insertAll($sqlData);
        }
    }

    /**
     * Convert the links in a text through WorkbenchController::changeLink().
     *
     * @param string $text            Text that may contain links to convert.
     * @param mixed  $promotionSiteId Promotion site id passed through to changeLink().
     * @return mixed The converted `data` value when the response decodes and reports code 200;
     *               otherwise the original text unchanged.
     */
    private function convertPromotionLink($text, $promotionSiteId)
    {
        $workbenchController = new WorkbenchController();
        $response = json_decode($workbenchController->changeLink($text, $promotionSiteId), true);

        // An undecodable or incomplete response must not break the push; fall back to the original text.
        if (is_array($response) && isset($response['code'], $response['data']) && $response['code'] == 200) {
            return $response['data'];
        }

        return $text;
    }

    /**
     * Build the list of messages one content item produces for one group.
     *
     * @param array $content Content item (`content`, `contentType`, `resUrls`, `urls` are read).
     * @param array $group   Group row (`id`, `wechatAccountId` are read).
     * @return array List of message payloads; empty when the item has nothing sendable.
     */
    private function buildMessagesForGroup($content, $group)
    {
        // Addressing fields shared by every message sent to this group.
        $target = [
            'wechatAccountId'  => $group['wechatAccountId'],
            'wechatChatroomId' => $group['id'],
        ];

        $messages = [];

        // Text part, sent first when present.
        if (!empty($content['content'])) {
            $messages[] = ['content' => $content['content'], 'msgType' => 1] + $target;
        }

        switch ($content['contentType']) {
            case 1:
                // Images: one message per image URL.
                $images = json_decode($content['resUrls'], true);
                if (!empty($images) && is_array($images)) {
                    foreach ($images as $image) {
                        $messages[] = ['content' => $image, 'msgType' => 3] + $target;
                    }
                }
                break;

            case 2:
                // Link card: only the first decoded link is used.
                $links = json_decode($content['urls'], true);
                if (!empty($links[0])) {
                    $link = $links[0];
                    $messages[] = [
                        'content' => [
                            'desc'      => '',
                            'thumbPath' => $link['image'],
                            'title'     => $link['desc'],
                            'type'      => 'link',
                            'url'       => $link['url'],
                        ],
                        'msgType' => 49,
                    ] + $target;
                }
                break;

            case 3:
                // Video: only the first decoded entry is used; skip when nothing decoded,
                // so that no empty video message is sent.
                $videos = json_decode($content['urls'], true);
                if (!empty($videos[0])) {
                    $messages[] = ['content' => $videos[0], 'msgType' => 43] + $target;
                }
                break;
        }

        return $messages;
    }

    /**
     * Record one send-history row per device.
     *
     * @param Workbench $workbench      Workbench the content was sent for.
     * @param array     $devices        Device rows (`deviceId`, `wechatAccountId` are read).
     * @param array     $contentLibrary Content item that was sent (`id` is read).
     */
    protected function recordSendHistory($workbench, $devices, $contentLibrary)
    {
        $now = time();

        foreach ($devices as $device) {
            Db::name('workbench_group_push_item')->insert([
                'workbenchId'     => $workbench->id,
                'deviceId'        => $device['deviceId'],
                'contentId'       => $contentLibrary['id'],
                'wechatAccountId' => $device['wechatAccountId'],
                'createTime'      => $now,
            ]);
        }
    }

    /**
     * Decide whether the workbench may push right now.
     *
     * The window between today's startTime and endTime is divided into maxPerDay equal
     * intervals. A push is allowed when fewer than maxPerDay history rows exist inside the
     * window and the start of the next interval has been reached.
     *
     * @param Workbench          $workbench Workbench being checked.
     * @param WorkbenchGroupPush $config    Push configuration (`startTime`, `endTime`, `maxPerDay`, `pushType` are read).
     * @return bool true when a push is allowed now, false otherwise.
     */
    protected function isPush($workbench, $config)
    {
        $now = time();
        $today = date('Y-m-d');
        $startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
        $endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');

        // An unparsable start/end time makes the window meaningless.
        if ($startTimestamp === false || $endTimestamp === false) {
            return false;
        }

        // Outside the window: skip, unless pushType is set.
        if (($startTimestamp > $now || $endTimestamp < $now) && empty($config['pushType'])) {
            return false;
        }

        $totalSeconds = $endTimestamp - $startTimestamp;
        if ($totalSeconds <= 0 || empty($config['maxPerDay'])) {
            return false;
        }

        $interval = floor($totalSeconds / $config['maxPerDay']);

        // Number of history rows already written for this workbench inside today's window.
        // NOTE(review): sendMsgToGroup() writes one row per target group, so this counts
        // rows rather than push rounds — confirm that is the intended quota unit.
        $count = Db::name('workbench_group_push_item')
            ->where('workbenchId', $workbench->id)
            ->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
            ->count();

        if ($count >= $config['maxPerDay']) {
            return false;
        }

        // Earliest moment at which the next push is allowed.
        $nextSyncTime = $startTimestamp + $count * $interval;

        return $now >= $nextSyncTime;
    }

    /**
     * Pick the content items to send for a workbench.
     *
     * Items that have no history row for this workbench are preferred. When none are left
     * and looping is enabled (isLoop == 1), the loop state is refreshed and items not sent in
     * the current loop round are returned instead.
     *
     * @param Workbench          $workbench Workbench being processed.
     * @param WorkbenchGroupPush $config    Push configuration (`contentLibraries`, `pushType`, `pushOrder`, `isLoop` are read).
     * @return array|bool false when no content library is configured, an empty array when
     *                    nothing is left and looping is disabled, otherwise the query result.
     */
    protected function getContentLibrary($workbench, $config)
    {
        $contentids = json_decode($config['contentLibraries'], true);
        if (empty($contentids)) {
            return false;
        }

        $limit = ($config['pushType'] == 1) ? 10 : 1;
        $order = ($config['pushOrder'] == 1)
            ? 'ci.sendTime desc, ci.id asc'
            : 'ci.sendTime desc, ci.id desc';

        // The id is concatenated into a raw join condition below, so force it to an integer.
        $workbenchId = (int) $workbench->id;

        // Base query: non-deleted items of the configured libraries whose sendTime is at
        // most 60 seconds in the future.
        $baseQuery = function () use ($contentids) {
            return Db::name('content_library')->alias('cl')
                ->join('content_item ci', 'ci.libraryId = cl.id')
                ->where(['cl.isDel' => 0, 'ci.isDel' => 0])
                ->where('ci.sendTime <= ' . (time() + 60))
                ->whereIn('cl.id', $contentids)
                ->field('ci.id,ci.libraryId,ci.contentType,ci.title,ci.content,ci.resUrls,ci.urls,ci.comment,ci.sendTime');
        };

        // Items with no history row for this workbench.
        $unsentContent = $baseQuery()
            ->join(
                'workbench_group_push_item wgpi',
                'wgpi.contentId = ci.id and wgpi.workbenchId = ' . $workbenchId,
                'left'
            )
            ->where('wgpi.id', 'null')
            ->order($order)
            ->limit($limit)
            ->select();

        if (!empty($unsentContent)) {
            return $unsentContent;
        }

        // Looping disabled: nothing more to send.
        if ($config['isLoop'] != 1) {
            return [];
        }

        // Mark the current loop round as finished when everything has been sent.
        $this->checkAndResetLoop($workbench->id, $contentids);

        // Content ids already sent in the current loop round (history rows with isLoop = 0).
        $isPushIds = Db::name('workbench_group_push_item')
            ->where(['workbenchId' => $workbench->id, 'isLoop' => 0])
            ->column('contentId');

        return $baseQuery()
            ->whereNotIn('ci.id', $isPushIds)
            ->group('ci.id')
            ->order('ci.id asc')
            ->limit($limit)
            ->select();
    }

    /**
     * Close the current loop round once every available item has been sent.
     *
     * Compares the number of sendable items in the given libraries with the number of
     * distinct items that have a history row with isLoop = 0 for the workbench. When the
     * sent count reaches the total, all of the workbench's isLoop = 0 rows are set to isLoop = 1.
     *
     * @param int   $workbenchId Workbench id.
     * @param array $contentids  Content library ids configured for the workbench.
     */
    private function checkAndResetLoop($workbenchId, $contentids)
    {
        // Total sendable items.
        $totalCount = Db::name('content_library')->alias('cl')
            ->join('content_item ci', 'ci.libraryId = cl.id')
            ->where(['cl.isDel' => 0, 'ci.isDel' => 0])
            ->where('ci.sendTime <= ' . (time() + 60))
            ->whereIn('cl.id', $contentids)
            ->count();

        // Distinct items already sent in the current round.
        $sentCount = Db::name('workbench_group_push_item')
            ->alias('wgpi')
            ->join('content_item ci', 'ci.id = wgpi.contentId')
            ->join('content_library cl', 'cl.id = ci.libraryId')
            ->where('wgpi.workbenchId', $workbenchId)
            ->where('wgpi.isLoop', 0)
            ->whereIn('cl.id', $contentids)
            ->count('DISTINCT wgpi.contentId');

        if ($sentCount >= $totalCount) {
            Db::name('workbench_group_push_item')
                ->where(['workbenchId' => $workbenchId, 'isLoop' => 0])
                ->update(['isLoop' => 1]);
        }
    }

    /**
     * Log that the job has started.
     *
     * @param string $jobId         Job identifier.
     * @param string $queueLockKey  Cache key used as the queue lock.
     */
    protected function logJobStart($jobId, $queueLockKey)
    {
        Log::info('开始处理工作台消息群发任务: ' . json_encode([
            'jobId' => $jobId,
            'queueLockKey' => $queueLockKey
        ]));
    }

    /**
     * Finish a successful run: delete the job, remove the queue lock and log success.
     *
     * @param Job    $job           Queue job that was processed.
     * @param string $queueLockKey  Cache key used as the queue lock.
     */
    protected function handleJobSuccess($job, $queueLockKey)
    {
        $job->delete();
        Cache::rm($queueLockKey);
        Log::info('工作台消息群发任务执行成功');
    }

    /**
     * Handle a failed run: log the error, remove the queue lock and retry or drop the job.
     *
     * The job is deleted once its attempt count exceeds MAX_RETRY_ATTEMPTS; otherwise it is
     * released back to the queue after the configured `queue.failed_delay` (default 10).
     *
     * @param \Exception $e             Exception that caused the failure.
     * @param Job        $job           Queue job that was processed.
     * @param string     $queueLockKey  Cache key used as the queue lock.
     * @return bool Always false.
     */
    protected function handleJobError(\Exception $e, $job, $queueLockKey)
    {
        Log::error('工作台消息群发任务异常:' . $e->getMessage());

        if (!empty($queueLockKey)) {
            Cache::rm($queueLockKey);
            Log::info("由于异常释放队列锁: {$queueLockKey}");
        }

        if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
            $job->delete();
        } else {
            $job->release(Config::get('queue.failed_delay', 10));
        }

        return false;
    }
}