diff --git a/Server/application/chukebao/controller/ReplyController.php b/Server/application/chukebao/controller/ReplyController.php index fb1dea7f..c7c2fde6 100644 --- a/Server/application/chukebao/controller/ReplyController.php +++ b/Server/application/chukebao/controller/ReplyController.php @@ -127,6 +127,27 @@ class ReplyController extends BaseController if ($title === '') { return ResponseHelper::error('标题不能为空'); } + if ($content === '') { + return ResponseHelper::error('内容不能为空'); + } + + // 根据 msgType 处理 content:3=图片,43=视频,49=链接 需要 JSON 编码 + if (in_array($msgType, [3, 43, 49])) { + // 如果 content 已经是数组,直接编码;如果是字符串,先尝试解码再编码(确保格式正确) + if (is_array($content)) { + $content = json_encode($content, JSON_UNESCAPED_UNICODE); + } elseif (is_string($content)) { + // 尝试解析,如果已经是 JSON 字符串,确保格式正确 + $decoded = json_decode($content, true); + if ($decoded !== null) { + // 是有效的 JSON,重新编码确保格式统一 + $content = json_encode($decoded, JSON_UNESCAPED_UNICODE); + } else { + // 不是 JSON,直接编码 + $content = json_encode($content, JSON_UNESCAPED_UNICODE); + } + } + } try { $now = time(); @@ -142,12 +163,20 @@ class ReplyController extends BaseController 'lastUpdateTime' => $now, 'userId' => $userId, ]; - /** @var Reply $reply */ $reply = new Reply(); $reply->save($data); - return ResponseHelper::success($reply->toArray(), '创建成功'); + // 返回时解析 content(与 buildGroupData 保持一致) + $replyData = $reply->toArray(); + if (in_array($msgType, [3, 43, 49]) && !empty($replyData['content'])) { + $decoded = json_decode($replyData['content'], true); + if ($decoded !== null) { + $replyData['content'] = $decoded; + } + } + + return ResponseHelper::success($replyData, '创建成功'); } catch (\Exception $e) { return ResponseHelper::error('创建失败:' . $e->getMessage()); } @@ -232,9 +261,46 @@ class ReplyController extends BaseController $sortIndex = $this->request->param('sortIndex', null); if ($groupId !== null) $data['groupId'] = (int)$groupId; - if ($title !== null) $data['title'] = $title; + if ($title !== null) { + if ($title === '') { + return ResponseHelper::error('标题不能为空'); + } + $data['title'] = $title; + } if ($msgType !== null) $data['msgType'] = (int)$msgType; - if ($content !== null) $data['content'] = $content; + if ($content !== null) { + // 确定 msgType:如果传了新的 msgType,用新的;否则用原有的 + $currentMsgType = $msgType !== null ? (int)$msgType : null; + if ($currentMsgType === null) { + // 需要查询原有的 msgType + $reply = Reply::where(['id' => $id, 'isDel' => 0])->find(); + if (empty($reply)) { + return ResponseHelper::error('快捷语不存在'); + } + $currentMsgType = $reply->msgType; + } + + // 根据 msgType 处理 content:3=图片,43=视频,49=链接 需要 JSON 编码 + if (in_array($currentMsgType, [3, 43, 49])) { + // 如果 content 已经是数组,直接编码;如果是字符串,先尝试解码再编码(确保格式正确) + if (is_array($content)) { + $data['content'] = json_encode($content, JSON_UNESCAPED_UNICODE); + } elseif (is_string($content)) { + // 尝试解析,如果已经是 JSON 字符串,确保格式正确 + $decoded = json_decode($content, true); + if ($decoded !== null) { + // 是有效的 JSON,重新编码确保格式统一 + $data['content'] = json_encode($decoded, JSON_UNESCAPED_UNICODE); + } else { + // 不是 JSON,直接编码 + $data['content'] = json_encode($content, JSON_UNESCAPED_UNICODE); + } + } + } else { + // 文本类型,直接使用 + $data['content'] = $content; + } + } if ($sortIndex !== null) $data['sortIndex'] = (string)$sortIndex; if (!empty($data)) { $data['lastUpdateTime'] = time(); @@ -245,12 +311,23 @@ class ReplyController extends BaseController } try { - $reply = Reply::where(['id' => $id,'isDel' => 0])->find(); + $reply = Reply::where(['id' => $id, 'isDel' => 0])->find(); if (empty($reply)) { return ResponseHelper::error('快捷语不存在'); } $reply->save($data); - return ResponseHelper::success($reply->toArray(), '更新成功'); + + // 返回时解析 content(与 buildGroupData 保持一致) + $replyData = $reply->toArray(); + $finalMsgType = isset($data['msgType']) ? $data['msgType'] : $reply->msgType; + if (in_array($finalMsgType, [3, 43, 49]) && !empty($replyData['content'])) { + $decoded = json_decode($replyData['content'], true); + if ($decoded !== null) { + $replyData['content'] = $decoded; + } + } + + return ResponseHelper::success($replyData, '更新成功'); } catch (\Exception $e) { return ResponseHelper::error('更新失败:' . $e->getMessage()); } @@ -329,10 +406,24 @@ class ReplyController extends BaseController // 获取该分组下的快捷回复 $replies = Reply::where($replyWhere) - ->order('sortIndex asc, id desc - ') + ->order('sortIndex asc, id desc') ->select(); + // 解析 replies 的 content 字段(根据 msgType 判断是否需要 JSON 解析) + $repliesArray = []; + foreach ($replies as $reply) { + $replyData = $reply->toArray(); + // 根据 msgType 解析 content:3=图片,43=视频,49=链接 + if (in_array($replyData['msgType'], [3, 43, 49]) && !empty($replyData['content'])) { + $decoded = json_decode($replyData['content'], true); + // 如果解析成功,使用解析后的内容;否则保持原样 + if ($decoded !== null) { + $replyData['content'] = $decoded; + } + } + $repliesArray[] = $replyData; + } + return [ 'id' => $group->id, 'groupName' => $group->groupName, @@ -342,7 +433,7 @@ class ReplyController extends BaseController 'replys' => $group->replys, 'companyId' => $group->companyId, 'userId' => $group->userId, - 'replies' => $replies->toArray(), + 'replies' => $repliesArray, 'children' => [] // 子分组 ]; } diff --git a/Server/application/command/TaskSchedulerCommand.php b/Server/application/command/TaskSchedulerCommand.php index b23dae2d..3e6edba2 100644 --- a/Server/application/command/TaskSchedulerCommand.php +++ b/Server/application/command/TaskSchedulerCommand.php @@ -40,11 +40,18 @@ class TaskSchedulerCommand extends Command * 日志目录 */ protected $logDir = ''; + + /** + * 锁文件目录 + */ + protected $lockDir = ''; protected function configure() { $this->setName('scheduler:run') - ->setDescription('统一任务调度器,支持多进程并发执行所有定时任务'); + ->setDescription('统一任务调度器,支持多进程并发执行所有定时任务') + ->addOption('task', 't', \think\console\input\Option::VALUE_OPTIONAL, '指定要执行的任务ID(测试模式,忽略Cron表达式)', '') + ->addOption('force', 'f', \think\console\input\Option::VALUE_NONE, '强制执行所有启用的任务(忽略Cron表达式)'); } protected function execute(Input $input, Output $output) @@ -61,34 +68,25 @@ class TaskSchedulerCommand extends Command $this->maxConcurrent = 1; } + // 获取项目根目录(使用 __DIR__ 更可靠) + // TaskSchedulerCommand.php 位于 application/command/,向上两级到项目根目录 + $rootPath = dirname(__DIR__, 2); + // 加载任务配置 // 方法1:尝试通过框架配置加载 $this->tasks = Config::get('task_scheduler', []); // 方法2:如果框架配置没有,直接加载配置文件 if (empty($this->tasks)) { - // 获取项目根目录 - if (!defined('ROOT_PATH')) { - define('ROOT_PATH', dirname(__DIR__, 2)); - } + $configFile = $rootPath . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php'; - // 尝试多个可能的路径 - $possiblePaths = [ - ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php', - __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php', - dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php', - ]; - - foreach ($possiblePaths as $configFile) { - if (is_file($configFile)) { - $output->writeln("找到配置文件:{$configFile}"); - $config = include $configFile; - if (is_array($config) && !empty($config)) { - $this->tasks = $config; - break; - } else { - $output->writeln("配置文件返回的不是数组或为空:{$configFile}"); - } + if (is_file($configFile)) { + $output->writeln("找到配置文件:{$configFile}"); + $config = include $configFile; + if (is_array($config) && !empty($config)) { + $this->tasks = $config; + } else { + $output->writeln("配置文件返回的不是数组或为空:{$configFile}"); } } } @@ -99,22 +97,21 @@ class TaskSchedulerCommand extends Command $output->writeln('1. config/task_scheduler.php 文件是否存在'); $output->writeln('2. 文件是否返回有效的数组'); $output->writeln('3. 文件权限是否正确'); - if (defined('ROOT_PATH')) { - $output->writeln('项目根目录:' . ROOT_PATH . ''); - $output->writeln('期望配置文件:' . ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php'); - } + $output->writeln('项目根目录:' . $rootPath . ''); + $output->writeln('期望配置文件:' . $rootPath . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php'); return false; } - // 设置日志目录(ThinkPHP5 中无 runtime_path 辅助函数,直接使用 ROOT_PATH/runtime/log) - if (!defined('ROOT_PATH')) { - // CLI 下正常情况下 ROOT_PATH 已在入口脚本 define,这里兜底一次 - define('ROOT_PATH', dirname(__DIR__, 2)); - } - $this->logDir = ROOT_PATH . DIRECTORY_SEPARATOR . 'runtime' . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR; + // 设置日志目录和锁文件目录(使用 __DIR__ 获取的根目录) + $this->logDir = $rootPath . DIRECTORY_SEPARATOR . 'runtime' . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR; + $this->lockDir = $rootPath . DIRECTORY_SEPARATOR . 'runtime' . DIRECTORY_SEPARATOR . 'lock' . DIRECTORY_SEPARATOR; + if (!is_dir($this->logDir)) { mkdir($this->logDir, 0755, true); } + if (!is_dir($this->lockDir)) { + mkdir($this->lockDir, 0755, true); + } // 获取当前时间 $currentTime = time(); @@ -124,36 +121,75 @@ class TaskSchedulerCommand extends Command $currentMonth = date('m', $currentTime); $currentWeekday = date('w', $currentTime); // 0=Sunday, 6=Saturday + // 获取命令行参数 + $testTaskId = $input->getOption('task'); + $force = $input->getOption('force'); + $output->writeln("当前时间: {$currentHour}:{$currentMinute}"); $output->writeln("已加载 " . count($this->tasks) . " 个任务配置"); - // 筛选需要执行的任务 - $tasksToRun = []; - $enabledCount = 0; - $disabledCount = 0; - - foreach ($this->tasks as $taskId => $task) { - if (!isset($task['enabled']) || !$task['enabled']) { - $disabledCount++; - continue; + // 测试模式:只执行指定的任务 + if (!empty($testTaskId)) { + if (!isset($this->tasks[$testTaskId])) { + $output->writeln("错误:任务 {$testTaskId} 不存在"); + $output->writeln("可用任务列表:"); + foreach ($this->tasks as $id => $task) { + $taskName = $task['name'] ?? $id; + $enabled = isset($task['enabled']) && $task['enabled'] ? '✓' : '✗'; + $output->writeln(" {$enabled} {$taskName} ({$id})"); + } + return false; } - $enabledCount++; - if ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) { - $tasksToRun[$taskId] = $task; - $output->writeln("任务 {$taskId} 符合执行条件(schedule: {$task['schedule']})"); + $task = $this->tasks[$testTaskId]; + if (!isset($task['enabled']) || !$task['enabled']) { + $output->writeln("错误:任务 {$testTaskId} 已禁用"); + return false; } + + $taskName = $task['name'] ?? $testTaskId; + $output->writeln("测试模式:执行任务 {$taskName} ({$testTaskId})"); + $output->writeln("注意:测试模式会忽略 Cron 表达式,直接执行任务"); + + $tasksToRun = [$testTaskId => $task]; + } else { + // 正常模式:筛选需要执行的任务 + $tasksToRun = []; + $enabledCount = 0; + $disabledCount = 0; + + foreach ($this->tasks as $taskId => $task) { + if (!isset($task['enabled']) || !$task['enabled']) { + $disabledCount++; + continue; + } + $enabledCount++; + + // 强制模式:忽略 Cron 表达式,执行所有启用的任务 + if ($force) { + $tasksToRun[$taskId] = $task; + $taskName = $task['name'] ?? $taskId; + $output->writeln("强制模式:任务 {$taskName} ({$taskId}) 将被执行"); + } elseif ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) { + $tasksToRun[$taskId] = $task; + $taskName = $task['name'] ?? $taskId; + $output->writeln("任务 {$taskName} ({$taskId}) 符合执行条件(schedule: {$task['schedule']})"); + } + } + + $output->writeln("已启用任务数: {$enabledCount},已禁用任务数: {$disabledCount}"); + + if (empty($tasksToRun)) { + $output->writeln('当前时间没有需要执行的任务'); + if (!$force) { + $output->writeln('提示:使用 --force 参数可以强制执行所有启用的任务'); + } + return true; + } + + $output->writeln("找到 " . count($tasksToRun) . " 个需要执行的任务"); } - $output->writeln("已启用任务数: {$enabledCount},已禁用任务数: {$disabledCount}"); - - if (empty($tasksToRun)) { - $output->writeln('当前时间没有需要执行的任务'); - return true; - } - - $output->writeln("找到 " . count($tasksToRun) . " 个需要执行的任务"); - // 执行任务 if ($this->maxConcurrent > 1 && function_exists('pcntl_fork')) { $this->executeConcurrent($tasksToRun, $output); @@ -172,7 +208,7 @@ class TaskSchedulerCommand extends Command } /** - * 判断任务是否应该执行 + * 判断任务是否应该执行(参考 schedule.php 的实现) * * @param string $schedule cron表达式,格式:分钟 小时 日 月 星期 * @param int $minute 当前分钟 @@ -185,36 +221,36 @@ class TaskSchedulerCommand extends Command protected function shouldRun($schedule, $minute, $hour, $day, $month, $weekday) { $parts = preg_split('/\s+/', trim($schedule)); - if (count($parts) < 5) { + if (count($parts) !== 5) { return false; } list($scheduleMinute, $scheduleHour, $scheduleDay, $scheduleMonth, $scheduleWeekday) = $parts; // 解析分钟 - if (!$this->matchCronField($scheduleMinute, $minute)) { + if (!$this->matchCronPart($scheduleMinute, $minute)) { return false; } // 解析小时 - if (!$this->matchCronField($scheduleHour, $hour)) { + if (!$this->matchCronPart($scheduleHour, $hour)) { return false; } // 解析日期 - if (!$this->matchCronField($scheduleDay, $day)) { + if (!$this->matchCronPart($scheduleDay, $day)) { return false; } // 解析月份 - if (!$this->matchCronField($scheduleMonth, $month)) { + if (!$this->matchCronPart($scheduleMonth, $month)) { return false; } - // 解析星期(注意:cron中0和7都表示星期日) + // 解析星期(注意:cron中0和7都表示星期日,PHP的wday中0=Sunday) if ($scheduleWeekday !== '*') { $scheduleWeekday = str_replace('7', '0', $scheduleWeekday); - if (!$this->matchCronField($scheduleWeekday, $weekday)) { + if (!$this->matchCronPart($scheduleWeekday, $weekday)) { return false; } } @@ -223,60 +259,49 @@ class TaskSchedulerCommand extends Command } /** - * 匹配cron字段 + * 匹配Cron表达式的单个部分(参考 schedule.php 的实现) * - * @param string $field cron字段表达式 + * @param string $pattern cron字段表达式 * @param int $value 当前值 * @return bool */ - protected function matchCronField($field, $value) + protected function matchCronPart($pattern, $value) { - // 通配符 - if ($field === '*') { + // * 表示匹配所有 + if ($pattern === '*') { return true; } - // 列表(逗号分隔) - if (strpos($field, ',') !== false) { - $values = explode(',', $field); + // 数字,精确匹配 + if (is_numeric($pattern)) { + return (int)$pattern === $value; + } + + // */n 表示每n个单位 + if (preg_match('/^\*\/(\d+)$/', $pattern, $matches)) { + $interval = (int)$matches[1]; + return $value % $interval === 0; + } + + // n-m 表示范围 + if (preg_match('/^(\d+)-(\d+)$/', $pattern, $matches)) { + $min = (int)$matches[1]; + $max = (int)$matches[2]; + return $value >= $min && $value <= $max; + } + + // n,m 表示多个值 + if (strpos($pattern, ',') !== false) { + $values = explode(',', $pattern); foreach ($values as $v) { - if ($this->matchCronField(trim($v), $value)) { + if ((int)trim($v) === $value) { return true; } } return false; } - // 范围(如 1-5) - if (strpos($field, '-') !== false) { - list($start, $end) = explode('-', $field); - return $value >= (int)$start && $value <= (int)$end; - } - - // 步长(如 */5 或 0-59/5) - if (strpos($field, '/') !== false) { - $parts = explode('/', $field); - $base = $parts[0]; - $step = (int)$parts[1]; - - if ($base === '*') { - return $value % $step === 0; - } else { - // 处理范围步长,如 0-59/5 - if (strpos($base, '-') !== false) { - list($start, $end) = explode('-', $base); - if ($value >= (int)$start && $value <= (int)$end) { - return ($value - (int)$start) % $step === 0; - } - return false; - } else { - return $value % $step === 0; - } - } - } - - // 精确匹配 - return (int)$field === $value; + return false; } /** @@ -296,39 +321,11 @@ class TaskSchedulerCommand extends Command usleep(100000); // 等待100ms } - // 检查任务是否已经在运行(防止重复执行) - $lockKey = "scheduler_task_lock:{$taskId}"; - $lockTime = Cache::get($lockKey); - - // 如果锁存在,检查进程是否真的在运行 - if ($lockTime) { - $lockPid = Cache::get("scheduler_task_pid:{$taskId}"); - if ($lockPid) { - // 检查进程是否真的在运行 - if (function_exists('posix_kill')) { - // 使用 posix_kill(pid, 0) 检查进程是否存在(0信号不杀死进程,只检查) - if (@posix_kill($lockPid, 0)) { - $output->writeln("任务 {$taskId} 正在运行中(PID: {$lockPid}),跳过"); - continue; - } else { - // 进程不存在,清除锁 - Cache::rm($lockKey); - Cache::rm("scheduler_task_pid:{$taskId}"); - } - } else { - // 如果没有 posix_kill,使用时间判断(2分钟内不重复执行) - if ((time() - $lockTime) < 120) { - $output->writeln("任务 {$taskId} 可能在运行中(2分钟内执行过),跳过"); - continue; - } - } - } else { - // 如果没有PID记录,使用时间判断(2分钟内不重复执行) - if ((time() - $lockTime) < 120) { - $output->writeln("任务 {$taskId} 可能在运行中(2分钟内执行过),跳过"); - continue; - } - } + // 检查任务是否已经在运行(使用文件锁,更可靠) + if ($this->isTaskRunning($taskId)) { + $taskName = $task['name'] ?? $taskId; + $output->writeln("任务 {$taskName} ({$taskId}) 正在运行中,跳过"); + continue; } // 创建子进程 @@ -336,8 +333,9 @@ class TaskSchedulerCommand extends Command if ($pid == -1) { // 创建进程失败 - $output->writeln("创建子进程失败:{$taskId}"); - Log::error("任务调度器:创建子进程失败", ['task' => $taskId]); + $taskName = $task['name'] ?? $taskId; + $output->writeln("创建子进程失败:{$taskName} ({$taskId})"); + Log::error("任务调度器:创建子进程失败", ['task' => $taskId, 'name' => $taskName]); continue; } elseif ($pid == 0) { // 子进程:执行任务 @@ -349,11 +347,11 @@ class TaskSchedulerCommand extends Command 'task_id' => $taskId, 'start_time' => time(), ]; - $output->writeln("启动任务:{$taskId} (PID: {$pid})"); + $taskName = $task['name'] ?? $taskId; + $output->writeln("启动任务:{$taskName} ({$taskId}) (PID: {$pid})"); - // 设置任务锁和PID - Cache::set($lockKey, time(), 600); // 10分钟过期 - Cache::set("scheduler_task_pid:{$taskId}", $pid, 600); // 保存PID,10分钟过期 + // 创建任务锁文件 + $this->createLock($taskId, $pid); } } @@ -375,13 +373,14 @@ class TaskSchedulerCommand extends Command $output->writeln('使用单进程顺序执行任务'); foreach ($tasks as $taskId => $task) { - $output->writeln("执行任务:{$taskId}"); + $taskName = $task['name'] ?? $taskId; + $output->writeln("执行任务:{$taskName} ({$taskId})"); $this->runTask($taskId, $task); } } /** - * 执行单个任务 + * 执行单个任务(参考 schedule.php 的实现,改进超时和错误处理) * * @param string $taskId 任务ID * @param array $task 任务配置 @@ -397,7 +396,6 @@ class TaskSchedulerCommand extends Command mkdir($logDir, 0755, true); } - // 构建命令 // 使用指定的网站目录作为执行目录 $executionPath = '/www/wwwroot/mckb_quwanzhi_com/Server'; @@ -412,6 +410,7 @@ class TaskSchedulerCommand extends Command $errorMsg = "错误:think 文件不存在:{$thinkPath}"; Log::error($errorMsg); file_put_contents($logFile, $errorMsg . "\n", FILE_APPEND); + $this->removeLock($taskId); // 删除锁文件 return; } @@ -423,89 +422,136 @@ class TaskSchedulerCommand extends Command } } - // 添加日志重定向(在后台执行) - $command .= " >> " . escapeshellarg($logFile) . " 2>&1"; + // 获取任务名称 + $taskName = $task['name'] ?? $taskId; // 记录任务开始 $logMessage = "\n" . str_repeat('=', 60) . "\n"; - $logMessage .= "任务开始执行: {$taskId}\n"; + $logMessage .= "任务开始执行: {$taskName} ({$taskId})\n"; $logMessage .= "执行时间: " . date('Y-m-d H:i:s') . "\n"; $logMessage .= "执行目录: {$executionPath}\n"; $logMessage .= "命令: {$command}\n"; $logMessage .= str_repeat('=', 60) . "\n"; file_put_contents($logFile, $logMessage, FILE_APPEND); - // 执行命令(使用指定的执行目录,Linux 环境) + // 设置超时时间 + $timeout = $task['timeout'] ?? 3600; + + // 执行命令(参考 schedule.php 的实现) $descriptorspec = [ - 0 => ['file', '/dev/null', 'r'], // stdin - 1 => ['file', $logFile, 'a'], // stdout - 2 => ['file', $logFile, 'a'], // stderr + 0 => ['pipe', 'r'], // stdin + 1 => ['pipe', 'w'], // stdout + 2 => ['pipe', 'w'], // stderr ]; $process = @proc_open($command, $descriptorspec, $pipes, $executionPath); - if (is_resource($process)) { - // 关闭管道 - if (isset($pipes[0])) @fclose($pipes[0]); - if (isset($pipes[1])) @fclose($pipes[1]); - if (isset($pipes[2])) @fclose($pipes[2]); + if (!is_resource($process)) { + $errorMsg = "任务执行失败: 无法启动进程"; + Log::error($errorMsg, ['task' => $taskId]); + file_put_contents($logFile, $errorMsg . "\n", FILE_APPEND); + $this->removeLock($taskId); // 删除锁文件 + return; + } + + // 设置非阻塞模式 + stream_set_blocking($pipes[1], false); + stream_set_blocking($pipes[2], false); + + $startWaitTime = time(); + $output = ''; + $error = ''; + + // 等待进程完成或超时 + while (true) { + $status = proc_get_status($process); - // 设置超时 - $timeout = $task['timeout'] ?? 3600; - $startWaitTime = time(); + // 读取输出 + $output .= stream_get_contents($pipes[1]); + $error .= stream_get_contents($pipes[2]); - // 等待进程完成或超时 - while (true) { - $status = proc_get_status($process); - - if (!$status['running']) { - break; - } - - // 检查超时 - if ((time() - $startWaitTime) > $timeout) { - if (function_exists('proc_terminate')) { - proc_terminate($process, SIGTERM); - // 等待进程终止 - sleep(2); - $status = proc_get_status($process); - if ($status['running']) { - // 强制终止 - proc_terminate($process, SIGKILL); - } - } - Log::warning("任务执行超时", [ - 'task' => $taskId, - 'timeout' => $timeout, - ]); - break; - } - - usleep(500000); // 等待500ms + // 检查是否完成 + if (!$status['running']) { + break; } - // 关闭进程 - proc_close($process); - } else { - // 如果 proc_open 失败,使用 exec 在后台执行(Linux 环境) - exec("cd " . escapeshellarg($executionPath) . " && " . $command . ' > /dev/null 2>&1 &'); + // 检查超时 + if ((time() - $startWaitTime) > $timeout) { + Log::warning("任务执行超时({$timeout}秒),终止进程", ['task' => $taskId]); + file_put_contents($logFile, "任务执行超时({$timeout}秒),终止进程\n", FILE_APPEND); + + if (function_exists('proc_terminate')) { + proc_terminate($process); + } + + // 关闭管道 + @fclose($pipes[0]); + @fclose($pipes[1]); + @fclose($pipes[2]); + proc_close($process); + + $this->removeLock($taskId); // 删除锁文件 + return; + } + + // 等待100ms + usleep(100000); + } + + // 读取剩余输出 + $output .= stream_get_contents($pipes[1]); + $error .= stream_get_contents($pipes[2]); + + // 关闭管道 + @fclose($pipes[0]); + @fclose($pipes[1]); + @fclose($pipes[2]); + + // 获取退出码 + $exitCode = proc_close($process); + + // 记录输出 + if (!empty($output)) { + file_put_contents($logFile, "任务输出:\n{$output}\n", FILE_APPEND); + } + + if (!empty($error)) { + file_put_contents($logFile, "任务错误:\n{$error}\n", FILE_APPEND); + Log::error("任务执行错误", ['task' => $taskId, 'error' => $error]); } $endTime = microtime(true); $duration = round($endTime - $startTime, 2); + // 获取任务名称 + $taskName = $task['name'] ?? $taskId; + // 记录任务完成 $logMessage = "\n" . str_repeat('=', 60) . "\n"; - $logMessage .= "任务执行完成: {$taskId}\n"; + $logMessage .= "任务执行完成: {$taskName} ({$taskId})\n"; $logMessage .= "完成时间: " . date('Y-m-d H:i:s') . "\n"; $logMessage .= "执行时长: {$duration} 秒\n"; + $logMessage .= "退出码: {$exitCode}\n"; $logMessage .= str_repeat('=', 60) . "\n"; file_put_contents($logFile, $logMessage, FILE_APPEND); - Log::info("任务执行完成", [ - 'task' => $taskId, - 'duration' => $duration, - ]); + if ($exitCode === 0) { + Log::info("任务执行成功", [ + 'task' => $taskId, + 'name' => $taskName, + 'duration' => $duration, + ]); + } else { + Log::error("任务执行失败", [ + 'task' => $taskId, + 'name' => $taskName, + 'duration' => $duration, + 'exit_code' => $exitCode, + ]); + } + + // 删除锁文件(任务完成) + $this->removeLock($taskId); } /** @@ -522,9 +568,8 @@ class TaskSchedulerCommand extends Command $taskId = $info['task_id']; unset($this->runningProcesses[$pid]); - // 清除任务锁和PID - Cache::rm("scheduler_task_lock:{$taskId}"); - Cache::rm("scheduler_task_pid:{$taskId}"); + // 删除任务锁文件 + $this->removeLock($taskId); $duration = time() - $info['start_time']; Log::info("子进程执行完成", [ @@ -550,5 +595,80 @@ class TaskSchedulerCommand extends Command // 清理僵尸进程 } } + + /** + * 检查任务是否正在运行(通过锁文件,参考 schedule.php) + * + * @param string $taskId 任务ID + * @return bool + */ + protected function isTaskRunning($taskId) + { + $lockFile = $this->lockDir . 'schedule_' . md5($taskId) . '.lock'; + + if (!file_exists($lockFile)) { + return false; + } + + // 检查锁文件是否过期(超过1小时认为过期) + $lockTime = filemtime($lockFile); + if (time() - $lockTime > 3600) { + @unlink($lockFile); + return false; + } + + // 读取锁文件中的PID + $lockContent = @file_get_contents($lockFile); + if ($lockContent !== false) { + $lockData = json_decode($lockContent, true); + if (isset($lockData['pid']) && function_exists('posix_kill')) { + // 检查进程是否真的在运行 + if (@posix_kill($lockData['pid'], 0)) { + return true; + } else { + // 进程不存在,删除锁文件 + @unlink($lockFile); + return false; + } + } + } + + // 如果没有PID或无法检查,使用时间判断(2分钟内认为在运行) + if (time() - $lockTime < 120) { + return true; + } + + return false; + } + + /** + * 创建任务锁文件(参考 schedule.php) + * + * @param string $taskId 任务ID + * @param int $pid 进程ID + */ + protected function createLock($taskId, $pid = null) + { + $lockFile = $this->lockDir . 'schedule_' . md5($taskId) . '.lock'; + $lockData = [ + 'task_id' => $taskId, + 'pid' => $pid ?: getmypid(), + 'time' => time(), + ]; + file_put_contents($lockFile, json_encode($lockData)); + } + + /** + * 删除任务锁文件(参考 schedule.php) + * + * @param string $taskId 任务ID + */ + protected function removeLock($taskId) + { + $lockFile = $this->lockDir . 'schedule_' . md5($taskId) . '.lock'; + if (file_exists($lockFile)) { + @unlink($lockFile); + } + } } diff --git a/Server/config/task_scheduler.php b/Server/config/task_scheduler.php index e0b64385..eab1e5b7 100644 --- a/Server/config/task_scheduler.php +++ b/Server/config/task_scheduler.php @@ -8,6 +8,7 @@ return [ // 任务配置格式: // '任务标识' => [ + // 'name' => '任务名称', // 必填:任务的中文名称,用于日志和显示 // 'command' => '命令名称', // 必填:执行的 ThinkPHP 命令(见 application/command.php) // 'schedule' => 'cron表达式', // 必填:cron 表达式,如 '*/5 * * * *' 表示每5分钟 // 'options' => ['--option=value'], // 可选:命令参数(原来 crontab 里的 --xxx=yyy) @@ -23,6 +24,7 @@ return [ // 同步微信好友列表(未删除好友),用于保持系统中好友数据实时更新 'wechat_friends_active' => [ + 'name' => '同步微信好友列表(未删除)', 'command' => 'wechatFriends:list', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => ['--isDel=0'], @@ -31,8 +33,9 @@ return [ 'log_file' => 'crontab_wechatFriends_active.log', ], - // 拉取“添加好友任务”列表,驱动自动加好友的任务队列 + // 拉取"添加好友任务"列表,驱动自动加好友的任务队列 'friend_task' => [ + 'name' => '拉取添加好友任务列表', 'command' => 'friendTask:list', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => [], @@ -43,6 +46,7 @@ return [ // 同步微信好友私聊消息列表,写入消息表,供客服工作台使用 'message_friends' => [ + 'name' => '同步微信好友私聊消息列表', 'command' => 'message:friendsList', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => [], @@ -53,6 +57,7 @@ return [ // 同步微信群聊消息列表,写入消息表,供群聊记录与风控分析 'message_chatroom' => [ + 'name' => '同步微信群聊消息列表', 'command' => 'message:chatroomList', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => [], @@ -63,6 +68,7 @@ return [ // 客服端消息提醒任务,负责给在线客服推送新消息通知 'kf_notice' => [ + 'name' => '客服端消息提醒', 'command' => 'kf:notice', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => [], @@ -77,6 +83,7 @@ return [ // 同步微信设备列表(未删除设备),用于设备管理与监控 'device_active' => [ + 'name' => '同步微信设备列表(未删除)', 'command' => 'device:list', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => ['--isDel=0'], @@ -87,6 +94,7 @@ return [ // 同步微信群聊列表(未删除群),用于群管理与后续任务分配 'wechat_chatroom_active' => [ + 'name' => '同步微信群聊列表(未删除)', 'command' => 'wechatChatroom:list', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => ['--isDel=0'], @@ -97,6 +105,7 @@ return [ // 同步微信群成员列表(群好友),维持群成员明细数据 'group_friends' => [ + 'name' => '同步微信群成员列表', 'command' => 'groupFriends:list', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -105,8 +114,9 @@ return [ 'log_file' => 'crontab_groupFriends.log', ], - // 同步“微信客服列表”,获取绑定到公司的微信号,用于工作台与分配规则 + // 同步"微信客服列表",获取绑定到公司的微信号,用于工作台与分配规则 'wechat_list' => [ + 'name' => '同步微信客服列表', 'command' => 'wechatList:list', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -117,6 +127,7 @@ return [ // 同步公司账号列表(企业/租户账号),供后台管理与统计 'account_list' => [ + 'name' => '同步公司账号列表', 'command' => 'account:list', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -127,6 +138,7 @@ return [ // 内容采集任务,将外部或设备内容同步到系统内容库 'content_collect' => [ + 'name' => '内容采集任务', 'command' => 'content:collect', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -137,6 +149,7 @@ return [ // 工作台:自动点赞好友/客户朋友圈,提高账号活跃度 'workbench_auto_like' => [ + 'name' => '工作台:自动点赞朋友圈', 'command' => 'workbench:autoLike', 'schedule' => '*/6 * * * *', // 每6分钟 'options' => [], @@ -147,6 +160,7 @@ return [ // 工作台:自动建群任务,按规则批量创建微信群 'workbench_group_create' => [ + 'name' => '工作台:自动建群任务', 'command' => 'workbench:groupCreate', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -157,6 +171,7 @@ return [ // 工作台:自动导入通讯录到系统,生成加粉/建群等任务 'workbench_import_contact' => [ + 'name' => '工作台:自动导入通讯录', 'command' => 'workbench:import-contact', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => [], @@ -171,6 +186,7 @@ return [ // 清洗并同步微信原始数据到存客宝业务表(数据治理任务) 'sync_wechat_data' => [ + 'name' => '同步微信原始数据到存客宝', 'command' => 'sync:wechatData', 'schedule' => '*/2 * * * *', // 每2分钟 'options' => [], @@ -181,6 +197,7 @@ return [ // 工作台:流量分发任务,把流量池中的线索按规则分配给微信号或员工 'workbench_traffic_distribute' => [ + 'name' => '工作台:流量分发任务', 'command' => 'workbench:trafficDistribute', 'schedule' => '*/2 * * * *', // 每2分钟 'options' => [], @@ -191,6 +208,7 @@ return [ // 工作台:朋友圈同步任务,拉取并落库朋友圈内容 'workbench_moments' => [ + 'name' => '工作台:朋友圈同步任务', 'command' => 'workbench:moments', 'schedule' => '*/2 * * * *', // 每2分钟 'options' => [], @@ -201,6 +219,7 @@ return [ // 预防性切换好友任务,监控频繁/风控风险,自动切换加人对象,保护微信号 'switch_friends' => [ + 'name' => '预防性切换好友任务', 'command' => 'switch:friends', 'schedule' => '*/2 * * * *', // 每2分钟 'options' => [], @@ -215,6 +234,7 @@ return [ // 拉取设备通话记录(语音/电话),用于质检、统计或标签打分 'call_recording' => [ + 'name' => '拉取设备通话记录', 'command' => 'call-recording:list', 'schedule' => '*/30 * * * *', // 每30分钟 'options' => [], @@ -227,8 +247,9 @@ return [ // 每日 / 每几天任务 // =========================== - // 每日 1:00 同步“已删除设备”列表,补齐历史状态 + // 每日 1:00 同步"已删除设备"列表,补齐历史状态 'device_deleted' => [ + 'name' => '同步已删除设备列表', 'command' => 'device:list', 'schedule' => '0 1 * * *', // 每天1点 'options' => ['--isDel=1'], @@ -237,8 +258,9 @@ return [ 'log_file' => 'crontab_device_deleted.log', ], - // 每日 1:10 同步“已停用设备”列表,更新停用状态 + // 每日 1:10 同步"已停用设备"列表,更新停用状态 'device_stopped' => [ + 'name' => '同步已停用设备列表', 'command' => 'device:list', 'schedule' => '10 1 * * *', // 每天1:10 'options' => ['--isDel=2'], @@ -247,8 +269,9 @@ return [ 'log_file' => 'crontab_device_stopped.log', ], - // 每日 1:30 同步“已删除微信好友”,用于历史恢复与报表 + // 每日 1:30 同步"已删除微信好友",用于历史恢复与报表 'wechat_friends_deleted' => [ + 'name' => '同步已删除微信好友', 'command' => 'wechatFriends:list', 'schedule' => '30 1 * * *', // 每天1:30 'options' => ['--isDel=1'], @@ -257,8 +280,9 @@ return [ 'log_file' => 'crontab_wechatFriends_deleted.log', ], - // 每日 1:30 同步“已删除微信群聊”,用于统计与留痕 + // 每日 1:30 同步"已删除微信群聊",用于统计与留痕 'wechat_chatroom_deleted' => [ + 'name' => '同步已删除微信群聊', 'command' => 'wechatChatroom:list', 'schedule' => '30 1 * * *', // 每天1:30 'options' => ['--isDel=1'], @@ -269,6 +293,7 @@ return [ // 每日 2:00 统一计算所有微信账号健康分(基础分 + 动态分) 'wechat_calculate_score' => [ + 'name' => '计算微信账号健康分', 'command' => 'wechat:calculate-score', 'schedule' => '0 2 * * *', // 每天2点 'options' => [], @@ -281,6 +306,7 @@ return [ // 每 3 天 3:00 全量同步所有在线好友,做一次大规模校准 'sync_all_friends' => [ + 'name' => '全量同步所有在线好友', 'command' => 'sync:allFriends', 'schedule' => '0 3 */3 * *', // 每3天的3点 'options' => [], @@ -291,6 +317,7 @@ return [ // 检查未读/未回复消息并自动迁移好友(每5分钟执行一次) 'check_unread_message' => [ + 'name' => '检查未读/未回复消息并自动迁移好友', 'command' => 'check:unread-message', 'schedule' => '*/5 * * * *', // 每5分钟 'options' => ['--minutes=30'], // 30分钟未读/未回复 @@ -301,6 +328,7 @@ return [ // 同步部门列表,用于部门管理与权限控制 'department_list' => [ + 'name' => '同步部门列表', 'command' => 'department:list', 'schedule' => '*/30 * * * *', // 每30分钟 'options' => [], @@ -311,6 +339,7 @@ return [ // 同步内容库,将外部内容同步到系统内容库 'content_sync' => [ + 'name' => '同步内容库', 'command' => 'content:sync', 'schedule' => '0 2 * * *', // 每天2点 'options' => [], @@ -321,6 +350,7 @@ return [ // 朋友圈采集任务,采集好友朋友圈内容 'moments_collect' => [ + 'name' => '朋友圈采集任务', 'command' => 'moments:collect', 'schedule' => '0 6 * * *', // 每天6点 'options' => [], @@ -331,6 +361,7 @@ return [ // 分配规则列表,同步分配规则数据 'allotrule_list' => [ + 'name' => '同步分配规则列表', 'command' => 'allotrule:list', 'schedule' => '0 3 * * *', // 每天3点 'options' => [], @@ -341,6 +372,7 @@ return [ // 自动创建分配规则,根据规则自动创建分配任务 'allotrule_autocreate' => [ + 'name' => '自动创建分配规则', 'command' => 'allotrule:autocreate', 'schedule' => '0 4 * * *', // 每天4点 'options' => [], @@ -351,6 +383,7 @@ return [ // 工作台:入群欢迎语任务,自动发送入群欢迎消息 'workbench_group_welcome' => [ + 'name' => '工作台:入群欢迎语任务', 'command' => 'workbench:groupWelcome', 'schedule' => '*/1 * * * *', // 每1分钟 'options' => [], @@ -361,6 +394,7 @@ return [ // 采集客服自己的朋友圈,同步客服账号的朋友圈内容 'own_moments_collect' => [ + 'name' => '采集客服自己的朋友圈', 'command' => 'own:moments:collect', 'schedule' => '*/30 * * * *', // 每30分钟 'options' => [],