From 9df664eceedde8655f36141e0d679497aecdf50d Mon Sep 17 00:00:00 2001 From: wong <106998207@qq.com> Date: Wed, 21 May 2025 11:22:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8pcntl=5Ffork=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=A4=9A=E8=BF=9B=E7=A8=8B=E7=82=B9=E8=B5=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Server/application/job/WorkbenchJob.php | 420 +++++++++++++----------- 1 file changed, 232 insertions(+), 188 deletions(-) diff --git a/Server/application/job/WorkbenchJob.php b/Server/application/job/WorkbenchJob.php index 6a8a997d..01f06dd8 100644 --- a/Server/application/job/WorkbenchJob.php +++ b/Server/application/job/WorkbenchJob.php @@ -21,7 +21,7 @@ use app\api\controller\WechatFriendController; class WorkbenchJob { /************************************ - * 常量定义与核心队列处理 + * 常量定义 ************************************/ /** @@ -37,6 +37,10 @@ class WorkbenchJob */ const MAX_RETRY_ATTEMPTS = 3; + /************************************ + * 核心队列处理 + ************************************/ + /** * 队列任务处理 * @param Job $job 队列任务 @@ -62,59 +66,9 @@ class WorkbenchJob return $this->handleJobError($e, $job, $queueLockKey); } } - - /** - * 记录任务开始 - * @param string $jobId - * @param string $queueLockKey - */ - protected function logJobStart($jobId, $queueLockKey) - { - Log::info('开始处理工作台任务: ' . json_encode([ - 'jobId' => $jobId, - 'queueLockKey' => $queueLockKey - ])); - } - - /** - * 处理任务成功 - * @param Job $job - * @param string $queueLockKey - */ - protected function handleJobSuccess($job, $queueLockKey) - { - $job->delete(); - Cache::rm($queueLockKey); - Log::info('工作台任务执行成功'); - } - - /** - * 处理任务错误 - * @param \Exception $e - * @param Job $job - * @param string $queueLockKey - * @return bool - */ - protected function handleJobError(\Exception $e, $job, $queueLockKey) - { - Log::error('工作台任务异常:' . $e->getMessage()); - - if (!empty($queueLockKey)) { - Cache::rm($queueLockKey); - Log::info("由于异常释放队列锁: {$queueLockKey}"); - } - - if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { - $job->delete(); - } else { - $job->release(Config::get('queue.failed_delay', 10)); - } - - return false; - } /************************************ - * 工作台处理核心逻辑 + * 工作台基础功能 ************************************/ /** @@ -129,18 +83,6 @@ class WorkbenchJob ])->order('id DESC')->select(); } - /** - * 处理空工作台情况 - * @param Job $job - * @param string $queueLockKey - */ - protected function handleEmptyWorkbenches(Job $job, $queueLockKey) - { - Log::info('没有需要处理的工作台任务'); - $job->delete(); - Cache::rm($queueLockKey); - } - /** * 处理工作台列表 * @param \think\Collection $workbenches @@ -227,18 +169,19 @@ class WorkbenchJob */ protected function handleAutoLike($workbench, $config) { - if (!$this->validateAutoLikeConfig($workbench, $config)) { return; } + // 验证是否在点赞时间范围内 if (!$this->isWithinLikeTimeRange($config)) { return; } + // 处理分页获取好友列表 $this->processAllFriends($workbench, $config); } - + /** * 处理所有好友分页 * @param Workbench $workbench @@ -252,81 +195,111 @@ class WorkbenchJob if (empty($friendList)) { return; } - - foreach ($friendList as $friend) { - // 验证是否达到点赞次数上限 - $likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']); - if ($likeCount >= $config['maxLikes']) { - Log::info("工作台 {$workbench->id} 点赞次数已达上限"); - return; - } - // 验证是否达到好友点赞次数上限 - $friendMaxLikes = Db::name('workbench_auto_like_item') - ->where('workbenchId', $workbench->id) - ->where('wechatFriendId', $friend['friendId']) - ->count(); - if ($friendMaxLikes < $config['friendMaxLikes']) { - $this->processFriendMoments($workbench, $config, $friend); + + // 将好友列表分成20组 + $friendGroups = array_chunk($friendList, 20); + $processes = []; + + foreach ($friendGroups as $groupIndex => $friendGroup) { + // 创建子进程 + $pid = pcntl_fork(); + + if ($pid == -1) { + // 创建进程失败 + Log::error("工作台 {$workbench->id} 创建进程失败"); + continue; + } else if ($pid) { + // 父进程 + $processes[] = $pid; + } else { + // 子进程 + try { + foreach ($friendGroup as $friend) { + // 验证是否达到点赞次数上限 + $likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']); + if ($likeCount >= $config['maxLikes']) { + Log::info("工作台 {$workbench->id} 点赞次数已达上限"); + continue; + } + + // 验证是否达到好友点赞次数上限 + $friendMaxLikes = Db::name('workbench_auto_like_item') + ->where('workbenchId', $workbench->id) + ->where('wechatFriendId', $friend['friendId']) + ->count(); + + if ($friendMaxLikes < $config['friendMaxLikes']) { + $this->processFriendMoments($workbench, $config, $friend); + } + } + } catch (\Exception $e) { + Log::error("工作台 {$workbench->id} 子进程异常: " . $e->getMessage()); + } + + // 子进程执行完毕后退出 + exit(0); } } + // 等待所有子进程完成 + foreach ($processes as $pid) { + pcntl_waitpid($pid, $status); + } + // 如果当前页数据量等于页大小,说明可能还有更多数据,继续处理下一页 if (count($friendList) == $pageSize) { $this->processAllFriends($workbench, $config, $page + 1, $pageSize); } } - - /** - * 验证自动点赞配置 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @return bool - */ - protected function validateAutoLikeConfig($workbench, $config) - { - $requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime']; - foreach ($requiredFields as $field) { - if (empty($config[$field])) { - Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空"); - return false; - } - } - return true; - } /** - * 获取今日点赞次数 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @return int + * 获取好友列表 + * @param WorkbenchAutoLike $config 配置 + * @param int $page 页码 + * @param int $pageSize 每页大小 + * @return array */ - protected function getTodayLikeCount($workbench, $config, $deviceId) + protected function getFriendList($config, $page = 1, $pageSize = 100) { - return Db::name('workbench_auto_like_item') - ->where('workbenchId', $workbench->id) - ->where('deviceId', $deviceId) - ->whereTime('createTime', 'between', [ - strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'), - strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00') + $friends = json_decode($config['friends'], true); + $devices = json_decode($config['devices'], true); + + $list = Db::table('s2_company_account') + ->alias('ca') + ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId') + ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id') + ->join('workbench_auto_like_item wali', 'wali.wechatFriendId = wf.id AND wali.workbenchId = ' . $config['workbenchId'], 'left') + ->where([ + 'ca.status' => 0, + 'wf.isDeleted' => 0, + 'wa.deviceAlive' => 1, + 'wa.wechatAlive' => 1 ]) - ->count(); + ->whereIn('wa.currentDeviceId', $devices) + ->field([ + 'ca.id as accountId', + 'ca.userName', + 'wf.id as friendId', + 'wf.wechatId', + 'wf.wechatAccountId', + 'wa.wechatId as wechatAccountWechatId', + 'wa.currentDeviceId as deviceId', + 'COUNT(wali.id) as like_count' + ]); + + if (!empty($friends) && is_array($friends) && count($friends) > 0) { + $list = $list->whereIn('wf.id', $friends); + } + + $list = $list->group('wf.wechatId') + ->having('like_count < ' . $config['friendMaxLikes']) + ->order('wf.id DESC') + ->page($page, $pageSize) + ->select(); + + return $list; } - /** - * 检查是否在点赞时间范围内 - * @param WorkbenchAutoLike $config - * @return bool - */ - protected function isWithinLikeTimeRange($config) - { - $currentTime = date('H:i'); - if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) { - Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})"); - return false; - } - return true; - } - /** * 处理好友朋友圈 * @param Workbench $workbench @@ -385,28 +358,6 @@ class WorkbenchJob } } - /** - * 获取好友标签 - * @param int $friendId - * @return array - */ - protected function getFriendLabels($friend) - { - // 获取好友标签 - $wechatFriendController = new WechatFriendController(); - $result = $wechatFriendController->getlist([ 'friendKeyword' => $friend['wechatId'],'wechatAccountKeyword' => $friend['wechatAccountWechatId']],true); - $result = json_decode($result, true); - $labels = []; - if(!empty($result['data'])){ - foreach($result['data'] as $item){ - $labels = array_merge($labels, $item['labels']); - } - } - return $labels; - } - - - /** * 获取未点赞的朋友圈 * @param int $friendId @@ -480,6 +431,82 @@ class WorkbenchJob Log::info("工作台 {$workbench->id} 点赞成功: {$moment['snsId']}"); } + /** + * 获取好友标签 + * @param array $friend + * @return array + */ + protected function getFriendLabels($friend) + { + $wechatFriendController = new WechatFriendController(); + $result = $wechatFriendController->getlist([ + 'friendKeyword' => $friend['wechatId'], + 'wechatAccountKeyword' => $friend['wechatAccountWechatId'] + ], true); + + $result = json_decode($result, true); + $labels = []; + + if(!empty($result['data'])){ + foreach($result['data'] as $item){ + $labels = array_merge($labels, $item['labels']); + } + } + + return $labels; + } + + /** + * 验证自动点赞配置 + * @param Workbench $workbench + * @param WorkbenchAutoLike $config + * @return bool + */ + protected function validateAutoLikeConfig($workbench, $config) + { + $requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime']; + foreach ($requiredFields as $field) { + if (empty($config[$field])) { + Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空"); + return false; + } + } + return true; + } + + /** + * 获取今日点赞次数 + * @param Workbench $workbench + * @param WorkbenchAutoLike $config + * @return int + */ + protected function getTodayLikeCount($workbench, $config, $deviceId) + { + return Db::name('workbench_auto_like_item') + ->where('workbenchId', $workbench->id) + ->where('deviceId', $deviceId) + ->whereTime('createTime', 'between', [ + strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'), + strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00') + ]) + ->count(); + } + + /** + * 检查是否在点赞时间范围内 + * @param WorkbenchAutoLike $config + * @return bool + */ + protected function isWithinLikeTimeRange($config) + { + $currentTime = date('H:i'); + if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) { + Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})"); + return false; + } + return true; + } + /************************************ * 朋友圈同步功能 ************************************/ @@ -524,53 +551,70 @@ class WorkbenchJob // TODO: 实现自动建群逻辑 Log::info("处理自动建群任务: {$workbench->id}"); } - + /************************************ - * 辅助方法 + * 任务处理辅助方法 ************************************/ + + /** + * 记录任务开始 + * @param string $jobId + * @param string $queueLockKey + */ + protected function logJobStart($jobId, $queueLockKey) + { + Log::info('开始处理工作台任务: ' . json_encode([ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ])); + } /** - * 获取好友列表 - * @param WorkbenchAutoLike $config 配置 - * @param int $page 页码 - * @param int $pageSize 每页大小 - * @return array + * 处理任务成功 + * @param Job $job + * @param string $queueLockKey */ - protected function getFriendList($config, $page = 1, $pageSize = 100) + protected function handleJobSuccess($job, $queueLockKey) { - $friends = json_decode($config['friends'], true); - $devices = json_decode($config['devices'], true); + $job->delete(); + Cache::rm($queueLockKey); + Log::info('工作台任务执行成功'); + } - $list = Db::table('s2_company_account') - ->alias('ca') - ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId') - ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id') - ->where([ - 'ca.status' => 0, - 'wf.isDeleted' => 0, - 'wa.deviceAlive' => 1, - 'wa.wechatAlive' => 1 - ]) - ->whereIn('wa.currentDeviceId', $devices) - ->field([ - 'ca.id as accountId', - 'ca.userName', - 'wf.id as friendId', - 'wf.wechatId', - 'wf.wechatAccountId', - 'wa.wechatId as wechatAccountWechatId', - 'wa.currentDeviceId as deviceId' - ]); - - if (!empty($friends) && is_array($friends) && count($friends) > 0) { - $list = $list->whereIn('wf.id', $friends); + /** + * 处理任务错误 + * @param \Exception $e + * @param Job $job + * @param string $queueLockKey + * @return bool + */ + protected function handleJobError(\Exception $e, $job, $queueLockKey) + { + Log::error('工作台任务异常:' . $e->getMessage()); + + if (!empty($queueLockKey)) { + Cache::rm($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); } - - $list = $list->group('wf.wechatId') - ->order('wf.id DESC') - ->page($page, $pageSize) - ->select(); - - return $list; + + if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { + $job->delete(); + } else { + $job->release(Config::get('queue.failed_delay', 10)); + } + + return false; + } + + /** + * 处理空工作台情况 + * @param Job $job + * @param string $queueLockKey + */ + protected function handleEmptyWorkbenches(Job $job, $queueLockKey) + { + Log::info('没有需要处理的工作台任务'); + $job->delete(); + Cache::rm($queueLockKey); } } \ No newline at end of file