Files
cunkebao_v3/Server/application/command/TaskSchedulerCommand.php
2026-01-13 15:22:19 +08:00

555 lines
20 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Config;
use think\facade\Log;
use think\facade\Cache;
/**
* 统一任务调度器
* 支持多进程并发执行任务
*
* 使用方法:
* php think scheduler:run
*
* 在 crontab 中配置:
* * * * * * cd /path/to/project && php think scheduler:run >> /path/to/log/scheduler.log 2>&1
*/
class TaskSchedulerCommand extends Command
{
/**
* 任务配置
*/
protected $tasks = [];
/**
* 最大并发进程数
*/
protected $maxConcurrent = 20;
/**
* 当前运行的进程数
*/
protected $runningProcesses = [];
/**
* 日志目录
*/
protected $logDir = '';
protected function configure()
{
$this->setName('scheduler:run')
->setDescription('统一任务调度器,支持多进程并发执行所有定时任务');
}
protected function execute(Input $input, Output $output)
{
$output->writeln('==========================================');
$output->writeln('任务调度器启动');
$output->writeln('时间: ' . date('Y-m-d H:i:s'));
$output->writeln('==========================================');
// 检查是否支持 pcntl 扩展
if (!function_exists('pcntl_fork')) {
$output->writeln('<error>错误:系统不支持 pcntl 扩展,无法使用多进程功能</error>');
$output->writeln('<info>提示:将使用单进程顺序执行任务</info>');
$this->maxConcurrent = 1;
}
// 加载任务配置
// 方法1尝试通过框架配置加载
$this->tasks = Config::get('task_scheduler', []);
// 方法2如果框架配置没有直接加载配置文件
if (empty($this->tasks)) {
// 获取项目根目录
if (!defined('ROOT_PATH')) {
define('ROOT_PATH', dirname(__DIR__, 2));
}
// 尝试多个可能的路径
$possiblePaths = [
ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
__DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
];
foreach ($possiblePaths as $configFile) {
if (is_file($configFile)) {
$output->writeln("<info>找到配置文件:{$configFile}</info>");
$config = include $configFile;
if (is_array($config) && !empty($config)) {
$this->tasks = $config;
break;
} else {
$output->writeln("<error>配置文件返回的不是数组或为空:{$configFile}</error>");
}
}
}
}
if (empty($this->tasks)) {
$output->writeln('<error>错误未找到任务配置task_scheduler</error>');
$output->writeln('<error>请检查以下位置:</error>');
$output->writeln('<error>1. config/task_scheduler.php 文件是否存在</error>');
$output->writeln('<error>2. 文件是否返回有效的数组</error>');
$output->writeln('<error>3. 文件权限是否正确</error>');
if (defined('ROOT_PATH')) {
$output->writeln('<error>项目根目录:' . ROOT_PATH . '</error>');
$output->writeln('<error>期望配置文件:' . ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php</error>');
}
return false;
}
// 设置日志目录ThinkPHP5 中无 runtime_path 辅助函数,直接使用 ROOT_PATH/runtime/log
if (!defined('ROOT_PATH')) {
// CLI 下正常情况下 ROOT_PATH 已在入口脚本 define这里兜底一次
define('ROOT_PATH', dirname(__DIR__, 2));
}
$this->logDir = ROOT_PATH . DIRECTORY_SEPARATOR . 'runtime' . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR;
if (!is_dir($this->logDir)) {
mkdir($this->logDir, 0755, true);
}
// 获取当前时间
$currentTime = time();
$currentMinute = date('i', $currentTime);
$currentHour = date('H', $currentTime);
$currentDay = date('d', $currentTime);
$currentMonth = date('m', $currentTime);
$currentWeekday = date('w', $currentTime); // 0=Sunday, 6=Saturday
$output->writeln("当前时间: {$currentHour}:{$currentMinute}");
$output->writeln("已加载 " . count($this->tasks) . " 个任务配置");
// 筛选需要执行的任务
$tasksToRun = [];
$enabledCount = 0;
$disabledCount = 0;
foreach ($this->tasks as $taskId => $task) {
if (!isset($task['enabled']) || !$task['enabled']) {
$disabledCount++;
continue;
}
$enabledCount++;
if ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) {
$tasksToRun[$taskId] = $task;
$output->writeln("<info>任务 {$taskId} 符合执行条件schedule: {$task['schedule']}</info>");
}
}
$output->writeln("已启用任务数: {$enabledCount},已禁用任务数: {$disabledCount}");
if (empty($tasksToRun)) {
$output->writeln('<info>当前时间没有需要执行的任务</info>');
return true;
}
$output->writeln("找到 " . count($tasksToRun) . " 个需要执行的任务");
// 执行任务
if ($this->maxConcurrent > 1 && function_exists('pcntl_fork')) {
$this->executeConcurrent($tasksToRun, $output);
} else {
$this->executeSequential($tasksToRun, $output);
}
// 清理僵尸进程
$this->cleanupZombieProcesses();
$output->writeln('==========================================');
$output->writeln('任务调度器执行完成');
$output->writeln('==========================================');
return true;
}
/**
* 判断任务是否应该执行
*
* @param string $schedule cron表达式格式分钟 小时 日 月 星期
* @param int $minute 当前分钟
* @param int $hour 当前小时
* @param int $day 当前日期
* @param int $month 当前月份
* @param int $weekday 当前星期
* @return bool
*/
protected function shouldRun($schedule, $minute, $hour, $day, $month, $weekday)
{
$parts = preg_split('/\s+/', trim($schedule));
if (count($parts) < 5) {
return false;
}
list($scheduleMinute, $scheduleHour, $scheduleDay, $scheduleMonth, $scheduleWeekday) = $parts;
// 解析分钟
if (!$this->matchCronField($scheduleMinute, $minute)) {
return false;
}
// 解析小时
if (!$this->matchCronField($scheduleHour, $hour)) {
return false;
}
// 解析日期
if (!$this->matchCronField($scheduleDay, $day)) {
return false;
}
// 解析月份
if (!$this->matchCronField($scheduleMonth, $month)) {
return false;
}
// 解析星期注意cron中0和7都表示星期日
if ($scheduleWeekday !== '*') {
$scheduleWeekday = str_replace('7', '0', $scheduleWeekday);
if (!$this->matchCronField($scheduleWeekday, $weekday)) {
return false;
}
}
return true;
}
/**
* 匹配cron字段
*
* @param string $field cron字段表达式
* @param int $value 当前值
* @return bool
*/
protected function matchCronField($field, $value)
{
// 通配符
if ($field === '*') {
return true;
}
// 列表(逗号分隔)
if (strpos($field, ',') !== false) {
$values = explode(',', $field);
foreach ($values as $v) {
if ($this->matchCronField(trim($v), $value)) {
return true;
}
}
return false;
}
// 范围(如 1-5
if (strpos($field, '-') !== false) {
list($start, $end) = explode('-', $field);
return $value >= (int)$start && $value <= (int)$end;
}
// 步长(如 */5 或 0-59/5
if (strpos($field, '/') !== false) {
$parts = explode('/', $field);
$base = $parts[0];
$step = (int)$parts[1];
if ($base === '*') {
return $value % $step === 0;
} else {
// 处理范围步长,如 0-59/5
if (strpos($base, '-') !== false) {
list($start, $end) = explode('-', $base);
if ($value >= (int)$start && $value <= (int)$end) {
return ($value - (int)$start) % $step === 0;
}
return false;
} else {
return $value % $step === 0;
}
}
}
// 精确匹配
return (int)$field === $value;
}
/**
* 并发执行任务(多进程)
*
* @param array $tasks 任务列表
* @param Output $output 输出对象
*/
protected function executeConcurrent($tasks, Output $output)
{
$output->writeln('<info>使用多进程并发执行任务(最大并发数:' . $this->maxConcurrent . '</info>');
foreach ($tasks as $taskId => $task) {
// 等待可用进程槽
while (count($this->runningProcesses) >= $this->maxConcurrent) {
$this->waitForProcesses();
usleep(100000); // 等待100ms
}
// 检查任务是否已经在运行(防止重复执行)
$lockKey = "scheduler_task_lock:{$taskId}";
$lockTime = Cache::get($lockKey);
// 如果锁存在,检查进程是否真的在运行
if ($lockTime) {
$lockPid = Cache::get("scheduler_task_pid:{$taskId}");
if ($lockPid) {
// 检查进程是否真的在运行
if (function_exists('posix_kill')) {
// 使用 posix_kill(pid, 0) 检查进程是否存在0信号不杀死进程只检查
if (@posix_kill($lockPid, 0)) {
$output->writeln("<comment>任务 {$taskId} 正在运行中PID: {$lockPid}),跳过</comment>");
continue;
} else {
// 进程不存在,清除锁
Cache::rm($lockKey);
Cache::rm("scheduler_task_pid:{$taskId}");
}
} else {
// 如果没有 posix_kill使用时间判断2分钟内不重复执行
if ((time() - $lockTime) < 120) {
$output->writeln("<comment>任务 {$taskId} 可能在运行中2分钟内执行过跳过</comment>");
continue;
}
}
} else {
// 如果没有PID记录使用时间判断2分钟内不重复执行
if ((time() - $lockTime) < 120) {
$output->writeln("<comment>任务 {$taskId} 可能在运行中2分钟内执行过跳过</comment>");
continue;
}
}
}
// 创建子进程
$pid = pcntl_fork();
if ($pid == -1) {
// 创建进程失败
$output->writeln("<error>创建子进程失败:{$taskId}</error>");
Log::error("任务调度器:创建子进程失败", ['task' => $taskId]);
continue;
} elseif ($pid == 0) {
// 子进程:执行任务
$this->runTask($taskId, $task);
exit(0);
} else {
// 父进程记录子进程PID
$this->runningProcesses[$pid] = [
'task_id' => $taskId,
'start_time' => time(),
];
$output->writeln("<info>启动任务:{$taskId} (PID: {$pid})</info>");
// 设置任务锁和PID
Cache::set($lockKey, time(), 600); // 10分钟过期
Cache::set("scheduler_task_pid:{$taskId}", $pid, 600); // 保存PID10分钟过期
}
}
// 等待所有子进程完成
while (!empty($this->runningProcesses)) {
$this->waitForProcesses();
usleep(500000); // 等待500ms
}
}
/**
* 顺序执行任务(单进程)
*
* @param array $tasks 任务列表
* @param Output $output 输出对象
*/
protected function executeSequential($tasks, Output $output)
{
$output->writeln('<info>使用单进程顺序执行任务</info>');
foreach ($tasks as $taskId => $task) {
$output->writeln("<info>执行任务:{$taskId}</info>");
$this->runTask($taskId, $task);
}
}
/**
* 执行单个任务
*
* @param string $taskId 任务ID
* @param array $task 任务配置
*/
protected function runTask($taskId, $task)
{
$startTime = microtime(true);
$logFile = $this->logDir . ($task['log_file'] ?? "scheduler_{$taskId}.log");
// 确保日志目录存在
$logDir = dirname($logFile);
if (!is_dir($logDir)) {
mkdir($logDir, 0755, true);
}
// 构建命令
// 使用指定的网站目录作为执行目录
$executionPath = '/www/wwwroot/mckb_quwanzhi_com/Server';
// 获取 PHP 可执行文件路径
$phpPath = PHP_BINARY ?: 'php';
// 获取 think 脚本路径(使用执行目录)
$thinkPath = $executionPath . DIRECTORY_SEPARATOR . 'think';
// 检查 think 文件是否存在
if (!is_file($thinkPath)) {
$errorMsg = "错误think 文件不存在:{$thinkPath}";
Log::error($errorMsg);
file_put_contents($logFile, $errorMsg . "\n", FILE_APPEND);
return;
}
// 构建命令(使用绝对路径,确保在 Linux 上能正确执行)
$command = escapeshellarg($phpPath) . ' ' . escapeshellarg($thinkPath) . ' ' . escapeshellarg($task['command']);
if (!empty($task['options'])) {
foreach ($task['options'] as $option) {
$command .= ' ' . escapeshellarg($option);
}
}
// 添加日志重定向(在后台执行)
$command .= " >> " . escapeshellarg($logFile) . " 2>&1";
// 记录任务开始
$logMessage = "\n" . str_repeat('=', 60) . "\n";
$logMessage .= "任务开始执行: {$taskId}\n";
$logMessage .= "执行时间: " . date('Y-m-d H:i:s') . "\n";
$logMessage .= "执行目录: {$executionPath}\n";
$logMessage .= "命令: {$command}\n";
$logMessage .= str_repeat('=', 60) . "\n";
file_put_contents($logFile, $logMessage, FILE_APPEND);
// 执行命令使用指定的执行目录Linux 环境)
$descriptorspec = [
0 => ['file', '/dev/null', 'r'], // stdin
1 => ['file', $logFile, 'a'], // stdout
2 => ['file', $logFile, 'a'], // stderr
];
$process = @proc_open($command, $descriptorspec, $pipes, $executionPath);
if (is_resource($process)) {
// 关闭管道
if (isset($pipes[0])) @fclose($pipes[0]);
if (isset($pipes[1])) @fclose($pipes[1]);
if (isset($pipes[2])) @fclose($pipes[2]);
// 设置超时
$timeout = $task['timeout'] ?? 3600;
$startWaitTime = time();
// 等待进程完成或超时
while (true) {
$status = proc_get_status($process);
if (!$status['running']) {
break;
}
// 检查超时
if ((time() - $startWaitTime) > $timeout) {
if (function_exists('proc_terminate')) {
proc_terminate($process, SIGTERM);
// 等待进程终止
sleep(2);
$status = proc_get_status($process);
if ($status['running']) {
// 强制终止
proc_terminate($process, SIGKILL);
}
}
Log::warning("任务执行超时", [
'task' => $taskId,
'timeout' => $timeout,
]);
break;
}
usleep(500000); // 等待500ms
}
// 关闭进程
proc_close($process);
} else {
// 如果 proc_open 失败,使用 exec 在后台执行Linux 环境)
exec("cd " . escapeshellarg($executionPath) . " && " . $command . ' > /dev/null 2>&1 &');
}
$endTime = microtime(true);
$duration = round($endTime - $startTime, 2);
// 记录任务完成
$logMessage = "\n" . str_repeat('=', 60) . "\n";
$logMessage .= "任务执行完成: {$taskId}\n";
$logMessage .= "完成时间: " . date('Y-m-d H:i:s') . "\n";
$logMessage .= "执行时长: {$duration}\n";
$logMessage .= str_repeat('=', 60) . "\n";
file_put_contents($logFile, $logMessage, FILE_APPEND);
Log::info("任务执行完成", [
'task' => $taskId,
'duration' => $duration,
]);
}
/**
* 等待进程完成
*/
protected function waitForProcesses()
{
foreach ($this->runningProcesses as $pid => $info) {
$status = 0;
$result = pcntl_waitpid($pid, $status, WNOHANG);
if ($result == $pid || $result == -1) {
// 进程已结束
$taskId = $info['task_id'];
unset($this->runningProcesses[$pid]);
// 清除任务锁和PID
Cache::rm("scheduler_task_lock:{$taskId}");
Cache::rm("scheduler_task_pid:{$taskId}");
$duration = time() - $info['start_time'];
Log::info("子进程执行完成", [
'pid' => $pid,
'task' => $taskId,
'duration' => $duration,
]);
}
}
}
/**
* 清理僵尸进程
*/
protected function cleanupZombieProcesses()
{
if (!function_exists('pcntl_waitpid')) {
return;
}
$status = 0;
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
// 清理僵尸进程
}
}
}