logJobStart($jobId, $queueLockKey);

            $workbenches = $this->getActiveWorkbenches();
            if (empty($workbenches)) {
                $this->handleEmptyWorkbenches($job, $queueLockKey);
                return true;
            }

            $this->processWorkbenches($workbenches);
            $this->handleJobSuccess($job, $queueLockKey);
            return true;
        } catch (\Exception $e) {
            return $this->handleJobError($e, $job, $queueLockKey);
        }
    }

    /**
     * Fetch the workbenches this job should process.
     *
     * Selects Workbench rows with status = 1, isDel = 0 and type = 5,
     * ordered by id descending.
     * NOTE(review): type 5 is presumably the "traffic distribution" workbench
     * type — confirm against the Workbench model.
     *
     * @return mixed Result of the Workbench query's select() call
     */
    protected function getActiveWorkbenches()
    {
        return Workbench::where([
            ['status', '=', 1],
            ['isDel', '=', 0],
            ['type', '=', 5]
        ])->order('id DESC')->select();
    }

    /**
     * Run the distribution logic for every given workbench.
     *
     * An \Exception raised while handling one workbench is logged and does
     * not prevent the remaining workbenches from being processed.
     *
     * @param iterable $workbenches Workbench records exposing an `id` property
     * @return void
     */
    protected function processWorkbenches($workbenches)
    {
        foreach ($workbenches as $workbench) {
            try {
                $this->processSingleWorkbench($workbench);
            } catch (\Exception $e) {
                Log::error("处理流量分发工作台 {$workbench->id} 失败: " . $e->getMessage());
            }
        }
    }

    /**
     * Distribute one page of unassigned friends of a workbench across its
     * configured customer-service accounts, round-robin, while respecting the
     * per-account daily limit (`maxPerDay`).
     *
     * Only the first page ($pageSize friends) is handled per invocation: the
     * previous implementation wrapped this in a do/while loop that always hit
     * an unconditional `break` before its `$page++`, so exactly one page was
     * ever processed. That behaviour is kept, without the dead loop.
     *
     * @param object $workbench Workbench record; `id` and `companyId` are read
     * @return void
     */
    protected function processSingleWorkbench($workbench)
    {
        $page = 1;
        $pageSize = 20;

        $config = WorkbenchTrafficConfig::where('workbenchId', $workbench->id)->find();
        if (!$config) {
            Log::error("流量分发工作台 {$workbench->id} 配置获取失败");
            return;
        }

        // The time window only applies when timeType == 2. Testing timeType
        // first avoids isTimeRange() logging an "out of window" message for
        // configurations the window does not apply to.
        if ($config['timeType'] == 2 && !$this->isTimeRange($config)) {
            return;
        }

        // Resolve the configured customer-service account ids.
        if (empty($config['account'])) {
            Log::error("流量分发工作台 {$workbench->id} 未配置分发的客服");
            return;
        }
        $accountIds = is_array($config['account'])
            ? $config['account']
            : json_decode($config['account'], true);
        if (!is_array($accountIds) || empty($accountIds)) {
            // Undecodable or empty list: nothing can be assigned.
            Log::error("流量分发工作台 {$workbench->id} 未配置分发的客服");
            return;
        }

        // Values embedded into raw SQL fragments below are forced to integers.
        $workbenchId = (int) $workbench->id;
        $maxPerDay = (int) $config['maxPerDay'];
        $todayStart = strtotime(date('Y-m-d 00:00:00'));
        $todayEnd = strtotime(date('Y-m-d 23:59:59'));

        // Accounts that have not yet reached today's limit, together with the
        // number of friends already assigned to them today. Strictly "<" so
        // that accounts already at the limit are not selected at all.
        $accounts = Db::table('s2_company_account')
            ->alias('a')
            ->where(['a.departmentId' => $workbench->companyId, 'a.status' => 0])
            ->whereIn('a.id', $accountIds)
            ->whereNotLike('a.userName', '%_offline%')
            ->whereNotLike('a.userName', '%_delete%')
            ->leftJoin('workbench_traffic_config_item wti', "wti.wechatAccountId = a.id AND wti.workbenchId = {$workbenchId} AND wti.createTime BETWEEN {$todayStart} AND {$todayEnd}")
            ->field('a.id,a.userName,a.realName,a.nickname,COUNT(wti.id) as todayCount')
            ->group('a.id')
            ->having('todayCount < ' . $maxPerDay)
            ->select();

        $accountNum = count($accounts);
        if ($accountNum < 1) {
            Log::info("流量分发工作台 {$workbench->id} 可分配账号少于1个");
            return;
        }

        $automaticAssign = new AutomaticAssign();

        $friends = $this->getFriendsByLabels($workbench, $config, $page, $pageSize);
        if (empty($friends) || count($friends) == 0) {
            Log::info("流量分发工作台 {$workbench->id} 没有可分配的好友");
        } else {
            $cursor = 0; // round-robin position inside $accounts

            foreach ($friends as $friend) {
                // Pick the next account that still has quota left today.
                // Exhausted accounts are dropped and the search continues for
                // the SAME friend, so neither the friend nor the account that
                // shifts into the freed slot is skipped.
                $account = null;
                while ($accountNum > 0) {
                    if ($cursor >= $accountNum) {
                        $cursor = 0;
                    }
                    $candidate = $accounts[$cursor];

                    // `todayCount` is a snapshot taken before this run. Only
                    // when the account could reach the limit within this page
                    // is the live count re-read from the database.
                    if (($candidate['todayCount'] + $pageSize) >= $maxPerDay) {
                        $todayCount = Db::name('workbench_traffic_config_item')
                            ->where('workbenchId', $workbench->id)
                            ->where('wechatAccountId', $candidate['id'])
                            ->whereBetween('createTime', [$todayStart, $todayEnd])
                            ->count();

                        if ($todayCount >= $maxPerDay) {
                            unset($accounts[$cursor]);
                            $accounts = array_values($accounts);
                            $accountNum = count($accounts);
                            // Cursor stays put: after re-indexing, the next
                            // account now occupies this position.
                            continue;
                        }
                    }

                    $account = $candidate;
                    break;
                }

                if ($account === null) {
                    Log::info("流量分发工作台 {$workbench->id} 所有账号今日分配已满");
                    break;
                }

                // Ask the remote side to move the friend to the chosen account.
                $res = $automaticAssign->allotWechatFriend([
                    'wechatFriendId' => $friend['id'],
                    'toAccountId' => $account['id']
                ], true);
                $res = json_decode($res, true);

                if (is_array($res) && isset($res['code']) && $res['code'] == 200) {
                    Db::table('s2_wechat_friend')
                        ->where('id', $friend['id'])
                        ->update([
                            'accountId' => $account['id'],
                            'accountUserName' => $account['userName'],
                            'accountRealName' => $account['realName'],
                            'accountNickname' => $account['nickname'],
                        ]);

                    // Record the assignment; this table is what the daily
                    // quota counts above are computed from.
                    $now = time();
                    Db::name('workbench_traffic_config_item')->insert([
                        'workbenchId' => $workbench->id,
                        'deviceId' => $friend['deviceId'],
                        'wechatFriendId' => $friend['id'],
                        'wechatAccountId' => $account['id'],
                        'createTime' => $now,
                        'exp' => $config['exp'],
                        'expTime' => $now + 86400 * $config['exp'],
                    ]);
                    Log::info("流量分发工作台 {$workbench->id} 好友[{$friend['id']}]分配给客服[{$account['id']}] 成功");
                } else {
                    // Previously a failed or undecodable response was dropped
                    // silently; make it visible in the log.
                    Log::error("流量分发工作台 {$workbench->id} 好友[{$friend['id']}]分配给客服[{$account['id']}] 失败");
                }

                // Advance the rotation regardless of the outcome.
                $cursor++;
            }
        }

        Log::info("流量分发工作台 {$workbench->id} 执行分发逻辑完成");
    }

    /**
     * Check whether the current time lies inside the configured distribution
     * time window.
     *
     * The comparison is a plain string comparison of `date('H:i')` against
     * `startTime` / `endTime`.
     * NOTE(review): this assumes both values are zero-padded "HH:MM" strings,
     * and a window with startTime > endTime (crossing midnight) never matches
     * — confirm this is intended.
     *
     * @param mixed $config Traffic configuration; `startTime` and `endTime` are read
     * @return bool True when the current time is within [startTime, endTime]
     */
    protected function isTimeRange($config)
    {
        $currentTime = date('H:i');
        if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) {
            Log::info("当前时间 {$currentTime} 不在流量分发时间范围内 ({$config['startTime']} - {$config['endTime']})");
            return false;
        }
        return true;
    }

    /**
     * Fetch one page of friends that are eligible for distribution.
     *
     * A friend qualifies when it is not deleted, has been passed, belongs to a
     * WeChat account whose current device is one of the configured devices,
     * and has no non-recycled assignment record for this workbench. When
     * label pools are configured, the friend must carry at least one of them.
     *
     * @param object $workbench Workbench record (not read by the current query)
     * @param mixed  $config    Traffic configuration; `pools`, `devices` and `workbenchId` are read
     * @param int    $page      Page number
     * @param int    $pageSize  Page size
     * @return mixed Rows from select(); an empty array when no devices are configured
     */
    protected function getFriendsByLabels($workbench, $config, $page = 1, $pageSize = 20)
    {
        $labels = [];
        if (!empty($config['pools'])) {
            $labels = is_array($config['pools']) ? $config['pools'] : json_decode($config['pools'], true);
        }

        $devices = [];
        if (!empty($config['devices'])) {
            $devices = is_array($config['devices']) ? $config['devices'] : json_decode($config['devices'], true);
        }
        if (empty($devices)) {
            return [];
        }

        // Embedded into a raw join condition, so force it to an integer.
        $workbenchId = (int) $config['workbenchId'];

        $query = Db::table('s2_wechat_friend')->alias('wf')
            ->join(['s2_company_account' => 'sa'], 'sa.id = wf.accountId', 'left')
            ->join(['s2_wechat_account' => 'wa'], 'wa.id = wf.wechatAccountId', 'left')
            ->join('workbench_traffic_config_item wtci', 'wtci.isRecycle = 0 and wtci.wechatFriendId = wf.id AND wtci.workbenchId = ' . $workbenchId, 'left')
            ->where([
                ['wf.isDeleted', '=', 0],
                ['wf.isPassed', '=', 1],
                ['wtci.id', 'null', null]
            ])
            ->whereIn('wa.currentDeviceId', $devices)
            ->field('wf.id,wf.wechatAccountId,wf.wechatId,wf.labels,sa.userName,wa.currentDeviceId as deviceId');

        if (!empty($labels) && is_array($labels)) {
            // Match friends carrying ANY of the labels. Each label is passed
            // as a bound, JSON-encoded string instead of being spliced into
            // the SQL text, so quotes or backslashes in a label can neither
            // break nor alter the query.
            $conditions = [];
            $bindings = [];
            foreach (array_values($labels) as $index => $label) {
                $encoded = json_encode((string) $label, JSON_UNESCAPED_UNICODE);
                if ($encoded === false) {
                    continue; // label cannot be represented as JSON; skip it
                }
                // Trailing underscore keeps e.g. ":trafficLabel1_" from being
                // a prefix of ":trafficLabel10_".
                $placeholder = 'trafficLabel' . $index . '_';
                $conditions[] = "JSON_CONTAINS(wf.labels, :{$placeholder})";
                $bindings[$placeholder] = $encoded;
            }
            if (!empty($conditions)) {
                $query->whereRaw('(' . implode(' OR ', $conditions) . ')', $bindings);
            }
        }

        $list = $query->page($page, $pageSize)->order('wf.id DESC')->select();

        return $list;
    }

    /**
     * Log the start of a distribution job run.
     *
     * @param mixed  $jobId         Identifier of the queue job
     * @param string $queueLockKey  Cache key used as the queue lock
     * @return void
     */
    protected function logJobStart($jobId, $queueLockKey)
    {
        Log::info('开始处理流量分发任务: ' . json_encode([
            'jobId' => $jobId,
            'queueLockKey' => $queueLockKey
        ]));
    }

    /**
     * Finish a successful run: delete the job and remove the queue lock.
     *
     * @param object $job           Queue job; delete() is called on it
     * @param string $queueLockKey  Cache key used as the queue lock
     * @return void
     */
    protected function handleJobSuccess($job, $queueLockKey)
    {
        $job->delete();
        Cache::rm($queueLockKey);
        Log::info('流量分发任务执行成功');
    }

    /**
     * Handle a failed run.
     *
     * Logs the error, removes the queue lock when a key was given, then
     * deletes the job once its attempt count exceeds MAX_RETRY_ATTEMPTS;
     * otherwise it is released back to the queue with the delay configured
     * under `queue.failed_delay` (10 when unset).
     *
     * @param \Exception $e             The exception that aborted the run
     * @param object     $job           Queue job
     * @param string     $queueLockKey  Cache key used as the queue lock
     * @return bool Always false
     */
    protected function handleJobError(\Exception $e, $job, $queueLockKey)
    {
        Log::error('流量分发任务异常:' . $e->getMessage());

        if (!empty($queueLockKey)) {
            Cache::rm($queueLockKey);
            Log::info("由于异常释放队列锁: {$queueLockKey}");
        }

        if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
            $job->delete();
        } else {
            $job->release(Config::get('queue.failed_delay', 10));
        }

        return false;
    }

    /**
     * Handle the case where there is nothing to process: log it, delete the
     * job and remove the queue lock.
     *
     * @param Job    $job           Queue job
     * @param string $queueLockKey  Cache key used as the queue lock
     * @return void
     */
    protected function handleEmptyWorkbenches(Job $job, $queueLockKey)
    {
        Log::info('没有需要处理的流量分发任务');
        $job->delete();
        Cache::rm($queueLockKey);
    }
}