diff --git a/Server/application/common/TaskServer.php b/Server/application/common/TaskServer.php index fb3b57d2..5a6d2aa4 100644 --- a/Server/application/common/TaskServer.php +++ b/Server/application/common/TaskServer.php @@ -11,7 +11,7 @@ use WeChatDeviceApi\Adapters\ChuKeBao\Adapter as ChuKeBaoAdapter; class TaskServer extends Server { - const PROCESS_COUNT = 4; + const PROCESS_COUNT = 5; protected $socket = 'text://0.0.0.0:2980'; @@ -50,15 +50,23 @@ class TaskServer extends Server Log::info('Workerman进程:' . $current_worker_id); + // 在一个进程里处理获客任务新是数据 + if ($current_worker_id == 4) { + Timer::add(60, function () use($adapter) { + $adapter->handleCustomerTaskNewUser(); + }); + } + + // 在一个进程里处理获客任务添加后的相关逻辑 - if ($current_worker_id == self::PROCESS_COUNT - 1) { + if ($current_worker_id == 3) { Timer::add(60, function () use($adapter) { $adapter->handleCustomerTaskWithStatusIsCreated(); }); } // 3个进程处理获客新任务 - if ($current_worker_id < self::PROCESS_COUNT - 1) { + if ($current_worker_id < 3) { Timer::add(1, function () use ($current_worker_id, $process_count_for_status_0, $adapter) { $adapter->handleCustomerTaskWithStatusIsNew($current_worker_id, $process_count_for_status_0); }); diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index 5c4159ad..03d7c51d 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -6,6 +6,7 @@ use think\facade\Cache; use think\facade\Env; use WeChatDeviceApi\Contracts\WeChatServiceInterface; use WeChatDeviceApi\Exceptions\ApiException; + // 如果有 Client.php // use WeChatDeviceApi\Adapters\ChuKeBao\Client as ChuKeBaoApiClient; use GuzzleHttp\Client; @@ -23,6 +24,7 @@ use Workerman\Lib\Timer; class Adapter implements WeChatServiceInterface { protected $config; + // protected $apiClient; // 如果使用 VendorAApiClient public function __construct(array $config = []) @@ -154,8 +156,8 @@ class Adapter implements WeChatServiceInterface public function handleCustomerTaskWithStatusIsNew(int $current_worker_id, int $process_count_for_status_0) { $task = Db::name('customer_acquisition_task') - ->where(['status' => 1,'deleteTime' => 0]) -// ->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}") + ->where(['status' => 1, 'deleteTime' => 0]) + ->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}") ->order('id desc') ->select(); @@ -168,11 +170,11 @@ class Adapter implements WeChatServiceInterface $reqConf = json_decode($item['reqConf'], true); $device = $reqConf['device'] ?? []; $deviceCount = count($device); - if ($deviceCount <= 0){ + if ($deviceCount <= 0) { continue; } $tasks = Db::name('task_customer') - ->where(['status'=> 0,'task_id'=>$item['id']]) + ->where(['status' => 0, 'task_id' => $item['id']]) ->order('id DESC') ->limit($deviceCount) ->select(); @@ -198,7 +200,7 @@ class Adapter implements WeChatServiceInterface foreach ($wechatIdAccountIdMap as $accountId => $wechatId) { // 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑 $wechatTags = json_decode($task['tags'], true); - $isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'],$task['siteTags']); + $isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'], $task['siteTags']); if (!empty($isFriend)) { $friendAddTaskCreated = true; $task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号 @@ -220,13 +222,13 @@ class Adapter implements WeChatServiceInterface // 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断 // 创建好友添加任务, 对接触客宝 - $tags = array_merge($task_info['tagConf']['customTags'],$task_info['tagConf']['scenarioTags']); - if (!empty($wechatTags)){ - $tags = array_merge($tags,$wechatTags); + $tags = array_merge($task_info['tagConf']['customTags'], $task_info['tagConf']['scenarioTags']); + if (!empty($wechatTags)) { + $tags = array_merge($tags, $wechatTags); } $tags = array_unique($tags); $tags = array_values($tags); - $conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name'],'tags' => $tags]); + $conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name'], 'tags' => $tags]); $this->createFriendAddTask($accountId, $task['phone'], $conf); @@ -249,13 +251,12 @@ class Adapter implements WeChatServiceInterface } } - // 处理添加中的获客任务, only run in workerman process! public function handleCustomerTaskWithStatusIsCreated() { $tasks = Db::name('task_customer') - ->whereIn('status', [1,2]) + ->whereIn('status', [1, 2]) ->where('updateTime', '>=', (time() - 86400 * 3)) ->limit(50) ->order('updateTime DESC') @@ -280,20 +281,19 @@ class Adapter implements WeChatServiceInterface $weChatIds = explode(',', $task['processed_wechat_ids']); $passedWeChatId = ''; - foreach ($weChatIds as $wechatId) { - // 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3 // 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?) - $isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone']); - if ($isFriend) { - $passedWeChatId = $wechatId; - break; + if (!empty($wechatId)) { + $isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone']); + if ($isFriend) { + $passedWeChatId = $wechatId; + break; + } } } - if ($passedWeChatId && !empty($task_info['msgConf'])) { Db::name('task_customer') @@ -334,6 +334,104 @@ class Adapter implements WeChatServiceInterface } } + + public function handleCustomerTaskNewUser() + { + $task = Db::name('customer_acquisition_task') + ->where(['status' => 1, 'deleteTime' => 0]) + ->whereIn('sceneId', [7]) + ->order('id desc') + ->select(); + + if (empty($task)) { + return false; + } + + foreach ($task as $item) { + $sceneConf = json_decode($item['sceneConf'], true); + //群获客 + if ($item['sceneId'] == 7) { + if (!empty($sceneConf['groupSelected']) && is_array($sceneConf['groupSelected'])) { + $rows = Db::name('wechat_group_member')->alias('gm') + ->join('wechat_account wa', 'gm.identifier = wa.wechatId') + ->where('gm.companyId', $item['companyId']) + ->whereIn('gm.groupId', $sceneConf['groupSelected']) + ->group('gm.identifier') + ->column('wa.id,wa.wechatId,wa.alias,wa.phone'); + + + // 1000条为一组进行批量处理 + $batchSize = 1000; + $totalRows = count($rows); + + for ($i = 0; $i < $totalRows; $i += $batchSize) { + $batchRows = array_slice($rows, $i, $batchSize); + + if (!empty($batchRows)) { + // 1. 提取当前批次的phone + $phones = []; + foreach ($batchRows as $row) { + if (!empty($row['phone'])) { + $phone = !empty($row['phone']); + } elseif (!empty($row['alias'])) { + $phone = $row['alias']; + } else { + $phone = $row['wechatId']; + } + if (!empty($phone)) { + $phones[] = $phone; + } + } + + // 2. 批量查询已存在的phone + $existingPhones = []; + if (!empty($phones)) { + $existing = Db::name('task_customer') + ->where('task_id', $item['id']) + ->where('phone', 'in', $phones) + ->field('phone') + ->select(); + $existingPhones = array_column($existing, 'phone'); + } + + // 3. 过滤出新数据,批量插入 + $newData = []; + foreach ($batchRows as $row) { + if (!empty($row['phone'])) { + $phone = !empty($row['phone']); + } elseif (!empty($row['alias'])) { + $phone = $row['alias']; + } else { + $phone = $row['wechatId']; + } + if (!empty($phone) && !in_array($phone, $existingPhones)) { + $newData[] = [ + 'task_id' => $item['id'], + 'name' => '', + 'source' => '场景获客_' . $item['name'], + 'phone' => $phone, + 'tags' => json_encode([], JSON_UNESCAPED_UNICODE), + 'siteTags' => json_encode([], JSON_UNESCAPED_UNICODE), + 'createTime' => time(), + ]; + } + } + + // 4. 批量插入新数据 + if (!empty($newData)) { + Db::name('task_customer')->insertAll($newData); + } + } + } + } + } + + + exit_data($sceneConf); + } + } + + // 发微信个人消息 public function sendMsgToFriend(int $friendId, int $wechatAccountId, array $msgConf) { @@ -352,7 +450,7 @@ class Adapter implements WeChatServiceInterface $username = Env::get('api.username', ''); $password = Env::get('api.password', ''); if (!empty($username) || !empty($password)) { - $toAccountId = Db::name('users')->where('account',$username)->value('s2_accountId'); + $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId'); } // 建立WebSocket @@ -381,7 +479,7 @@ class Adapter implements WeChatServiceInterface case 'file': $msgType = 49; - + $detail = [ 'type' => 'file', 'title' => $content['content'][0]['name'], @@ -418,12 +516,12 @@ class Adapter implements WeChatServiceInterface } - if(empty($detail)){ + if (empty($detail)) { continue; } if ($gap) { - Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content,$detail) { + Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content, $detail) { $wsController->sendPersonal([ 'wechatFriendId' => $friendId, 'wechatAccountId' => $wechatAccountId, @@ -463,7 +561,8 @@ class Adapter implements WeChatServiceInterface } // 检查是否是好友关系 - public function checkIfIsWeChatFriendByPhone(string $wxId, string $phone,string $siteTags): bool + + public function checkIfIsWeChatFriendByPhone($wxId = '', $phone = '', $siteTags = '') { if (empty($wxId) || empty($phone)) { return false; @@ -472,7 +571,7 @@ class Adapter implements WeChatServiceInterface try { $friend = Db::table('s2_wechat_friend') ->where('ownerWechatId', $wxId) - ->where(['isPassed' => 1,'isDeleted' => 0]) + ->where(['isPassed' => 1, 'isDeleted' => 0]) ->where('phone|alias|wechatId', 'like', $phone . '%') ->order('createTime', 'desc') ->find(); @@ -480,17 +579,17 @@ class Adapter implements WeChatServiceInterface if (!empty($siteTags)) { $siteTags = json_decode($siteTags, true); $siteLabels = json_decode($friend['siteLabels'], true); - $tags = array_merge($siteTags,$siteLabels); + $tags = array_merge($siteTags, $siteLabels); $tags = array_unique($tags); $tags = array_values($tags); - if (empty($tags)){ + if (empty($tags)) { $tags = []; } - $tags = json_encode($tags,256); - Db::table('s2_wechat_friend')->where(['id' => $friend['id']])->update(['siteLabels' => $tags,'updateTime' => time()]); + $tags = json_encode($tags, 256); + Db::table('s2_wechat_friend')->where(['id' => $friend['id']])->update(['siteLabels' => $tags, 'updateTime' => time()]); } return true; - }else{ + } else { return false; } } catch (\Exception $e) { @@ -637,8 +736,7 @@ class Adapter implements WeChatServiceInterface } //强制请求添加好友的列表 $friendController = new FriendTaskController(); - $friendController->getlist(0,50); - + $friendController->getlist(0, 50); $record = $this->getLatestFriendTask($wechatId); @@ -751,9 +849,9 @@ class Adapter implements WeChatServiceInterface $friendController = new FriendTaskController(); $result = $friendController->addFriendTask($params); $result = json_decode($result, true); - if ($result['code'] == 200){ + if ($result['code'] == 200) { return $result; - }else{ + } else { $authorization = AuthService::getSystemAuthorization(false); return $this->addFriendTaskApi($wechatAccountId, $phone, $message, $remark, $labels, $authorization); } @@ -768,7 +866,7 @@ class Adapter implements WeChatServiceInterface return; } - switch ($conf['remarkType']){ + switch ($conf['remarkType']) { case 'phone': $remark = $phone . '-' . $conf['task_name']; break; @@ -776,7 +874,7 @@ class Adapter implements WeChatServiceInterface $remark = ''; break; case 'source': - $remark = $conf['task_name']; + $remark = $conf['task_name']; break; default: $remark = ''; @@ -955,7 +1053,7 @@ class Adapter implements WeChatServiceInterface * 大数据量分批处理版本 * 适用于数据源非常大的情况,避免一次性加载全部数据到内存 * 独立脚本执行,30min 同步一次 和 流量来源的更新一起 - * + * * @param int $batchSize 每批处理的数据量 * @return int 影响的行数 */ @@ -1009,7 +1107,7 @@ class Adapter implements WeChatServiceInterface /** * 同步/更新微信客服信息到ck_wechat_customer表 - * + * * @param int $batchSize 每批处理的数据量 * @return int 影响的行数 */ @@ -1151,7 +1249,7 @@ class Adapter implements WeChatServiceInterface /** * 计算客服权重 - * + * * @param array $basic 基础信息 * @param array $activity 活跃信息 * @param array $friendShip 好友关系信息 @@ -1213,7 +1311,7 @@ class Adapter implements WeChatServiceInterface /** * 同步设备信息到ck_device表 * 数据量不大,仅同步一次所有设备 - * + * * @return int 影响的行数 */ public function syncDevice() @@ -1390,5 +1488,4 @@ class Adapter implements WeChatServiceInterface } - }