From 16a723bcaf07086b5cb83fad56644ba3600eff85 Mon Sep 17 00:00:00 2001 From: wong <106998207@qq.com> Date: Wed, 23 Apr 2025 18:13:01 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E8=A7=A6=E5=AE=A2=E5=AE=9D=E3=80=91?= =?UTF-8?q?=20=E9=98=9F=E5=88=97=E6=B7=BB=E5=8A=A0=E9=99=90=E5=88=B6?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E4=BC=98=E5=8C=96=E6=95=B4=E4=BD=93=E6=B5=81?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/controller/DeviceController.php | 12 +- .../controller/WechatChatroomController.php | 12 +- .../api/controller/WechatFriendController.php | 12 +- .../application/command/DeviceListCommand.php | 59 ++++- .../command/WechatChatroomCommand.php | 54 ++++- .../command/WechatFriendCommand.php | 64 +++++- .../common/service/AuthService.php | 2 +- .../controller/ContentLibraryController.php | 7 +- Server/application/job/DeviceListJob.php | 180 +++++++++------ Server/application/job/WechatChatroomJob.php | 179 +++++++-------- Server/application/job/WechatFriendJob.php | 208 ++++++++++-------- 11 files changed, 491 insertions(+), 298 deletions(-) diff --git a/Server/application/api/controller/DeviceController.php b/Server/application/api/controller/DeviceController.php index 413b804a..8da841be 100644 --- a/Server/application/api/controller/DeviceController.php +++ b/Server/application/api/controller/DeviceController.php @@ -20,7 +20,7 @@ class DeviceController extends BaseController * @param bool $isJob 是否为定时任务调用 * @return \think\response\Json */ - public function getlist($pageIndex = '',$pageSize = '',$isJob = false) + public function getlist($pageIndex = '',$pageSize = '',$isJob = false,$isDel = 0) { // 获取授权token $authorization = trim($this->request->header('authorization', $this->authorization)); @@ -33,6 +33,14 @@ class DeviceController extends BaseController } try { + // 根据isDel设置对应的deleteType值 + $deleteType = 'unDeleted'; // 默认值 + if ($isDel == 1) { + $deleteType = 'deleted'; + } elseif ($isDel == 2) { + $deleteType = 'deletedAndStop'; + } + // 构建请求参数 $params = [ 'accountId' => $this->request->param('accountId', ''), @@ -41,7 +49,7 @@ class DeviceController extends BaseController 'groupId' => $this->request->param('groupId', ''), 'brand' => $this->request->param('brand', ''), 'model' => $this->request->param('model', ''), - 'deleteType' => $this->request->param('deleteType', 'unDeleted'), + 'deleteType' => $this->request->param('deleteType', $deleteType), 'operatingSystem' => $this->request->param('operatingSystem', ''), 'softwareVersion' => $this->request->param('softwareVersion', ''), 'phoneAppVersion' => $this->request->param('phoneAppVersion', ''), diff --git a/Server/application/api/controller/WechatChatroomController.php b/Server/application/api/controller/WechatChatroomController.php index 5051bafa..b3484d44 100644 --- a/Server/application/api/controller/WechatChatroomController.php +++ b/Server/application/api/controller/WechatChatroomController.php @@ -13,7 +13,7 @@ class WechatChatroomController extends BaseController * 获取微信群聊列表 * @return \think\response\Json */ - public function getlist($pageIndex = '',$pageSize = '',$isJob = false) + public function getlist($pageIndex = '',$pageSize = '',$isJob = false, $isDel = '') { // 获取授权token $authorization = trim($this->request->header('authorization', $this->authorization)); @@ -26,11 +26,19 @@ class WechatChatroomController extends BaseController } try { + // 根据isDel设置对应的isDeleted值 + $isDeleted = ''; + if ($isDel === '0' || $isDel === 0) { + $isDeleted = false; + } elseif ($isDel === '1' || $isDel === 1) { + $isDeleted = true; + } + // 构建请求参数 $params = [ 'keyword' => $this->request->param('keyword', ''), 'wechatAccountKeyword' => $this->request->param('wechatAccountKeyword', ''), - 'isDeleted' => $this->request->param('isDeleted', ''), + 'isDeleted' => $this->request->param('isDeleted', $isDeleted), 'allotAccountId' => $this->request->param('allotAccountId', ''), 'groupId' => $this->request->param('groupId', ''), 'wechatChatroomId' => $this->request->param('wechatChatroomId', 0), diff --git a/Server/application/api/controller/WechatFriendController.php b/Server/application/api/controller/WechatFriendController.php index 33557ace..298fc69b 100644 --- a/Server/application/api/controller/WechatFriendController.php +++ b/Server/application/api/controller/WechatFriendController.php @@ -16,7 +16,7 @@ class WechatFriendController extends BaseController * @param bool $isJob 是否为任务调用 * @return \think\response\Json */ - public function getlist($pageIndex = '', $pageSize = '', $preFriendId = '', $isJob = false) + public function getlist($pageIndex = '', $pageSize = '', $preFriendId = '', $isJob = false,$isDel = '') { // 获取授权token $authorization = trim($this->request->header('authorization', $this->authorization)); @@ -29,6 +29,14 @@ class WechatFriendController extends BaseController } try { + // 根据isDel设置对应的isDeleted值 + $isDeleted = null; // 默认值 + if ($isDel === '0' || $isDel === 0) { + $isDeleted = false; + } elseif ($isDel === '1' || $isDel === 1) { + $isDeleted = true; + } + // 构建请求参数 $params = [ 'accountKeyword' => '', @@ -39,7 +47,7 @@ class WechatFriendController extends BaseController 'extendFields' => '{}', 'gender' => '', 'groupId' => null, - 'isDeleted' => null, + 'isDeleted' => $isDeleted, 'isPass' => null, 'keyword' => input('keyword', ''), 'labels' => '[]', diff --git a/Server/application/command/DeviceListCommand.php b/Server/application/command/DeviceListCommand.php index 4def0764..4fda4931 100644 --- a/Server/application/command/DeviceListCommand.php +++ b/Server/application/command/DeviceListCommand.php @@ -5,16 +5,23 @@ namespace app\command; use think\console\Command; use think\console\Input; use think\console\Output; +use think\console\input\Option; use think\facade\Log; use think\Queue; use app\job\DeviceListJob; +use think\facade\Cache; class DeviceListCommand extends Command { + // 队列名称 + protected $queueName = 'device_list'; + protected function configure() { $this->setName('device:list') - ->setDescription('获取设备列表,并根据分页自动处理下一页'); + ->setDescription('获取设备列表,并根据分页自动处理下一页') + ->addOption('isDel', null, Option::VALUE_OPTIONAL, '删除状态: 0=未删除(unDeleted), 1=已删除(deleted), 2=已停用(deletedAndStop)', '') + ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); } protected function execute(Input $input, Output $output) @@ -22,12 +29,38 @@ class DeviceListCommand extends Command $output->writeln('开始处理设备列表任务...'); try { - // 初始页码 - $pageIndex = 0; + // 获取是否删除参数和任务ID + $isDel = $input->getOption('isDel'); + $jobId = $input->getOption('jobId'); + + $output->writeln('删除状态参数: ' . ($isDel === '' ? '全部' : ($isDel == 0 ? '未删除' : ($isDel == 1 ? '已删除' : '已停用')))); + $output->writeln('任务ID: ' . $jobId); + + // 检查队列是否已经在运行 + $queueLockKey = "queue_lock:{$this->queueName}:{$isDel}"; + if (Cache::get($queueLockKey)) { + $output->writeln("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + Log::warning("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + return false; + } + + // 设置队列运行锁,有效期1小时 + Cache::set($queueLockKey, $jobId, 3600); + $output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时"); + + // 为不同的删除状态和任务ID使用不同的缓存键名 + $cacheKeyPrefix = "devicePage:{$jobId}"; + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $cacheKey = $cacheKeyPrefix . $cacheKeySuffix; + + // 从缓存获取初始页码,缓存有效期1天 + $pageIndex = Cache::get($cacheKey, 0); + $output->writeln("从缓存获取页码: {$pageIndex}, 缓存键: {$cacheKey}"); + $pageSize = 100; // 每页获取100条记录 - // 将第一页任务添加到队列 - $this->addToQueue($pageIndex, $pageSize); + // 将任务添加到队列 + $this->addToQueue($pageIndex, $pageSize, $isDel, $jobId, $cacheKey, $queueLockKey); $output->writeln('设备列表任务已添加到队列'); } catch (\Exception $e) { @@ -43,15 +76,23 @@ class DeviceListCommand extends Command * 添加任务到队列 * @param int $pageIndex 页码 * @param int $pageSize 每页大小 + * @param string $isDel 删除状态 + * @param string $jobId 任务ID + * @param string $cacheKey 缓存键名 + * @param string $queueLockKey 队列锁键名 */ - protected function addToQueue($pageIndex, $pageSize) + protected function addToQueue($pageIndex, $pageSize, $isDel = '', $jobId = '', $cacheKey = '', $queueLockKey = '') { $data = [ 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize + 'pageSize' => $pageSize, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'cacheKey' => $cacheKey, + 'queueLockKey' => $queueLockKey ]; // 添加到队列,设置任务名为 device_list - Queue::push(DeviceListJob::class, $data, 'device_list'); + Queue::push(DeviceListJob::class, $data, $this->queueName); } -} \ No newline at end of file +} \ No newline at end of file diff --git a/Server/application/command/WechatChatroomCommand.php b/Server/application/command/WechatChatroomCommand.php index b044f103..ba7a6547 100644 --- a/Server/application/command/WechatChatroomCommand.php +++ b/Server/application/command/WechatChatroomCommand.php @@ -5,6 +5,7 @@ namespace app\command; use think\console\Command; use think\console\Input; use think\console\Output; +use think\console\input\Option; use think\facade\Log; use think\Queue; use app\job\WechatChatroomJob; @@ -12,10 +13,15 @@ use think\facade\Cache; class WechatChatroomCommand extends Command { + // 队列名称 + protected $queueName = 'wechat_chatroom'; + protected function configure() { $this->setName('wechatChatroom:list') - ->setDescription('获取微信聊天室列表,并根据分页自动处理下一页'); + ->setDescription('获取微信聊天室列表,并根据分页自动处理下一页') + ->addOption('isDel', null, Option::VALUE_OPTIONAL, '删除状态: 0=未删除(false), 1=已删除(true)', '') + ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); } protected function execute(Input $input, Output $output) @@ -23,14 +29,38 @@ class WechatChatroomCommand extends Command $output->writeln('开始处理微信聊天室列表任务...'); try { - // 从缓存获取初始页码,缓存10分钟有效 - $pageIndex = Cache::get('chatroomPage', 0); - $output->writeln('从缓存获取页码:' . $pageIndex); + // 获取是否删除参数和任务ID + $isDel = $input->getOption('isDel'); + $jobId = $input->getOption('jobId'); + + $output->writeln('删除状态参数: ' . ($isDel === '' ? '全部' : ($isDel == 0 ? '未删除' : '已删除'))); + $output->writeln('任务ID: ' . $jobId); + + // 检查队列是否已经在运行 + $queueLockKey = "queue_lock:{$this->queueName}:{$isDel}"; + if (Cache::get($queueLockKey)) { + $output->writeln("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + Log::warning("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + return false; + } + + // 设置队列运行锁,有效期1小时 + Cache::set($queueLockKey, $jobId, 3600); + $output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时"); + + // 为不同的删除状态和任务ID使用不同的缓存键名 + $cacheKeyPrefix = "chatroomPage:{$jobId}"; + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $cacheKey = $cacheKeyPrefix . $cacheKeySuffix; + + // 从缓存获取初始页码,缓存有效期1天 + $pageIndex = Cache::get($cacheKey, 0); + $output->writeln("从缓存获取页码: {$pageIndex}, 缓存键: {$cacheKey}"); $pageSize = 100; // 每页获取100条记录 // 将任务添加到队列 - $this->addToQueue($pageIndex, $pageSize); + $this->addToQueue($pageIndex, $pageSize, $isDel, $jobId, $cacheKey, $queueLockKey); $output->writeln('微信聊天室列表任务已添加到队列'); } catch (\Exception $e) { @@ -46,15 +76,23 @@ class WechatChatroomCommand extends Command * 添加任务到队列 * @param int $pageIndex 页码 * @param int $pageSize 每页大小 + * @param string $isDel 删除状态 + * @param string $jobId 任务ID + * @param string $cacheKey 缓存键名 + * @param string $queueLockKey 队列锁键名 */ - protected function addToQueue($pageIndex, $pageSize) + protected function addToQueue($pageIndex, $pageSize, $isDel = '', $jobId = '', $cacheKey = '', $queueLockKey = '') { $data = [ 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize + 'pageSize' => $pageSize, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'cacheKey' => $cacheKey, + 'queueLockKey' => $queueLockKey ]; // 添加到队列,设置任务名为 wechat_chatroom - Queue::push(WechatChatroomJob::class, $data, 'wechat_chatroom'); + Queue::push(WechatChatroomJob::class, $data, $this->queueName); } } \ No newline at end of file diff --git a/Server/application/command/WechatFriendCommand.php b/Server/application/command/WechatFriendCommand.php index 48862bf3..3978f218 100644 --- a/Server/application/command/WechatFriendCommand.php +++ b/Server/application/command/WechatFriendCommand.php @@ -5,6 +5,7 @@ namespace app\command; use think\console\Command; use think\console\Input; use think\console\Output; +use think\console\input\Option; use think\facade\Log; use think\Queue; use app\job\WechatFriendJob; @@ -12,10 +13,15 @@ use think\facade\Cache; class WechatFriendCommand extends Command { + // 队列名称 + protected $queueName = 'wechat_friends'; + protected function configure() { $this->setName('wechatFriends:list') - ->setDescription('获微信列表,并根据分页自动处理下一页'); + ->setDescription('获微信列表,并根据分页自动处理下一页') + ->addOption('isDel', null, Option::VALUE_OPTIONAL, '删除状态: 0=未删除(false), 1=已删除(true)', '') + ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); } protected function execute(Input $input, Output $output) @@ -23,16 +29,42 @@ class WechatFriendCommand extends Command $output->writeln('开始处理微信列表任务...'); try { - // 从缓存获取初始页码和上次处理的好友ID,缓存10分钟有效 - $pageIndex = Cache::get('friendsPage', 0); - $preFriendId = Cache::get('preFriendId', ''); + // 获取是否删除参数和任务ID + $isDel = $input->getOption('isDel'); + $jobId = $input->getOption('jobId'); - $output->writeln('从缓存获取页码:' . $pageIndex . ',上次处理的好友ID:' . ($preFriendId ?: '无')); + $output->writeln('删除状态参数: ' . ($isDel === '' ? '全部' : ($isDel == 0 ? '未删除' : '已删除'))); + $output->writeln('任务ID: ' . $jobId); - $pageSize = 100; // 每页获取1000条记录 + // 检查队列是否已经在运行 + $queueLockKey = "queue_lock:{$this->queueName}:{$isDel}"; + if (Cache::get($queueLockKey)) { + $output->writeln("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + Log::warning("队列 {$this->queueName} 已经在运行中,删除状态:{$isDel},跳过执行"); + return false; + } + + // 设置队列运行锁,有效期1小时 + Cache::set($queueLockKey, $jobId, 3600); + $output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时"); + + // 为不同的删除状态和任务ID使用不同的缓存键名 + $cacheKeyPrefix = "friendsPage:{$jobId}"; + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $pageIndexCacheKey = $cacheKeyPrefix . $cacheKeySuffix; + $preFriendIdCacheKey = "preFriendId:{$jobId}" . $cacheKeySuffix; + + // 从缓存获取初始页码和上次处理的好友ID + $pageIndex = Cache::get($pageIndexCacheKey, 0); + $preFriendId = Cache::get($preFriendIdCacheKey, ''); + + $output->writeln("从缓存获取页码: {$pageIndex}, 上次处理的好友ID: {$preFriendId}"); + $output->writeln("缓存键: {$pageIndexCacheKey}, {$preFriendIdCacheKey}"); + + $pageSize = 100; // 每页获取100条记录 // 将任务添加到队列 - $this->addToQueue($pageIndex, $pageSize, $preFriendId); + $this->addToQueue($pageIndex, $pageSize, $preFriendId, $isDel, $jobId, $pageIndexCacheKey, $preFriendIdCacheKey, $queueLockKey); $output->writeln('微信列表任务已添加到队列'); } catch (\Exception $e) { @@ -49,16 +81,26 @@ class WechatFriendCommand extends Command * @param int $pageIndex 页码 * @param int $pageSize 每页大小 * @param string $preFriendId 上一个好友ID + * @param string $isDel 删除状态 + * @param string $jobId 任务ID + * @param string $pageIndexCacheKey 页码缓存键名 + * @param string $preFriendIdCacheKey 好友ID缓存键名 + * @param string $queueLockKey 队列锁键名 */ - protected function addToQueue($pageIndex, $pageSize, $preFriendId = '') + public function addToQueue($pageIndex, $pageSize, $preFriendId = '', $isDel = '', $jobId = '', $pageIndexCacheKey = '', $preFriendIdCacheKey = '', $queueLockKey = '') { $data = [ 'pageIndex' => $pageIndex, 'pageSize' => $pageSize, - 'preFriendId' => $preFriendId + 'preFriendId' => $preFriendId, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'pageIndexCacheKey' => $pageIndexCacheKey, + 'preFriendIdCacheKey' => $preFriendIdCacheKey, + 'queueLockKey' => $queueLockKey ]; // 添加到队列,设置任务名为 wechat_friends - Queue::push(WechatFriendJob::class, $data, 'wechat_friends'); + Queue::push(WechatFriendJob::class, $data, $this->queueName); } -} \ No newline at end of file +} \ No newline at end of file diff --git a/Server/application/common/service/AuthService.php b/Server/application/common/service/AuthService.php index 6a106103..8afc53a2 100644 --- a/Server/application/common/service/AuthService.php +++ b/Server/application/common/service/AuthService.php @@ -168,7 +168,7 @@ class AuthService // 尝试从缓存获取授权信息 $authorization = Cache::get($cacheKey); - $authorization = 'mYpVVhPY7PxctvYw1pn1VCTS2ck0yZG8q11gAiJrRN_D3q7KXXBPAfXoAmqs7kKHeaAx-h4GB7DiqVIQJ09HiXVhaQT6PtgLX3w8YV16erThC-lG1fyJB4DJxu-QxA3Q8ogSs1WFOa8aAXD1QQUZ7Kbjkw_VMLL4lrfe0Yjaqy3DnO7aL1xGnNjjX8P5uqCAZgHKlN8NjuDEGyYvXygW1YyoK9pNpwvq-6DYKjLWdmbHvFaAybHf-hU1XyrFavZqcZYxIoVXjfJ5ASp4XxeCWqMCzwtSoz9RAvwLAlNxGweowtuyX9389ZaXI-zbqb2T0S8llg'; + //$authorization = 'mYpVVhPY7PxctvYw1pn1VCTS2ck0yZG8q11gAiJrRN_D3q7KXXBPAfXoAmqs7kKHeaAx-h4GB7DiqVIQJ09HiXVhaQT6PtgLX3w8YV16erThC-lG1fyJB4DJxu-QxA3Q8ogSs1WFOa8aAXD1QQUZ7Kbjkw_VMLL4lrfe0Yjaqy3DnO7aL1xGnNjjX8P5uqCAZgHKlN8NjuDEGyYvXygW1YyoK9pNpwvq-6DYKjLWdmbHvFaAybHf-hU1XyrFavZqcZYxIoVXjfJ5ASp4XxeCWqMCzwtSoz9RAvwLAlNxGweowtuyX9389ZaXI-zbqb2T0S8llg'; // 如果缓存中没有或已过期,则重新获取 if (empty($authorization)) { try { diff --git a/Server/application/cunkebao/controller/ContentLibraryController.php b/Server/application/cunkebao/controller/ContentLibraryController.php index 51619ed6..7e0db938 100644 --- a/Server/application/cunkebao/controller/ContentLibraryController.php +++ b/Server/application/cunkebao/controller/ContentLibraryController.php @@ -127,7 +127,7 @@ class ContentLibraryController extends Controller ['userId', '=', $this->request->userInfo['id']], ['isDel', '=', 0] // 只查询未删除的记录 ]) - ->field('id,name,sourceType,sourceFriends,sourceGroups,keywordInclude,keywordExclude,aiEnabled,aiPrompt,timeEnabled,timeStart,timeEnd,status,userId,companyId,createTime,updateTime') + ->field('id,name,sourceType,sourceFriends,sourceGroups,keywordInclude,keywordExclude,aiEnabled,aiPrompt,timeEnabled,timeStart,timeEnd,status,userId,companyId,createTime,updateTime,groupMembers') ->find(); if (empty($library)) { @@ -139,6 +139,7 @@ class ContentLibraryController extends Controller $library['sourceGroups'] = json_decode($library['sourceGroups'] ?: '[]', true); $library['keywordInclude'] = json_decode($library['keywordInclude'] ?: '[]', true); $library['keywordExclude'] = json_decode($library['keywordExclude'] ?: '[]', true); + $library['groupMembers'] = json_decode($library['groupMembers'] ?: '[]', true); // 将时间戳转换为日期格式(精确到日) if (!empty($library['timeStart'])) { @@ -212,7 +213,7 @@ class ContentLibraryController extends Controller } // 检查内容库名称是否已存在 - $exists = ContentLibrary::where('name', $param['name'])->find(); + $exists = ContentLibrary::where(['name' => $param['name'],'userId' => $this->request->userInfo['id'],'isDel' => 0])->find(); if ($exists) { return json(['code' => 400, 'msg' => '内容库名称已存在']); } @@ -231,6 +232,7 @@ class ContentLibraryController extends Controller // 数据来源配置 'sourceFriends' => $sourceType == 1 ? json_encode($param['friends']) : json_encode([]), // 选择的微信好友 'sourceGroups' => $sourceType == 2 ? json_encode($param['groups']) : json_encode([]), // 选择的微信群 + 'groupMembers' => $sourceType == 2 ? json_encode($param['groupMembers']) : json_encode([]), // 群组成员 // 关键词配置 'keywordInclude' => $keywordInclude, // 包含的关键词 'keywordExclude' => $keywordExclude, // 排除的关键词 @@ -312,6 +314,7 @@ class ContentLibraryController extends Controller $library->sourceType = isset($param['sourceType']) ? $param['sourceType'] : 1; $library->sourceFriends = $param['sourceType'] == 1 ? json_encode($param['friends']) : json_encode([]); $library->sourceGroups = $param['sourceType'] == 2 ? json_encode($param['groups']) : json_encode([]); + $library->groupMembers = $param['sourceType'] == 2 ? json_encode($param['groupMembers']) : json_encode([]); $library->keywordInclude = $keywordInclude; $library->keywordExclude = $keywordExclude; $library->aiEnabled = isset($param['aiEnabled']) ? $param['aiEnabled'] : 0; diff --git a/Server/application/job/DeviceListJob.php b/Server/application/job/DeviceListJob.php index d9c3a41b..b887af07 100644 --- a/Server/application/job/DeviceListJob.php +++ b/Server/application/job/DeviceListJob.php @@ -2,10 +2,12 @@ namespace app\job; +use app\command\DeviceListCommand; use think\queue\Job; use think\facade\Log; use think\Queue; use think\facade\Config; +use think\facade\Cache; use app\api\controller\DeviceController; class DeviceListJob @@ -19,99 +21,135 @@ class DeviceListJob public function fire(Job $job, $data) { try { - // 如果任务执行成功后删除任务 - if ($this->processDeviceList($data, $job->attempts())) { + // 获取数据 + $pageIndex = $data['pageIndex']; + $pageSize = $data['pageSize']; + $isDel = $data['isDel']; + $jobId = isset($data['jobId']) ? $data['jobId'] : ''; + $cacheKey = isset($data['cacheKey']) ? $data['cacheKey'] : ''; + $queueLockKey = isset($data['queueLockKey']) ? $data['queueLockKey'] : ''; + + // 记录日志 + Log::info('开始处理设备列表任务: ' . json_encode([ + 'pageIndex' => $pageIndex, + 'pageSize' => $pageSize, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'cacheKey' => $cacheKey, + 'queueLockKey' => $queueLockKey + ])); + + // 如果没有提供缓存键,根据删除状态和任务ID生成一个 + if (empty($cacheKey)) { + $cacheKeyPrefix = "devicePage:" . ($jobId ?: date('YmdHis') . rand(1000, 9999)); + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $cacheKey = $cacheKeyPrefix . $cacheKeySuffix; + } + + // 如果没有提供队列锁键,生成一个 + if (empty($queueLockKey)) { + $queueLockKey = "queue_lock:device_list:{$isDel}"; + } + + // 实例化控制器 + $deviceController = new DeviceController(); + + // 设置请求信息 + $request = request(); + $request->withGet([ + 'pageIndex' => $pageIndex, + 'pageSize' => $pageSize + ]); + + // 调用设备列表获取方法,传入isDel参数 + $result = $deviceController->getlist($pageIndex, $pageSize, true, $isDel); + $response = json_decode($result, true); + + if ($response['code'] == 200) { + $data = $response['data']; + $dataCount = count($data['results']); + $totalCount = $data['total']; + + Log::info("设备列表获取成功,当前页:{$pageIndex},获取数量:{$dataCount},总数量:{$totalCount}"); + + // 计算是否还有下一页 + $hasNextPage = ($pageIndex + 1) * $pageSize < $totalCount; + + if ($hasNextPage) { + // 缓存页码信息,设置有效期1天 + $nextPageIndex = $pageIndex + 1; + Cache::set($cacheKey, $nextPageIndex, 86400); + Log::info("更新缓存页码: {$nextPageIndex}, 缓存键: {$cacheKey}"); + + // 添加下一页任务到队列 + $command = new DeviceListCommand(); + $command->addToQueue($nextPageIndex, $pageSize, $isDel, $jobId, $cacheKey, $queueLockKey); + Log::info("已添加下一页任务到队列: 页码 {$nextPageIndex}"); + } else { + // 处理完所有页面,重置页码并释放队列锁 + Cache::set($cacheKey, 0, 86400); + Cache::delete($queueLockKey); + Log::info("所有设备列表页面处理完毕,重置页码为0,释放队列锁: {$queueLockKey}"); + } + $job->delete(); - Log::info('设备列表任务执行成功,页码:' . $data['pageIndex']); + return true; } else { + // API调用出错,记录错误并释放队列锁 + $errorMsg = isset($response['msg']) ? $response['msg'] : '未知错误'; + Log::error("设备列表获取失败: " . $errorMsg); + if ($job->attempts() > 3) { - // 超过重试次数,删除任务 - Log::error('设备列表任务执行失败,已超过重试次数,页码:' . $data['pageIndex']); + // 超过重试次数,删除任务并释放队列锁 + Cache::delete($queueLockKey); + Log::info("由于错误释放队列锁: {$queueLockKey}"); $job->delete(); } else { // 任务失败,重新放回队列 - Log::warning('设备列表任务执行失败,重试次数:' . $job->attempts() . ',页码:' . $data['pageIndex']); + Log::warning('设备列表任务执行失败,重试次数:' . $job->attempts() . ',页码:' . $pageIndex); $job->release(Config::get('queue.failed_delay', 10)); } + + return false; } } catch (\Exception $e) { - // 出现异常,记录日志 - Log::error('设备列表任务异常:' . $e->getMessage()); + // 出现异常,记录错误并释放队列锁 + Log::error('设备列表任务处理异常: ' . $e->getMessage()); + + if (!empty($queueLockKey)) { + Cache::delete($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + if ($job->attempts() > 3) { $job->delete(); } else { $job->release(Config::get('queue.failed_delay', 10)); } - } - } - - /** - * 处理设备列表获取 - * @param array $data 任务数据 - * @param int $attempts 重试次数 - * @return bool - */ - protected function processDeviceList($data, $attempts) - { - // 获取参数 - $pageIndex = isset($data['pageIndex']) ? $data['pageIndex'] : 0; - $pageSize = isset($data['pageSize']) ? $data['pageSize'] : 100; - - Log::info('开始获取设备列表,页码:' . $pageIndex . ',页大小:' . $pageSize); - - // 实例化控制器 - $deviceController = new DeviceController(); - - // 构建请求参数 - $params = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize - ]; - - // 设置请求信息 - $request = request(); - $request->withGet($params); - - - - // 调用设备列表获取方法 - $result = $deviceController->getlist($pageIndex,$pageSize,true); - $response = json_decode($result,true); - - - // 判断是否成功 - if ($response['code'] == 200) { - $data = $response['data']; - // 判断是否有下一页 - if (!empty($data) && count($data['results']) > 0) { - // 有下一页,将下一页任务添加到队列 - $nextPageIndex = $pageIndex + 1; - $this->addNextPageToQueue($nextPageIndex, $pageSize); - Log::info('添加下一页任务到队列,页码:' . $nextPageIndex); - } - - return true; - } else { - $errorMsg = isset($response['msg']) ? $response['msg'] : '未知错误'; - Log::error('获取设备列表失败:' . $errorMsg); return false; } } /** - * 添加下一页任务到队列 - * @param int $pageIndex 页码 - * @param int $pageSize 每页大小 + * 获取删除状态的文本描述 + * @param string $isDel 删除状态 + * @return string 删除状态文本 */ - protected function addNextPageToQueue($pageIndex, $pageSize) + protected function getDeleteTypeText($isDel) { - $data = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize - ]; - - // 添加到队列,设置任务名为 device_list - Queue::push(self::class, $data, 'device_list'); + switch ($isDel) { + case '0': + case 0: + return '未删除(unDeleted)'; + case '1': + case 1: + return '已删除(deleted)'; + case '2': + case 2: + return '已停用(deletedAndStop)'; + default: + return '全部'; + } } } \ No newline at end of file diff --git a/Server/application/job/WechatChatroomJob.php b/Server/application/job/WechatChatroomJob.php index d534e186..50c78614 100644 --- a/Server/application/job/WechatChatroomJob.php +++ b/Server/application/job/WechatChatroomJob.php @@ -2,123 +2,100 @@ namespace app\job; -use think\queue\Job; +use app\command\WechatChatroomCommand; use think\facade\Log; -use think\Queue; -use think\facade\Config; use think\facade\Cache; -use app\api\controller\WechatChatroomController; +use think\Queue; +use app\common\BusinessLogic; class WechatChatroomJob { /** - * 队列任务处理 - * @param Job $job 队列任务 - * @param array $data 任务数据 - * @return void + * 队列执行方法 + * @param $data 数据 + * @return array|bool */ - public function fire(Job $job, $data) + public function fire($job, $data) { try { - // 如果任务执行成功后删除任务 - if ($this->processWechatChatroomList($data, $job->attempts())) { - $job->delete(); - Log::info('微信群列表任务执行成功,页码:' . $data['pageIndex']); - } else { - if ($job->attempts() > 3) { - // 超过重试次数,删除任务 - Log::error('微信群列表任务执行失败,已超过重试次数,页码:' . $data['pageIndex']); - $job->delete(); - } else { - // 任务失败,重新放回队列 - Log::warning('微信群列表任务执行失败,重试次数:' . $job->attempts() . ',页码:' . $data['pageIndex']); - $job->release(Config::get('queue.failed_delay', 10)); - } - } - } catch (\Exception $e) { - // 出现异常,记录日志 - Log::error('微信群列表任务异常:' . $e->getMessage()); - if ($job->attempts() > 3) { - $job->delete(); - } else { - $job->release(Config::get('queue.failed_delay', 10)); - } - } - } - - /** - * 处理微信群列表获取 - * @param array $data 任务数据 - * @param int $attempts 重试次数 - * @return bool - */ - protected function processWechatChatroomList($data, $attempts) - { - // 获取参数 - $pageIndex = isset($data['pageIndex']) ? $data['pageIndex'] : 0; - $pageSize = isset($data['pageSize']) ? $data['pageSize'] : 100; - - Log::info('开始获取微信群列表,页码:' . $pageIndex . ',页大小:' . $pageSize); - - // 实例化控制器 - $wechatChatroomController = new WechatChatroomController(); - - // 构建请求参数 - $params = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize - ]; - - // 设置请求信息 - $request = request(); - $request->withGet($params); - - // 调用设备列表获取方法 - $result = $wechatChatroomController->getlist($pageIndex,$pageSize,true); - $response = json_decode($result,true); - - - // 判断是否成功 - if ($response['code'] == 200) { - $data = $response['data']; + // 获取数据 + $pageIndex = $data['pageIndex']; + $pageSize = $data['pageSize']; + $isDel = $data['isDel']; + $jobId = isset($data['jobId']) ? $data['jobId'] : ''; + $cacheKey = isset($data['cacheKey']) ? $data['cacheKey'] : ''; + $queueLockKey = isset($data['queueLockKey']) ? $data['queueLockKey'] : ''; - // 判断是否有下一页 - if (!empty($data) && count($data['results']) > 0) { - // 更新缓存中的页码,设置10分钟过期 - Cache::set('chatroomPage', $pageIndex + 1, 86400); - Log::info('更新缓存,下一页页码:' . ($pageIndex + 1) . ',缓存时间:10分钟'); + // 记录日志 + Log::info('开始处理微信聊天室列表任务: ' . json_encode([ + 'pageIndex' => $pageIndex, + 'pageSize' => $pageSize, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'cacheKey' => $cacheKey, + 'queueLockKey' => $queueLockKey + ])); + + // 如果没有提供缓存键,根据删除状态和任务ID生成一个 + if (empty($cacheKey)) { + $cacheKeyPrefix = "chatroomPage:" . ($jobId ?: date('YmdHis') . rand(1000, 9999)); + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $cacheKey = $cacheKeyPrefix . $cacheKeySuffix; + } + + // 如果没有提供队列锁键,生成一个 + if (empty($queueLockKey)) { + $queueLockKey = "queue_lock:wechat_chatroom:{$isDel}"; + } + + // 调用业务逻辑获取微信聊天室列表 + $logic = new BusinessLogic(); + $result = $logic->wechatChatroomList($pageIndex, $pageSize, $isDel); + + if ($result['code'] == 1) { + $dataCount = count($result['data']['list']); + $totalCount = $result['data']['total']; - // 有下一页,将下一页任务添加到队列 - $nextPageIndex = $pageIndex + 1; - $this->addNextPageToQueue($nextPageIndex, $pageSize); - Log::info('添加下一页任务到队列,页码:' . $nextPageIndex); + Log::info("微信聊天室列表获取成功,当前页:{$pageIndex},获取数量:{$dataCount},总数量:{$totalCount}"); + + // 计算是否还有下一页 + $hasNextPage = ($pageIndex + 1) * $pageSize < $totalCount; + + if ($hasNextPage) { + // 缓存页码信息,设置有效期1天 + $nextPageIndex = $pageIndex + 1; + Cache::set($cacheKey, $nextPageIndex, 86400); + Log::info("更新缓存页码: {$nextPageIndex}, 缓存键: {$cacheKey}"); + + // 添加下一页任务到队列 + $command = new WechatChatroomCommand(); + $command->addToQueue($nextPageIndex, $pageSize, $isDel, $jobId, $cacheKey, $queueLockKey); + Log::info("已添加下一页任务到队列: 页码 {$nextPageIndex}"); + } else { + // 处理完所有页面,重置页码并释放队列锁 + Cache::set($cacheKey, 0, 86400); + Cache::delete($queueLockKey); + Log::info("所有微信聊天室列表页面处理完毕,重置页码为0,释放队列锁: {$queueLockKey}"); + } } else { - // 没有下一页,重置缓存,设置10分钟过期 - Cache::set('chatroomPage', 0, 86400); - Log::info('获取完成,重置缓存,缓存时间:10分钟'); + // API调用出错,记录错误并释放队列锁 + Log::error("微信聊天室列表获取失败: " . $result['msg']); + Cache::delete($queueLockKey); + Log::info("由于错误释放队列锁: {$queueLockKey}"); } + $job->delete(); return true; - } else { - $errorMsg = isset($response['msg']) ? $response['msg'] : '未知错误'; - Log::error('获取微信群列表失败:' . $errorMsg); + } catch (\Exception $e) { + // 出现异常,记录错误并释放队列锁 + Log::error('微信聊天室列表任务处理异常: ' . $e->getMessage()); + if (!empty($queueLockKey)) { + Cache::delete($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + + $job->delete(); return false; } } - - /** - * 添加下一页任务到队列 - * @param int $pageIndex 页码 - * @param int $pageSize 每页大小 - */ - protected function addNextPageToQueue($pageIndex, $pageSize) - { - $data = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize - ]; - - // 添加到队列,设置任务名为 wechat_chatroom - Queue::push(self::class, $data, 'wechat_chatroom'); - } } \ No newline at end of file diff --git a/Server/application/job/WechatFriendJob.php b/Server/application/job/WechatFriendJob.php index 0867d4a1..26189162 100644 --- a/Server/application/job/WechatFriendJob.php +++ b/Server/application/job/WechatFriendJob.php @@ -2,6 +2,7 @@ namespace app\job; +use app\command\WechatFriendCommand; use think\queue\Job; use think\facade\Log; use think\Queue; @@ -20,116 +21,145 @@ class WechatFriendJob public function fire(Job $job, $data) { try { - // 如果任务执行成功后删除任务 - if ($this->processWechatFriendList($data, $job->attempts())) { + // 获取数据 + $pageIndex = $data['pageIndex']; + $pageSize = $data['pageSize']; + $preFriendId = $data['preFriendId']; + $isDel = $data['isDel']; + $jobId = isset($data['jobId']) ? $data['jobId'] : ''; + $pageIndexCacheKey = isset($data['pageIndexCacheKey']) ? $data['pageIndexCacheKey'] : ''; + $preFriendIdCacheKey = isset($data['preFriendIdCacheKey']) ? $data['preFriendIdCacheKey'] : ''; + $queueLockKey = isset($data['queueLockKey']) ? $data['queueLockKey'] : ''; + + // 记录日志 + Log::info('开始处理微信好友列表任务: ' . json_encode([ + 'pageIndex' => $pageIndex, + 'pageSize' => $pageSize, + 'preFriendId' => $preFriendId, + 'isDel' => $isDel, + 'jobId' => $jobId, + 'pageIndexCacheKey' => $pageIndexCacheKey, + 'preFriendIdCacheKey' => $preFriendIdCacheKey, + 'queueLockKey' => $queueLockKey + ])); + + // 如果没有提供缓存键,根据删除状态和任务ID生成 + if (empty($pageIndexCacheKey)) { + $cacheKeyPrefix = "friendsPage:" . ($jobId ?: date('YmdHis') . rand(1000, 9999)); + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $pageIndexCacheKey = $cacheKeyPrefix . $cacheKeySuffix; + } + + if (empty($preFriendIdCacheKey)) { + $cacheKeyPrefix = "preFriendId:" . ($jobId ?: date('YmdHis') . rand(1000, 9999)); + $cacheKeySuffix = $isDel === '' ? '' : ":{$isDel}"; + $preFriendIdCacheKey = $cacheKeyPrefix . $cacheKeySuffix; + } + + // 如果没有提供队列锁键,生成一个 + if (empty($queueLockKey)) { + $queueLockKey = "queue_lock:wechat_friends:{$isDel}"; + } + + // 实例化控制器 + $wechatFriendController = new WechatFriendController(); + + // 设置请求信息 + $request = request(); + $request->withGet([ + 'pageIndex' => $pageIndex, + 'pageSize' => $pageSize, + 'preFriendId' => $preFriendId + ]); + + // 调用微信好友列表获取方法,传入isDel参数 + $result = $wechatFriendController->getlist($pageIndex, $pageSize, $preFriendId, true, $isDel); + $response = json_decode($result, true); + + // 判断是否成功 + if ($response['code'] == 200) { + $data = $response['data']; + + // 判断是否有下一页 + if (!empty($data) && count($data) > 0) { + // 获取最后一条记录的ID + $lastFriendId = $data[count($data)-1]['id']; + + // 更新缓存中的页码和最后一个好友ID,设置1天过期 + $nextPageIndex = $pageIndex + 1; + Cache::set($pageIndexCacheKey, $nextPageIndex, 86400); + Cache::set($preFriendIdCacheKey, $lastFriendId, 86400); + + Log::info("更新缓存,下一页页码:{$nextPageIndex},最后好友ID:{$lastFriendId},缓存键: {$pageIndexCacheKey}, {$preFriendIdCacheKey}"); + + // 有下一页,将下一页任务添加到队列 + $command = new WechatFriendCommand(); + $command->addToQueue($nextPageIndex, $pageSize, $lastFriendId, $isDel, $jobId, $pageIndexCacheKey, $preFriendIdCacheKey, $queueLockKey); + Log::info("已添加下一页任务到队列: 页码 {$nextPageIndex}"); + } else { + // 没有下一页,重置缓存并释放队列锁 + Cache::set($pageIndexCacheKey, 0, 86400); + Cache::set($preFriendIdCacheKey, '', 86400); + Cache::delete($queueLockKey); + Log::info("所有微信好友列表页面处理完毕,重置页码为0,释放队列锁: {$queueLockKey}"); + } + $job->delete(); - Log::info('微信列表任务执行成功,页码:' . $data['pageIndex']); + Log::info('微信好友列表任务执行成功,页码:' . $pageIndex . ',删除状态:' . $this->getDeleteStatusText($isDel)); + return true; } else { + // API调用出错,记录错误 + $errorMsg = isset($response['msg']) ? $response['msg'] : '未知错误'; + Log::error('获取微信好友列表失败:' . $errorMsg); + if ($job->attempts() > 3) { - // 超过重试次数,删除任务 - Log::error('微信列表任务执行失败,已超过重试次数,页码:' . $data['pageIndex']); + // 超过重试次数,删除任务并释放队列锁 + Cache::delete($queueLockKey); + Log::info("由于错误释放队列锁: {$queueLockKey}"); $job->delete(); } else { // 任务失败,重新放回队列 - Log::warning('微信列表任务执行失败,重试次数:' . $job->attempts() . ',页码:' . $data['pageIndex']); + Log::warning('微信好友列表任务执行失败,重试次数:' . $job->attempts() . ',页码:' . $pageIndex); $job->release(Config::get('queue.failed_delay', 10)); } + + return false; } } catch (\Exception $e) { - // 出现异常,记录日志 - Log::error('微信列表任务异常:' . $e->getMessage()); + // 出现异常,记录错误并释放队列锁 + Log::error('微信好友列表任务异常:' . $e->getMessage()); + + if (!empty($queueLockKey)) { + Cache::delete($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + if ($job->attempts() > 3) { $job->delete(); } else { $job->release(Config::get('queue.failed_delay', 10)); } - } - } - - /** - * 处理微信列表获取 - * @param array $data 任务数据 - * @param int $attempts 重试次数 - * @return bool - */ - protected function processWechatFriendList($data, $attempts) - { - // 获取参数 - $pageIndex = isset($data['pageIndex']) ? $data['pageIndex'] : 0; - $pageSize = isset($data['pageSize']) ? $data['pageSize'] : 1000; - $preFriendId = isset($data['preFriendId']) ? $data['preFriendId'] : ''; - - Log::info('开始获取微信列表,页码:' . $pageIndex . ',页大小:' . $pageSize . ',上一好友ID:' . $preFriendId); - - // 实例化控制器 - $wechatFriendController = new WechatFriendController(); - - // 构建请求参数 - $params = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize, - 'preFriendId' => $preFriendId - ]; - - // 设置请求信息 - $request = request(); - $request->withGet($params); - - - // 调用设备列表获取方法 - $result = $wechatFriendController->getlist($pageIndex,$pageSize,$preFriendId,true); - $response = json_decode($result,true); - - - // 判断是否成功 - if ($response['code'] == 200) { - $data = $response['data']; - // 判断是否有下一页 - if (!empty($data) && count($data) > 0) { - // 获取最后一条记录的ID - $lastFriendId = $data[count($data)-1]['id']; - - // 更新缓存中的页码和最后一个好友ID,设置10分钟过期 - Cache::set('friendsPage', $pageIndex + 1, 86400); - Cache::set('preFriendId', $lastFriendId, 86400); - - Log::info('更新缓存,下一页页码:' . ($pageIndex + 1) . ',最后好友ID:' . $lastFriendId . ',缓存时间:10分钟'); - - // 有下一页,将下一页任务添加到队列 - $nextPageIndex = $pageIndex + 1; - $this->addNextPageToQueue($nextPageIndex, $pageSize, $lastFriendId); - Log::info('添加下一页任务到队列,页码:' . $nextPageIndex); - } else { - // 没有下一页,重置缓存,设置10分钟过期 - Cache::set('friendsPage', 0, 86400); - Cache::set('preFriendId', '', 86400); - Log::info('获取完成,重置缓存,缓存时间:10分钟'); - } - - return true; - } else { - $errorMsg = isset($response['msg']) ? $response['msg'] : '未知错误'; - Log::error('获取微信列表失败:' . $errorMsg); return false; } } /** - * 添加下一页任务到队列 - * @param int $pageIndex 页码 - * @param int $pageSize 每页大小 - * @param string $preFriendId 上一个好友ID + * 获取删除状态的文本描述 + * @param string $isDel 删除状态 + * @return string 状态文本描述 */ - protected function addNextPageToQueue($pageIndex, $pageSize,$preFriendId) + protected function getDeleteStatusText($isDel) { - $data = [ - 'pageIndex' => $pageIndex, - 'pageSize' => $pageSize, - 'preFriendId' => $preFriendId - ]; - - // 添加到队列,设置任务名为 wechat_friends - Queue::push(self::class, $data, 'wechat_friends'); + switch ($isDel) { + case '0': + case 0: + return '未删除(false)'; + case '1': + case 1: + return '已删除(true)'; + default: + return '全部'; + } } -} \ No newline at end of file +} \ No newline at end of file