From 4954492127094d7703bd0870d18a210b56160c1b Mon Sep 17 00:00:00 2001 From: xavier Date: Thu, 29 May 2025 16:30:47 +0800 Subject: [PATCH] update task --- Server/application/common/TaskServer.php | 163 +------- .../Adapters/ChuKeBao/Adapter.php | 356 +++++++++++------- 2 files changed, 235 insertions(+), 284 deletions(-) diff --git a/Server/application/common/TaskServer.php b/Server/application/common/TaskServer.php index 2fb2aaad..254d1091 100644 --- a/Server/application/common/TaskServer.php +++ b/Server/application/common/TaskServer.php @@ -46,171 +46,22 @@ class TaskServer extends Server $adapter = new ChuKeBaoAdapter(); - - // 只在一个进程里开这个定时器,处理指定任务 + // 在一个进程里处理获客任务添加后的相关逻辑 if ($current_worker_id == self::PROCESS_COUNT - 1) { - - // todo 封装为 handleFriendAddTaskWithStatusIsCreated() ; 重复代码进一步抽象 Timer::add(60, function () use($adapter) { - - $tasks = Db::name('task_customer') - ->where('status', 1) - ->limit(50) - ->select(); - - - if ($tasks) { - foreach ($tasks as $task) { - - $task_id = $task['task_id']; - - $task_info = $adapter->getCustomerAcquisitionTask($task_id); - - if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) { - continue; - } - - if (empty($task['processed_wechat_ids'])) { - continue; - } - - $weChatIds = explode(',', $task['processed_wechat_ids']); - - $passedWeChatId = ''; - - foreach ($weChatIds as $wechatId) { - - // 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3 - // 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?) - if ($adapter->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) { - $passedWeChatId = $wechatId; - break; - } - - } - - if ($passedWeChatId && !empty($task_info['msgConf'])) { - - // 直接发消息,同时更新状态为 4(已通过-已发消息) - $wechatFriendRecord = $adapter->getWeChatAccoutIdAndFriendIdByWeChatIdAndFriendPhone($passedWeChatId, $task['phone']); - - $msgConf = is_string($task_info['msgConf']) ? json_decode($task_info['msgConf'], 1) : $task_info['msgConf']; - - $wechatFriendRecord && $adapter->sendMsgToFriend($wechatFriendRecord['id'], $wechatFriendRecord['wechatAccountId'], $msgConf); - - - Db::name('task_customer') - ->where('id', $task['id']) - ->update(['status' => 4, 'updated_at' => time()]); - - } else { - - foreach ($weChatIds as $wechatId) { - - // 查询执行状态 - $latestFriendTask = $adapter->getLatestFriendTaskByPhoneAndWeChatId($task['phone'], $wechatId); - if (empty($latestFriendTask)) { - continue; - } - - - // 已经执行成功的话,直接break,同时更新对应task_customer的状态为2(添加成功) - if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 1) { - Db::name('task_customer') - ->where('id', $task['id']) - ->update(['status' => 2, 'updated_at' => time()]); - break; - } - - // todo 判断处理执行失败的情况 status=2,根据 extra 的描述去处理;-- 可以先直接更新为失败,然后 extra =》fail_reason -- 因为有专门的任务会处理失败的 - if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 2) { - Db::name('task_customer') - ->where('id', $task['id']) - ->update(['status' => 3, 'fail_reason' => $latestFriendTask['extra'] ?? '未知原因', 'updated_at' => time()]); - break; - } - - - } - - } - - - } - } + $adapter->handleCustomerTaskWithStatusIsCreated(); }); } + // 3个进程处理获客新任务 if ($current_worker_id < self::PROCESS_COUNT - 1) { - Timer::add(1, function () use ($current_worker_id, $process_count_for_status_0, $adapter) { - - $tasks = Db::name('task_customer') - ->where('status', 0) - ->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}") - ->limit(50) - ->select(); - if ($tasks) { - - foreach ($tasks as $task) { - - $task_id = $task['task_id']; - - $task_info = $adapter->getCustomerAcquisitionTask($task_id); - - if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) { - continue; - } - - - $wechatIdAccountIdMap = $adapter->getWeChatIdsAccountIdsMapByDeviceIds($task_info['reqConf']['devices']); - if (empty($wechatIdAccountIdMap)) { - continue; - } - - $friendAddTaskCreated = false; - - foreach ($wechatIdAccountIdMap as $wechatId => $accountId) { - - - // 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑 - if ($adapter->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) { - $task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号 - break; - } - - // 判断时间间隔\时间段和最后一次的状态 - $canCreateFriendAddTask = $adapter->checkIfCanCreateFriendAddTask($wechatId, $task_info['reqConf']); - if (empty($canCreateFriendAddTask)) { - continue; - } - - // 判断24h内加的好友数量,friend_task 先固定10个人 getLast24hAddedFriendsCount - $last24hAddedFriendsCount = $adapter->getLast24hAddedFriendsCount($wechatId); - if ($last24hAddedFriendsCount >= 10) { - continue; - } - - // 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断 - - // 创建好友添加任务, 对接触客宝 - $conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name']]); - $adapter->createFriendAddTask($accountId, $task['phone'], $conf); - $friendAddTaskCreated = true; - $task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号 - - break; - } - - Db::name('task_customer') - ->where('id', $task['id']) - ->update(['status' => $friendAddTaskCreated ? 1 : 3, 'fail_reason' => $friendAddTaskCreated ? '' : '所有账号不可添加', 'updated_at' => time()]); // ~~不用管,回头再添加再判断即可~~ - // 失败一定是另一个进程/定时器在检查的 - - } - } + $adapter->handleCustomerTaskWithStatusIsNew($current_worker_id, $process_count_for_status_0); }); } + + // 更多其他后台任务 + // ...... } } diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index 0762aed1..a9b0c234 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -16,6 +16,7 @@ use think\facade\Log; use app\api\controller\FriendTaskController; use app\common\service\AuthService; use app\api\controller\WebSocketController; +use Workerman\Lib\Timer; class Adapter implements WeChatServiceInterface { @@ -134,6 +135,7 @@ class Adapter implements WeChatServiceInterface ]; } + /** * 发送微信朋友圈 * @param string $deviceId 设备ID @@ -147,10 +149,161 @@ class Adapter implements WeChatServiceInterface return true; } - // sendMsgToFriend 要处理计划任务 + public function handleCustomerTaskWithStatusIsNew(int $current_worker_id, int $process_count_for_status_0) + { + $tasks = Db::name('task_customer') + ->where('status', 0) + ->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}") + ->limit(50) + ->select(); + if ($tasks) { + + foreach ($tasks as $task) { + + $task_id = $task['task_id']; + + $task_info = $this->getCustomerAcquisitionTask($task_id); + + if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) { + continue; + } + + + $wechatIdAccountIdMap = $this->getWeChatIdsAccountIdsMapByDeviceIds($task_info['reqConf']['devices']); + if (empty($wechatIdAccountIdMap)) { + continue; + } + + $friendAddTaskCreated = false; + + foreach ($wechatIdAccountIdMap as $wechatId => $accountId) { + + + // 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑 + if ($this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) { + $task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号 + break; + } + + // 判断时间间隔\时间段和最后一次的状态 + $canCreateFriendAddTask = $this->checkIfCanCreateFriendAddTask($wechatId, $task_info['reqConf']); + if (empty($canCreateFriendAddTask)) { + continue; + } + + // 判断24h内加的好友数量,friend_task 先固定10个人 getLast24hAddedFriendsCount + $last24hAddedFriendsCount = $this->getLast24hAddedFriendsCount($wechatId); + if ($last24hAddedFriendsCount >= 10) { + continue; + } + + // 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断 + + // 创建好友添加任务, 对接触客宝 + $conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name']]); + $this->createFriendAddTask($accountId, $task['phone'], $conf); + $friendAddTaskCreated = true; + $task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号 + + break; + } + + Db::name('task_customer') + ->where('id', $task['id']) + ->update(['status' => $friendAddTaskCreated ? 1 : 3, 'fail_reason' => $friendAddTaskCreated ? '' : '所有账号不可添加', 'updated_at' => time()]); // ~~不用管,回头再添加再判断即可~~ + // 失败一定是另一个进程/定时器在检查的 + + } + } + } + + + // 处理添加中的获客任务, only run in workerman process! + public function handleCustomerTaskWithStatusIsCreated() + { + + $tasks = Db::name('task_customer') + ->where('status', 1) + ->limit(50) + ->select(); + + if (empty($tasks)) { + return; + } + + foreach ($tasks as $task) { + + $task_id = $task['task_id']; + + $task_info = $this->getCustomerAcquisitionTask($task_id); + + if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) { + continue; + } + + if (empty($task['processed_wechat_ids'])) { + continue; + } + + $weChatIds = explode(',', $task['processed_wechat_ids']); + + $passedWeChatId = ''; + + foreach ($weChatIds as $wechatId) { + + // 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3 + // 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?) + if ($this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) { + $passedWeChatId = $wechatId; + break; + } + } + + if ($passedWeChatId && !empty($task_info['msgConf'])) { + + Db::name('task_customer') + ->where('id', $task['id']) + ->update(['status' => 4, 'updated_at' => time()]); + + $wechatFriendRecord = $this->getWeChatAccoutIdAndFriendIdByWeChatIdAndFriendPhone($passedWeChatId, $task['phone']); + + $msgConf = is_string($task_info['msgConf']) ? json_decode($task_info['msgConf'], 1) : $task_info['msgConf']; + + $wechatFriendRecord && $this->sendMsgToFriend($wechatFriendRecord['id'], $wechatFriendRecord['wechatAccountId'], $msgConf); + + } else { + + foreach ($weChatIds as $wechatId) { + + // 查询执行状态 + $latestFriendTask = $this->getLatestFriendTaskByPhoneAndWeChatId($task['phone'], $wechatId); + if (empty($latestFriendTask)) { + continue; + } + + // 已经执行成功的话,直接break,同时更新对应task_customer的状态为2(添加成功) + if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 1) { + Db::name('task_customer') + ->where('id', $task['id']) + ->update(['status' => 2, 'updated_at' => time()]); + break; + } + + // todo 判断处理执行失败的情况 status=2,根据 extra 的描述去处理;-- 可以先直接更新为失败,然后 extra =》fail_reason -- 因为有专门的任务会处理失败的 + if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 2) { + Db::name('task_customer') + ->where('id', $task['id']) + ->update(['status' => 3, 'fail_reason' => $latestFriendTask['extra'] ?? '未知原因', 'updated_at' => time()]); + break; + } + } + } + } + } + + // 发微信个人消息 public function sendMsgToFriend(int $friendId, int $wechatAccountId, array $msgConf) { - // todo 直接发消息,同时更新状态为 4(已通过-已发消息) application/api/controller/WebSocketController.php sendPersonal // 消息拼接 msgType(1:文本 3:图片 43:视频 47:动图表情包(gif、其他表情包) 49:小程序/其他:图文、文件) // 当前,type 为文本、图片、动图表情包的时候,content为string, 其他情况为对象 {type: 'file/link/...', url: '', title: '', thunmbPath: '', desc: ''} // $result = [ @@ -162,18 +315,58 @@ class Adapter implements WeChatServiceInterface // "wechatChatroomId" => 0, // "wechatFriendId" => $dataArray['wechatFriendId'], // ]; - $wsController = new WebSocketController(); - $wsController->sendPersonal([ - 'wechatFriendId' => $friendId, - 'wechatAccountId' => $wechatAccountId, - 'msgConf' => $msgConf, + $wsController = new WebSocketController([ + 'userName' => $this->config['username'], + 'password' => $this->config['password'] ]); + $gap = 0; + foreach ($msgConf['content'] as $content) { + $msgType = 0; + + if ($content['type'] == 'text') { + $msgType = 1; + } + if ($content['type'] == 'image') { + $msgType = 3; + } + if ($content['type'] == 'sticker') { + $msgType = 47; + } + if ($content['type'] == 'video') { + $msgType = 43; + } + + if ($msgType == 0) { + $msgType = 49; + } + + if ($gap) { + Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content) { + $wsController->sendPersonal([ + 'wechatFriendId' => $friendId, + 'wechatAccountId' => $wechatAccountId, + 'msgType' => $msgType, + 'content' => $content['detail'], + ]); + }, [], false); + } else { + $wsController->sendPersonal([ + 'wechatFriendId' => $friendId, + 'wechatAccountId' => $wechatAccountId, + 'msgType' => $msgType, + 'content' => $content['detail'], + ]); + } + + !empty($msgConf['send_gap']) && $gap += $msgConf['send_gap']; + } } // getCustomerAcquisitionTask - public function getCustomerAcquisitionTask($id) { + public function getCustomerAcquisitionTask($id) + { // 先读取缓存 $task_info = cache('task_info_' . $id); @@ -196,38 +389,19 @@ class Adapter implements WeChatServiceInterface public function checkIfIsWeChatFriendByPhone(string $wxId, string $phone): bool { if (empty($wxId) || empty($phone)) { - // Avoid queries with empty essential parameters. return false; } try { - // The SQL hint provided is: - // SELECT ownerWechatId, phone, passTime, createTime - // FROM `s2_wechat_friend` - // WHERE ownerWechatId = '您的微信ID' -- Corresponds to $wxId - // AND phone LIKE 'phone%' -- Corresponds to $phone . '%' - // ORDER BY createTime DESC; - - // $friendRecord = Db::table('s2_wechat_friend') - // ->where('ownerWechatId', $wxId) - // ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone - // ->order('createTime', 'desc') // Order by creation time as hinted - // ->find(); // Fetches the first matching record or null - // $friendRecord = Db::table('s2_wechat_friend') $id = Db::table('s2_wechat_friend') ->where('ownerWechatId', $wxId) - ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone - ->order('createTime', 'desc') // Order by creation time as hinted - // ->column('id'); + ->where('phone', 'like', $phone . '%') + ->order('createTime', 'desc') ->value('id'); - // If a record is found, $friendRecord will not be empty. - // return !empty($friendRecord); return (bool)$id; } catch (\Exception $e) { - // Log the exception for diagnostics. Log::error("Error in checkIfIsWeChatFriendByPhone (wxId: {$wxId}, phone: {$phone}): " . $e->getMessage()); - // Return false in case of an error, indicating not a friend or unable to determine. return false; } } @@ -240,11 +414,10 @@ class Adapter implements WeChatServiceInterface } return Db::table('s2_wechat_friend') - ->where('ownerWechatId', $wechatId) - ->where('phone', 'like', $phone . '%') - ->field('id,wechatAccountId,passTime,createTime') - ->find(); - + ->where('ownerWechatId', $wechatId) + ->where('phone', 'like', $phone . '%') + ->field('id,wechatAccountId,passTime,createTime') + ->find(); } // 判断是否已添加某手机号为好友并返回添加时间 @@ -255,15 +428,10 @@ class Adapter implements WeChatServiceInterface } try { - // $passTime = Db::table('s2_wechat_friend') - // ->where('ownerWechatId', $wxId) - // ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone - // ->order('createTime', 'desc') // Order by creation time as hinted - // ->value('passTime'); $record = Db::table('s2_wechat_friend') ->where('ownerWechatId', $wxId) - ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone - ->field('id,createTime,passTime') // Order by creation time as hinted + ->where('phone', 'like', $phone . '%') + ->field('id,createTime,passTime') ->find(); return $record['passTime'] ?? $record['createTime'] ?? 0; @@ -340,7 +508,7 @@ class Adapter implements WeChatServiceInterface } } - // getLatestFriendTaskByPhoneAndWeChatId + // 获取某微信最后一条添加好友任务 public function getLatestFriendTaskByPhoneAndWeChatId(string $phone, string $wechatId): array { if (empty($phone) || empty($wechatId)) { @@ -384,13 +552,11 @@ class Adapter implements WeChatServiceInterface return false; } - // conf['allow_add_time_between'] if (!empty($conf['allow_add_time_between']) && count($conf['allow_add_time_between']) == 2) { $currentTime = date('H:i'); $startTime = $conf['allow_add_time_between'][0]; $endTime = $conf['allow_add_time_between'][1]; - - // If current time is NOT between start and end time, return false + if ($currentTime >= $startTime && $currentTime <= $endTime) { return true; } else { @@ -399,36 +565,23 @@ class Adapter implements WeChatServiceInterface } if (isset($record['status'])) { - // if ($record['status'] == 1) { - // return true; - // } if ($record['status'] == 2) { - // todo 判断$record['extra'] 是否包含文字: 操作过于频繁;如果包含判断 updateTime 是否已经超过72min,updateTime是10位时间戳;如果包含指定文字且时间未超过72min,return false + // 判断$record['extra'] 是否包含文字: 操作过于频繁;如果包含判断 updateTime 是否已经超过72min,updateTime是10位时间戳;如果包含指定文字且时间未超过72min,return false if (isset($record['extra']) && strpos($record['extra'], '操作过于频繁') !== false) { $updateTime = isset($record['updateTime']) ? (int)$record['updateTime'] : 0; $now = time(); $diff = $now - $updateTime; - - // Check if less than 72 minutes (72 * 60 = 4320 seconds) have passed - // if ($diff < 72 * 60) { + if ($diff < 24 * 60 * 60) { return false; } } - } } - // $createTime = $record['createTime']; - // $now = time(); - // $diff = $now - $createTime; - // if ($diff > 10 * 60) { - // return true; - // } - - return true; + return true; } // 获取触客宝系统的客服微信账号id,用于后续微信相关操作 @@ -454,16 +607,12 @@ class Adapter implements WeChatServiceInterface ->where('deviceAlive', 1) ->where('wechatAlive', 1) ->where('wechatId', 'in', $wechatIds) - // ->field('id') ->field('id,wechatId') - // ->select(); ->column('id', 'wechatId'); return $records; } - // getWeChatIdsByDeviceIds - // public function getWeChatIdsByDeviceIds(array $deviceIds): array public function getWeChatIdsAccountIdsMapByDeviceIds(array $deviceIds): array { if (empty($deviceIds)) { @@ -472,16 +621,15 @@ class Adapter implements WeChatServiceInterface $records = Db::table('s2_wechat_account') ->where('deviceAlive', 1) ->where('currentDeviceId', 'in', $deviceIds) - // ->field('id,wechatId,currentDeviceId') ->field('id,wechatId') - // ->select(); ->column('id,wechatId'); return $records; } - // addFriendTaskApi - public function addFriendTaskApi(int $wechatAccountId, string $phone, string $message, string $remark, array $labels, $authorization = '') { - + // 触客宝添加好友API + public function addFriendTaskApi(int $wechatAccountId, string $phone, string $message, string $remark, array $labels, $authorization = '') + { + $authorization = $authorization ?: AuthService::getSystemAuthorization(); if (empty($authorization)) { @@ -492,10 +640,6 @@ class Adapter implements WeChatServiceInterface ]; } - // $friendTaskController = new FriendTaskController(); - // $friendTaskController->addFriendTask($wechatAccountId, $phone, $reqConf); - - // todo 调用 application/api/controller/FriendTaskController.php: addFriendTask() $params = [ 'phone' => $phone, 'message' => $message, @@ -513,31 +657,24 @@ class Adapter implements WeChatServiceInterface 'Content-Type' => 'application/json', 'client' => 'system' ]; - // 如果有授权信息,添加到请求头 - // if (!empty($authorization)) { - // $headers['Authorization'] = 'bearer ' . $authorization; - // } + $headers['Authorization'] = 'bearer ' . $authorization; try { - // 发送请求 $response = $client->request('POST', 'api/AddFriendByPhoneTask/add', [ 'headers' => $headers, - 'json' => $params, // Guzzle 会自动将数组转换为 JSON + 'json' => $params, ]); - - // 获取状态码 + $statusCode = $response->getStatusCode(); - - // 获取响应体并解析 JSON + $body = $response->getBody()->getContents(); $result = json_decode($body, true); - + // 返回结果,包含状态码和响应体 return [ 'status_code' => $statusCode, 'body' => $result ]; - } catch (RequestException $e) { // 处理请求异常,可以获取错误响应 if ($e->hasResponse()) { @@ -550,7 +687,7 @@ class Adapter implements WeChatServiceInterface $body = $e->getResponse()->getBody()->getContents(); $result = json_decode($body, true); - + return [ 'status_code' => $statusCode, 'body' => $result, @@ -559,7 +696,7 @@ class Adapter implements WeChatServiceInterface } Log::error("Error in addFriendTaskApi (wechatAccountId: {$wechatAccountId}, phone: {$phone}, message: {$message}, remark: {$remark}, labels: " . json_encode($labels) . "): " . $e->getMessage()); - + // 没有响应的异常 return [ 'status_code' => 0, @@ -578,52 +715,34 @@ class Adapter implements WeChatServiceInterface } } - // createFriendAddTask $accountId, $task['phone'], $task_info['reqConf'] -hello_msg,remark_type - // public function createFriendAddTask(int $wechatAccountId, string $phone, array $conf): bool + // 创建添加好友任务/执行添加 public function createFriendAddTask(int $wechatAccountId, string $phone, array $conf) { if (empty($wechatAccountId) || empty($phone) || empty($conf)) { - // return false; return; } - // $remark = ''; - // if (isset($conf['remark_type']) && $conf['remark_type'] == 'phone') { - // $remark = $phone . '-' . $conf['task_name'] ?? '获客'; - // } else { - - // } $remark = $phone . '-' . $conf['task_name'] ?? '获客'; $tags = []; - if (!empty($conf['tags'])) { + if (!empty($conf['tags'])) { if (is_array($conf['tags'])) { $tags = $conf['tags']; - } + } if (strpos($conf['tags'], ',') !== false) { $tags = explode(',', $conf['tags']); } } - // $res = $this->addFriendTaskApi($wechatAccountId, $phone, $conf['hello_msg'] ?? '你好', $remark, $conf['tags'] ?? []); $this->addFriendTaskApi($wechatAccountId, $phone, $conf['hello_msg'] ?? '你好', $remark, $tags); - // if ($res['status_code']) { - - // } - - - - // return true; - } /* TODO: 以上方法待实现,基于/参考 application/api/controller/WebSocketController.php 去实现;以下同步脚本用的方法转移到其他类 */ // NOTE: run in background; 5min 同步一次 - // todo: 后续经过`s2_`表,直接对接三方的api去sync public function syncFriendship() { $sql = "INSERT INTO ck_wechat_friendship(id,wechatId,tags,memo,ownerWechatId,createTime,updateTime,deleteTime,companyId) @@ -742,25 +861,19 @@ class Adapter implements WeChatServiceInterface ->alias('a') ->join(['s2_device' => 'd'], 'd.imei = a.imei') ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') - // ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as updateTime') ->cursor(); - // $insertData = []; - // $batchSize = 500; // Insert in batches for better performance - foreach ($cursor as $item) { if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['companyId'])) { continue; } - // $exists = Db::connect()->table('ck_device_wechat_login') $exists = Db::table('ck_device_wechat_login') ->where('deviceId', $item['deviceId']) ->where('wechatId', $item['wechatId']) ->where('companyId', $item['companyId']) - // ->where('createTime', $item['createTime']) ->find(); if ($exists) { @@ -774,20 +887,8 @@ class Adapter implements WeChatServiceInterface Db::table('ck_device_wechat_login')->insert($item); } - // $insertData[] = $item; - - // if (count($insertData) >= $batchSize) { - // Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE - // $insertData = []; // Reset for next batch - // } - } - // Insert any remaining data - // if (!empty($insertData)) { - // Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE - // } - return true; } catch (\Exception $e) { Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); @@ -851,7 +952,6 @@ class Adapter implements WeChatServiceInterface } - /** * 同步/更新微信客服信息到ck_wechat_customer表 *