logJobStart($jobId, $queueLockKey); $this->execute(); $this->handleJobSuccess($job, $queueLockKey); return true; } catch (\Exception $e) { return $this->handleJobError($e, $job, $queueLockKey); } } /** * 成员类型常量 */ const MEMBER_TYPE_OWNER = 1; // 群主成员 const MEMBER_TYPE_ADMIN = 2; // 管理员 const MEMBER_TYPE_OWNER_FRIEND = 3; // 群主好友 const MEMBER_TYPE_ADMIN_FRIEND = 4; // 管理员好友 /** * 状态常量 */ const STATUS_PENDING = 0; // 待创建 const STATUS_CREATING = 1; // 创建中 const STATUS_SUCCESS = 2; // 创建成功 const STATUS_FAILED = 3; // 创建失败 const STATUS_ADMIN_FRIEND_ADDED = 4; // 管理员好友已拉入 /** * 执行任务 * @throws \Exception */ public function execute() { try { // 1. 查询启用了建群功能的数据 $workbenches = Workbench::where(['status' => 1, 'type' => 4, 'isDel' => 0])->order('id desc')->select(); foreach ($workbenches as $workbench) { // 获取工作台配置 $config = WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find(); if (!$config) { continue; } // 解析配置 $config['poolGroups'] = json_decode($config['poolGroups'], true); $config['devices'] = json_decode($config['devices'], true); $config['admins'] = json_decode($config['admins'] ?? '[]', true) ?: []; if (empty($config['poolGroups']) || empty($config['devices'])) { continue; } $groupMember = []; $wechatId = Db::name('device_wechat_login') ->whereIn('deviceId',$config['devices']) ->order('id desc') ->value('wechatId'); if (empty($wechatId)) { continue; } $groupMember[] = $wechatId; // 获取群主好友ID映射(所有群主的好友) $groupMemberWechatId = []; $groupMemberId = []; foreach ($groupMember as $ownerWechatId) { $friends = Db::table('s2_wechat_friend') ->where('ownerWechatId', $ownerWechatId) ->whereIn('wechatId', $groupMember) ->field('id,wechatId') ->select(); foreach ($friends as $friend) { if (!isset($groupMemberWechatId[$friend['id']])) { $groupMemberWechatId[$friend['id']] = $friend['wechatId']; $groupMemberId[] = $friend['id']; } } } if (empty($groupMemberWechatId)) { continue; } // 获取流量池用户 $poolItem = Db::name('traffic_source_package_item') ->whereIn('packageId', $config['poolGroups']) ->group('identifier') ->column('identifier'); if (empty($poolItem)) { continue; } // 获取已入群的用户(排除已成功入群的) $groupUser = Db::name('workbench_group_create_item') ->where('workbenchId', $workbench->id) ->where('status', 'in', [self::STATUS_SUCCESS, self::STATUS_ADMIN_FRIEND_ADDED]) ->whereIn('wechatId', $poolItem) ->group('wechatId') ->column('wechatId'); // 待入群的用户 $joinUser = array_diff($poolItem, $groupUser); if (empty($joinUser)) { continue; } // 计算随机群人数(不包含管理员,只减去群主成员数) $groupRandNum = mt_rand($config['groupSizeMin'], $config['groupSizeMax']) - count($groupMember); // 分批处理待入群用户 $addGroupUser = []; $totalRows = count($joinUser); for ($i = 0; $i < $totalRows; $i += $groupRandNum) { $batchRows = array_slice($joinUser, $i, $groupRandNum); if (!empty($batchRows)) { $addGroupUser[] = $batchRows; } } // 初始化WebSocket $toAccountId = ''; $username = Env::get('api.username2', ''); $password = Env::get('api.password2', ''); if (!empty($username) || !empty($password)) { $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId'); } $webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]); // 遍历每批用户 foreach ($addGroupUser as $batchUsers) { $this->processBatchUsers($workbench, $config, $batchUsers, $groupMemberId, $groupMemberWechatId, $groupRandNum, $webSocket); } } } catch (\Exception $e) { Log::error("工作台建群任务异常: " . $e->getMessage()); throw $e; } } /** * 处理一批用户 * @param Workbench $workbench 工作台 * @param array $config 配置 * @param array $batchUsers 批次用户(微信ID数组,来自流量池) * @param array $groupMemberId 群主成员ID数组 * @param array $groupMemberWechatId 群主成员微信ID映射 * @param int $groupRandNum 随机群人数(不包含管理员) * @param WebSocketController $webSocket WebSocket实例 */ protected function processBatchUsers($workbench, $config, $batchUsers, $groupMemberId, $groupMemberWechatId, $groupRandNum, $webSocket) { // 1. 获取群主微信ID列表(用于验证管理员) // 从群主成员的好友记录中提取所有群主的微信ID(ownerWechatId) $groupOwnerWechatIds = []; foreach ($groupMemberId as $memberId) { $member = Db::table('s2_wechat_friend')->where('id', $memberId)->find(); if ($member && !in_array($member['ownerWechatId'], $groupOwnerWechatIds)) { $groupOwnerWechatIds[] = $member['ownerWechatId']; } } // 如果从好友表获取不到,使用群主成员微信ID列表(作为备用) if (empty($groupOwnerWechatIds)) { $groupOwnerWechatIds = array_values(array_unique($groupMemberWechatId)); } // 2. 验证并获取管理员好友ID(管理员必须是群主的好友) $adminFriendIds = []; $adminWechatIds = []; if (!empty($config['admins'])) { $adminFriends = Db::table('s2_wechat_friend') ->where('id', 'in', $config['admins']) ->field('id,wechatId,ownerWechatId') ->select(); foreach ($adminFriends as $adminFriend) { // 验证:管理员必须是群主的好友 if (in_array($adminFriend['ownerWechatId'], $groupOwnerWechatIds)) { $adminFriendIds[] = $adminFriend['id']; $adminWechatIds[$adminFriend['id']] = $adminFriend['wechatId']; } } } exit_data($adminWechatIds); // 3. 从流量池用户中筛选出是群主好友的用户(按微信账号分组) $ownerFriendIdsByAccount = []; $wechatIds = []; // 获取群主的好友关系(从流量池中筛选) $ownerFriends = Db::name('wechat_friendship')->alias('f') ->join(['s2_wechat_account' => 'a'], 'f.ownerWechatId=a.wechatId') ->where('f.companyId', $workbench->companyId) ->whereIn('f.wechatId', $batchUsers) ->whereIn('f.ownerWechatId', $groupOwnerWechatIds) ->field('f.id,f.wechatId,a.id as wechatAccountId') ->select(); if (empty($ownerFriends)) { Log::warning("未找到群主的好友,跳过。工作台ID: {$workbench->id}"); return; } // 按微信账号分组群主好友 foreach ($ownerFriends as $friend) { $wechatAccountId = $friend['wechatAccountId']; if (!isset($ownerFriendIdsByAccount[$wechatAccountId])) { $ownerFriendIdsByAccount[$wechatAccountId] = []; } $ownerFriendIdsByAccount[$wechatAccountId][] = $friend['id']; $wechatIds[$friend['id']] = $friend['wechatId']; } // 4. 遍历每个微信账号,创建群 foreach ($ownerFriendIdsByAccount as $wechatAccountId => $ownerFriendIds) { // 4.1 获取当前账号的管理员好友ID $currentAdminFriendIds = []; $accountWechatId = Db::table('s2_wechat_account')->where('id', $wechatAccountId)->value('wechatId'); foreach ($adminFriendIds as $adminFriendId) { $adminFriend = Db::table('s2_wechat_friend')->where('id', $adminFriendId)->find(); if ($adminFriend && $adminFriend['ownerWechatId'] == $accountWechatId) { $currentAdminFriendIds[] = $adminFriendId; $wechatIds[$adminFriendId] = $adminWechatIds[$adminFriendId]; } } // 4.2 获取当前账号的群主成员ID $currentGroupMemberIds = []; foreach ($groupMemberId as $memberId) { $member = Db::table('s2_wechat_friend')->where('id', $memberId)->find(); if ($member && $member['ownerWechatId'] == $accountWechatId) { $currentGroupMemberIds[] = $memberId; if (!isset($wechatIds[$memberId])) { $wechatIds[$memberId] = $groupMemberWechatId[$memberId] ?? ''; } } } // 4.3 限制群主好友数量(按随机群人数) $limitedOwnerFriendIds = array_slice($ownerFriendIds, 0, $groupRandNum); // 4.4 创建群:管理员 + 群主成员 + 群主好友(从流量池筛选) $createFriendIds = array_merge($currentAdminFriendIds, $currentGroupMemberIds, $limitedOwnerFriendIds); if (count($createFriendIds) < 2) { Log::warning("建群好友数量不足,跳过。工作台ID: {$workbench->id}, 微信账号ID: {$wechatAccountId}"); continue; } // 4.5 生成群名称 $existingGroupCount = Db::name('workbench_group_create_item') ->where('workbenchId', $workbench->id) ->where('wechatAccountId', $wechatAccountId) ->where('status', self::STATUS_SUCCESS) ->group('groupId') ->count(); $chatroomName = $existingGroupCount > 0 ? $config['groupNameTemplate'] . ($existingGroupCount + 1) . '群' : $config['groupNameTemplate']; // 4.6 调用建群接口 $createTime = time(); $createResult = $webSocket->CmdChatroomCreate([ 'chatroomName' => $chatroomName, 'wechatFriendIds' => $createFriendIds, 'wechatAccountId' => $wechatAccountId ]); $createResultData = json_decode($createResult, true); // 4.7 解析建群结果,获取群ID $chatroomId = 0; if (!empty($createResultData) && isset($createResultData['code']) && $createResultData['code'] == 200) { // 尝试从返回数据中获取群ID(根据实际API返回格式调整) if (isset($createResultData['data']['chatroomId'])) { $chatroomId = $createResultData['data']['chatroomId']; } elseif (isset($createResultData['data']['id'])) { $chatroomId = $createResultData['data']['id']; } } // 4.8 记录创建请求 $installData = []; foreach ($createFriendIds as $friendId) { $memberType = in_array($friendId, $currentAdminFriendIds) ? self::MEMBER_TYPE_ADMIN : (in_array($friendId, $currentGroupMemberIds) ? self::MEMBER_TYPE_OWNER : self::MEMBER_TYPE_OWNER_FRIEND); $installData[] = [ 'workbenchId' => $workbench->id, 'friendId' => $friendId, 'wechatId' => $wechatIds[$friendId] ?? ($groupMemberWechatId[$friendId] ?? ''), 'groupId' => $chatroomId, 'wechatAccountId' => $wechatAccountId, 'status' => $chatroomId > 0 ? self::STATUS_SUCCESS : self::STATUS_CREATING, 'memberType' => $memberType, 'retryCount' => 0, 'chatroomId' => $chatroomId > 0 ? $chatroomId : null, 'createTime' => $createTime, ]; } Db::name('workbench_group_create_item')->insertAll($installData); // 5. 如果群创建成功,拉管理员的好友进群 if ($chatroomId > 0 && !empty($currentAdminFriendIds)) { $this->inviteAdminFriends($workbench, $config, $batchUsers, $currentAdminFriendIds, $chatroomId, $wechatAccountId, $wechatIds, $createTime, $webSocket); } } } /** * 拉管理员的好友进群 * @param Workbench $workbench 工作台 * @param array $config 配置 * @param array $batchUsers 批次用户(流量池微信ID数组) * @param array $adminFriendIds 管理员好友ID数组 * @param int $chatroomId 群ID * @param int $wechatAccountId 微信账号ID * @param array $wechatIds 好友ID到微信ID的映射 * @param int $createTime 创建时间 * @param WebSocketController $webSocket WebSocket实例 */ protected function inviteAdminFriends($workbench, $config, $batchUsers, $adminFriendIds, $chatroomId, $wechatAccountId, $wechatIds, $createTime, $webSocket) { // 获取管理员的微信ID列表 $adminWechatIds = []; foreach ($adminFriendIds as $adminFriendId) { if (isset($wechatIds[$adminFriendId])) { $adminWechatIds[] = $wechatIds[$adminFriendId]; } } if (empty($adminWechatIds)) { return; } // 从流量池用户中筛选出是管理员好友的用户 $adminFriendsFromPool = Db::name('wechat_friendship')->alias('f') ->join(['s2_wechat_account' => 'a'], 'f.ownerWechatId=a.wechatId') ->where('f.companyId', $workbench->companyId) ->whereIn('f.wechatId', $batchUsers) ->whereIn('f.ownerWechatId', $adminWechatIds) ->where('a.id', $wechatAccountId) ->field('f.id,f.wechatId') ->select(); if (empty($adminFriendsFromPool)) { Log::info("未找到管理员的好友,跳过拉人。工作台ID: {$workbench->id}, 群ID: {$chatroomId}"); return; } // 提取好友ID列表 $adminFriendIdsToInvite = []; foreach ($adminFriendsFromPool as $friend) { $adminFriendIdsToInvite[] = $friend['id']; $wechatIds[$friend['id']] = $friend['wechatId']; } // 调用拉人接口 $inviteResult = $webSocket->CmdChatroomInvite([ 'wechatChatroomId' => $chatroomId, 'wechatFriendIds' => $adminFriendIdsToInvite ]); $inviteResultData = json_decode($inviteResult, true); $inviteSuccess = !empty($inviteResultData) && isset($inviteResultData['code']) && $inviteResultData['code'] == 200; // 记录管理员好友拉入状态 $adminFriendData = []; foreach ($adminFriendIdsToInvite as $friendId) { $adminFriendData[] = [ 'workbenchId' => $workbench->id, 'friendId' => $friendId, 'wechatId' => $wechatIds[$friendId] ?? '', 'groupId' => $chatroomId, 'wechatAccountId' => $wechatAccountId, 'status' => $inviteSuccess ? self::STATUS_ADMIN_FRIEND_ADDED : self::STATUS_FAILED, 'memberType' => self::MEMBER_TYPE_ADMIN_FRIEND, 'retryCount' => 0, 'chatroomId' => $chatroomId, 'createTime' => $createTime, ]; } Db::name('workbench_group_create_item')->insertAll($adminFriendData); if ($inviteSuccess) { Log::info("管理员好友拉入成功。工作台ID: {$workbench->id}, 群ID: {$chatroomId}, 拉入数量: " . count($adminFriendIdsToInvite)); } else { Log::warning("管理员好友拉入失败。工作台ID: {$workbench->id}, 群ID: {$chatroomId}"); } } /** * 获取设备列表 * @param Workbench $workbench 工作台 * @param WorkbenchGroupPush $config 配置 * @return array|bool */ protected function isCreate($workbench, $config, $groupIds = []) { // 检查发送间隔(新逻辑:根据startTime、endTime、maxPerDay动态计算) $today = date('Y-m-d'); $startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00'); $endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00'); // 如果时间不符,则跳过 if ($startTimestamp > time() || $endTimestamp < time() || empty($groupIds)) { return false; } // 查询今日建群数量 $count = Db::name('wechat_group') ->whereIn('id', $groupIds) ->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp]) ->count(); if ($count >= $config['maxGroupsPerDay']) { return false; } return true; } /** * 记录任务开始 * @param string $jobId * @param string $queueLockKey */ protected function logJobStart($jobId, $queueLockKey) { Log::info('开始处理工作台消息群发任务: ' . json_encode([ 'jobId' => $jobId, 'queueLockKey' => $queueLockKey ])); } /** * 处理任务成功 * @param Job $job * @param string $queueLockKey */ protected function handleJobSuccess($job, $queueLockKey) { $job->delete(); Cache::rm($queueLockKey); Log::info('工作台消息群发任务执行成功'); } /** * 处理任务错误 * @param \Exception $e * @param Job $job * @param string $queueLockKey * @return bool */ protected function handleJobError(\Exception $e, $job, $queueLockKey) { Log::error('工作台消息群发任务异常:' . $e->getMessage()); if (!empty($queueLockKey)) { Cache::rm($queueLockKey); Log::info("由于异常释放队列锁: {$queueLockKey}"); } if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { $job->delete(); } else { $job->release(Config::get('queue.failed_delay', 10)); } return false; } }