From 9102ecbdffd431aa9cbc042d10fd0c5ee054141a Mon Sep 17 00:00:00 2001 From: wong <106998207@qq.com> Date: Tue, 20 May 2025 09:34:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=90=8C=E6=AD=A5=E6=89=80?= =?UTF-8?q?=E6=9C=89=E5=A5=BD=E5=8F=8B=E6=9C=8B=E5=8F=8B=E5=9C=88=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/controller/WebSocketController.php | 127 +++++++---- .../command/WechatMomentsCommand.php | 2 +- Server/application/job/WechatMomentsJob.php | 210 +++++++----------- Server/application/job/WorkbenchJob.php | 30 ++- 4 files changed, 177 insertions(+), 192 deletions(-) diff --git a/Server/application/api/controller/WebSocketController.php b/Server/application/api/controller/WebSocketController.php index 4f8fd2c4..c0cadba8 100644 --- a/Server/application/api/controller/WebSocketController.php +++ b/Server/application/api/controller/WebSocketController.php @@ -404,55 +404,81 @@ class WebSocketController extends BaseController /** * 获取指定账号朋友圈图片地址 - * @return \think\response\Json + * @param array $data 请求参数 + * @return string JSON响应 */ - public function getMomentSourceRealUrl() + public function getMomentSourceRealUrl($data = []) { - if ($this->request->isPost()) { - $data = $this->request->param(); - + try { + // 参数验证 if (empty($data)) { - return json_encode(['code'=>400,'msg'=>'参数缺失']); - } - $dataArray = $data; - if (!is_array($dataArray)) { - return json_encode(['code'=>400,'msg'=>'数据格式错误']); - } - //获取数据条数 -// $count = isset($dataArray['count']) ? $dataArray['count'] : 10; - //过滤消息 - if (empty($dataArray['wechatAccountId'])) { - return json_encode(['code'=>400,'msg'=>'指定账号不能为空']); - } - if (empty($dataArray['snsId'])) { - return json_encode(['code'=>400,'msg'=>'指定消息ID不能为空']); - } - if (empty($dataArray['snsUrls'])) { - return json_encode(['code'=>400,'msg'=>'资源信息不能为空']); - } - $msg = '获取朋友圈资源链接成功'; - $message = []; - try { - $params = [ - "cmdType" => $dataArray['type'], - "snsId" => $dataArray['snsId'], - "urls" => $dataArray['snsUrls'], - "wechatAccountId" => $dataArray['wechatAccountId'], - "seq" => time(), - ]; - $params = json_encode($params); - $this->client->send($params); - $message = $this->client->receive(); - //Log::write('WS获取朋友圈图片/视频链接成功,结果:' . json_encode($message, 256)); - //关闭WS链接 - $this->client->close(); - } catch (\Exception $e) { - $msg = $e->getMessage(); + return json_encode(['code' => 400, 'msg' => '参数缺失']); } - return json_encode(['code'=>200,'msg'=>$msg,'data'=>$message]); - } else { - return json_encode(['code'=>400,'msg'=>'非法请求']); + // 验证必要参数 + $requiredParams = ['snsId', 'snsUrls', 'wechatAccountId']; + foreach ($requiredParams as $param) { + if (empty($data[$param])) { + return json_encode(['code' => 400, 'msg' => "参数 {$param} 不能为空"]); + } + } + + // 验证snsUrls是否为数组 + if (!is_array($data['snsUrls'])) { + return json_encode(['code' => 400, 'msg' => '资源信息格式错误,应为数组']); + } + + // 检查连接状态 + if (!$this->isConnected) { + $this->connect(); + if (!$this->isConnected) { + return json_encode(['code' => 500, 'msg' => 'WebSocket连接失败']); + } + } + + // 构建请求参数 + $params = [ + "cmdType" => 'CmdDownloadMomentImagesResult', + "snsId" => $data['snsId'], + "urls" => $data['snsUrls'], + "wechatAccountId" => $data['wechatAccountId'], + "seq" => time(), + ]; + + // 记录请求日志 + Log::info('获取朋友圈资源链接请求:' . json_encode($params, 256)); + + // 发送请求 + $this->client->send(json_encode($params)); + + // 接收响应 + $response = $this->client->receive(); + $message = json_decode($response, true); + + if(empty($message)){ + return json_encode(['code'=>500,'msg'=>'获取朋友圈资源链接失败']); + } + if($message['cmdType'] == 'CmdDownloadMomentImagesResult' && is_array($message['urls']) && count($message['urls']) > 0){ + $urls = json_encode($message['urls'],256); + Db::table('s2_wechat_moments')->where('snsId',$data['snsId'])->update(['resUrls'=>$urls]); + } + return json_encode(['code'=>200,'msg'=>'获取朋友圈资源链接成功','data'=>$message]); + } catch (\Exception $e) { + // 记录错误日志 + Log::error('获取朋友圈资源链接异常:' . $e->getMessage()); + Log::error('异常堆栈:' . $e->getTraceAsString()); + + // 尝试重连 + try { + $this->reconnect(); + } catch (\Exception $reconnectError) { + Log::error('WebSocket重连失败:' . $reconnectError->getMessage()); + } + + return json_encode([ + 'code' => 500, + 'msg' => '获取朋友圈资源链接失败:' . $e->getMessage() + ]); } } @@ -490,12 +516,12 @@ class WebSocketController extends BaseController 'location' => $momentEntity['location'] ?? '', 'picSize' => $momentEntity['picSize'] ?? 0, 'resUrls' => json_encode($momentEntity['resUrls'] ?? [], 256), + 'urls' => json_encode($momentEntity['urls'] ?? [], 256), 'userName' => $momentEntity['userName'] ?? '', 'snsId' => $moment['snsId'] ?? '', 'type' => $moment['type'] ?? 0, 'update_time' => time() ]; - if (!empty($momentId)) { // 如果已存在,则更新数据 Db::table('s2_wechat_moments')->where('id', $momentId)->update($dataToSave); @@ -509,8 +535,17 @@ class WebSocketController extends BaseController $dataToSave['create_time'] = time(); $res = WechatMoments::create($dataToSave); } + // // 获取资源链接 + // if(empty($momentEntity['resUrls']) && !empty($momentEntity['urls'])){ + // $snsData = [ + // 'snsId' => $moment['snsId'], + // 'snsUrls' => $momentEntity['urls'], + // 'wechatAccountId' => $wechatAccountId, + // ]; + // $this->getMomentSourceRealUrl($snsData); + // } + } - //Log::write('朋友圈数据已存入数据库,共' . count($momentList) . '条'); return true; } catch (\Exception $e) { diff --git a/Server/application/command/WechatMomentsCommand.php b/Server/application/command/WechatMomentsCommand.php index cfcbe88c..b10a64a5 100644 --- a/Server/application/command/WechatMomentsCommand.php +++ b/Server/application/command/WechatMomentsCommand.php @@ -51,7 +51,7 @@ class WechatMomentsCommand extends Command $preMomentIdCacheKey = "preMomentId:{$jobId}"; // 从缓存获取初始页码和上次处理的朋友圈ID - $pageIndex = Cache::get($pageIndexCacheKey, 0); + $pageIndex = Cache::get($pageIndexCacheKey, 1); $preMomentId = Cache::get($preMomentIdCacheKey, ''); $output->writeln("从缓存获取页码: {$pageIndex}, 上次处理的朋友圈ID: {$preMomentId}"); diff --git a/Server/application/job/WechatMomentsJob.php b/Server/application/job/WechatMomentsJob.php index dd5bda2c..9042783b 100644 --- a/Server/application/job/WechatMomentsJob.php +++ b/Server/application/job/WechatMomentsJob.php @@ -8,169 +8,121 @@ use think\facade\Cache; use think\Db; use app\command\WechatMomentsCommand; use app\api\controller\WebSocketController; +use think\facade\Env; +use app\api\controller\AutomaticAssign; class WechatMomentsJob { protected $maxPages = 10; // 最大页数 - protected $pageSize = 10; // 每页大小 public function fire(Job $job, $data) { + $toAccountId = ''; + $username = Env::get('api.username', ''); + $password = Env::get('api.password', ''); + if (!empty($username) || !empty($password)) { + $toAccountId = Db::name('users')->where('account',$username)->value('s2_accountId'); + }else{ + Log::error("没有账号配置"); + Cache::rm($queueLockKey); + return; + } + try { $jobId = $data['jobId'] ?? ''; $queueLockKey = $data['queueLockKey'] ?? ''; - Log::info("开始处理朋友圈采集任务,任务ID:{$jobId}"); - // 获取需要采集的账号列表 - $accounts = $this->getAccounts(); - if (empty($accounts)) { - Log::info("没有需要采集的账号"); + // 获取好友列表 + $friends = $this->getFriends($data['pageIndex'], $data['pageSize']); + if (empty($friends)) { + Log::info("没有更多好友数据,任务完成"); Cache::rm($queueLockKey); + $job->delete(); return; } - foreach ($accounts as $account) { - try { - Log::info("开始采集账号 {$account['userName']} 的朋友圈"); - - // 初始化WebSocket连接 - $wsController = new WebSocketController([ - 'userName' => $account['userName'], - 'password' => $account['password'], - 'accountId' => $account['id'] - ]); - - // 获取好友列表 - $friends = $this->getFriends($account['id'],$account['wechatAccountId']); - if (empty($friends)) { - Log::info("账号 {$account['userName']} 没有好友数据"); - continue; - } - - // 遍历好友采集朋友圈 - foreach ($friends as $friend) { - try { - $this->collectMoments($wsController, $account['wechatAccountId'], $friend['id']); - } catch (\Exception $e) { - Log::error("采集好友 {$friend['id']} 的朋友圈失败:" . $e->getMessage()); - continue; - } - } + foreach ($friends as $friend) { + try { + // 执行切换好友命令 + $automaticAssign = new AutomaticAssign(); + $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $toAccountId], true); + + // 执行采集朋友圈命令 + $webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]); + $webSocket->getMoments(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId']]); + // 处理完毕切换回原账号 + $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); } catch (\Exception $e) { - Log::error("处理账号 {$account['wechatAccountId']} 失败:" . $e->getMessage()); + // 发生异常时也要切换回原账号 + $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); + Log::error("采集好友 {$friend['id']} 的朋友圈失败:" . $e->getMessage()); continue; } } - - // 任务完成,释放队列锁 - Cache::rm($queueLockKey); - Log::info("朋友圈采集任务完成"); - + + // 判断是否需要继续翻页 + if (count($friends) < $data['pageSize']) { + // 如果返回的数据少于页面大小,说明已经没有更多数据了 + Log::info("朋友圈采集任务完成,没有更多数据"); + Cache::rm($queueLockKey); + $job->delete(); + } else { + // 还有更多数据,继续处理下一页 + $data['pageIndex']++; + if ($data['pageIndex'] > $this->maxPages) { + Log::info("已达到最大页数限制 {$this->maxPages},任务完成"); + Cache::rm($data['pageIndexCacheKey']); + Cache::rm($queueLockKey); + $job->delete(); + } else { + // 处理下一页 + Cache::set($data['pageIndexCacheKey'], $data['pageIndex']); + + // 有下一页,将下一页任务添加到队列 + $command = new WechatMomentsCommand(); + $command->addToQueue($data['pageIndex'], $data['pageSize'], $jobId, $queueLockKey); + } + } } catch (\Exception $e) { + $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); Log::error("朋友圈采集任务异常:" . $e->getMessage()); Cache::rm($queueLockKey); + $job->delete(); } - - $job->delete(); } - /** - * 获取需要采集的账号列表 - * @return array - */ - private function getAccounts() - { - $accounts = Db::table('s2_company_account') - ->alias('ca') - ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId') - ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId') - ->where('ca.passwordLocal', '<>', '') - ->where(['ca.status' => 0,'wf.isDeleted' => 0,'wa.deviceAlive' => 1,'wa.wechatAlive' => 1]) - ->field([ - 'ca.id', - 'ca.userName', - 'ca.passwordLocal', - 'wf.wechatAccountId' - ]) - ->group('wf.wechatAccountId DESC') - ->order('ca.id DESC') - ->select(); - - foreach ($accounts as &$value) { - $value['password'] = localDecrypt($value['passwordLocal']); - unset($value['passwordLocal']); - } - unset($value); - - return $accounts; - - - } /** * 获取账号的好友列表 * @param int $accountId 账号ID * @return array */ - private function getFriends($accountId,$wechatAccountId) + private function getFriends($page = 1 ,$pageSize = 100) { - return Db::table('s2_wechat_friend') - ->where('wechatAccountId', $wechatAccountId) - ->where('accountId', $accountId) - ->where('isDeleted', 0) - ->field(['id', 'wechatId','wechatAccountId','alias']) - ->order('id DESC') + $list = Db::table('s2_company_account') + ->alias('ca') + ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId') + ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id') + ->where([ + 'ca.status' => 0, + 'wf.isDeleted' => 0, + 'wa.deviceAlive' => 1, + 'wa.wechatAlive' => 1 + ]) + ->field([ + 'ca.id as accountId', + 'ca.userName', + 'wf.id as friendId', + 'wf.wechatId', + 'wf.wechatAccountId', + 'wa.wechatId as wechatAccountWechatId', + 'wa.currentDeviceId as deviceId' + ])->group('wf.wechatId') + ->order('wf.id DESC') + ->page($page, $pageSize) ->select(); - } - - /** - * 采集指定好友的朋友圈 - * @param WebSocketController $wsController WebSocket控制器 - * @param int $accountId 账号ID - * @param string $friendId 好友ID - */ - private function collectMoments($wsController, $accountId, $friendId) - { - $prevSnsId = 0; - $currentPage = 1; - - do { - $data = [ - 'wechatAccountId' => $accountId, - 'wechatFriendId' => $friendId, - 'count' => $this->pageSize, - 'prevSnsId' => $prevSnsId - ]; - - $result = $wsController->getMoments($data); - $result = json_decode($result, true); - - if ($result['code'] != 200 || empty($result['data']['list'])) { - break; - } - - // 更新最后一条数据的snsId - $lastMoment = end($result['data']['list']); - if (isset($lastMoment['snsId'])) { - $prevSnsId = $lastMoment['snsId']; - } - - $currentPage++; - - // 如果已经达到最大页数,退出循环 - if ($currentPage > $this->maxPages) { - break; - } - - // 如果返回的数据少于请求的数量,说明没有更多数据了 - if (count($result['data']['list']) < $this->pageSize) { - break; - } - - } while (true); - - Log::info("完成采集好友 {$friendId} 的朋友圈,共 {$currentPage} 页"); + return $list; } } \ No newline at end of file diff --git a/Server/application/job/WorkbenchJob.php b/Server/application/job/WorkbenchJob.php index 17c94728..6a8a997d 100644 --- a/Server/application/job/WorkbenchJob.php +++ b/Server/application/job/WorkbenchJob.php @@ -231,17 +231,10 @@ class WorkbenchJob if (!$this->validateAutoLikeConfig($workbench, $config)) { return; } - // 验证是否达到点赞次数上限 - $likeCount = $this->getTodayLikeCount($workbench, $config); - if ($likeCount >= $config['maxLikes']) { - Log::info("工作台 {$workbench->id} 点赞次数已达上限"); - return; - } // 验证是否在点赞时间范围内 if (!$this->isWithinLikeTimeRange($config)) { return; - } - + } // 处理分页获取好友列表 $this->processAllFriends($workbench, $config); } @@ -259,10 +252,14 @@ class WorkbenchJob if (empty($friendList)) { return; } - - - + foreach ($friendList as $friend) { + // 验证是否达到点赞次数上限 + $likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']); + if ($likeCount >= $config['maxLikes']) { + Log::info("工作台 {$workbench->id} 点赞次数已达上限"); + return; + } // 验证是否达到好友点赞次数上限 $friendMaxLikes = Db::name('workbench_auto_like_item') ->where('workbenchId', $workbench->id) @@ -303,10 +300,11 @@ class WorkbenchJob * @param WorkbenchAutoLike $config * @return int */ - protected function getTodayLikeCount($workbench, $config) + protected function getTodayLikeCount($workbench, $config, $deviceId) { return Db::name('workbench_auto_like_item') ->where('workbenchId', $workbench->id) + ->where('deviceId', $deviceId) ->whereTime('createTime', 'between', [ strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'), strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00') @@ -370,11 +368,9 @@ class WorkbenchJob // 修改好友标签 $labels = $this->getFriendLabels($friend); $labels[] = $config['friendTags']; - $webSocket->modifyFriendLabel(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $toAccountId, 'labels' => $labels]); + $webSocket->modifyFriendLabel(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId'], 'labels' => $labels]); } - - // 每个好友只点赞一条朋友圈,然后退出 break; } @@ -474,6 +470,7 @@ class WorkbenchJob { Db::name('workbench_auto_like_item')->insert([ 'workbenchId' => $workbench->id, + 'deviceId' => $friend['deviceId'], 'momentsId' => $moment['id'], 'snsId' => $moment['snsId'], 'wechatAccountId' => $friend['wechatAccountId'], @@ -561,7 +558,8 @@ class WorkbenchJob 'wf.id as friendId', 'wf.wechatId', 'wf.wechatAccountId', - 'wa.wechatId as wechatAccountWechatId' + 'wa.wechatId as wechatAccountWechatId', + 'wa.currentDeviceId as deviceId' ]); if (!empty($friends) && is_array($friends) && count($friends) > 0) {