队列优化
This commit is contained in:
457
Server/application/command/TaskSchedulerCommand.php
Normal file
457
Server/application/command/TaskSchedulerCommand.php
Normal file
@@ -0,0 +1,457 @@
|
||||
<?php
|
||||
|
||||
namespace app\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\facade\Config;
|
||||
use think\facade\Log;
|
||||
use think\facade\Cache;
|
||||
|
||||
/**
|
||||
* 统一任务调度器
|
||||
* 支持多进程并发执行任务
|
||||
*
|
||||
* 使用方法:
|
||||
* php think scheduler:run
|
||||
*
|
||||
* 在 crontab 中配置:
|
||||
* * * * * * cd /path/to/project && php think scheduler:run >> /path/to/log/scheduler.log 2>&1
|
||||
*/
|
||||
class TaskSchedulerCommand extends Command
|
||||
{
|
||||
/**
|
||||
* 任务配置
|
||||
*/
|
||||
protected $tasks = [];
|
||||
|
||||
/**
|
||||
* 最大并发进程数
|
||||
*/
|
||||
protected $maxConcurrent = 10;
|
||||
|
||||
/**
|
||||
* 当前运行的进程数
|
||||
*/
|
||||
protected $runningProcesses = [];
|
||||
|
||||
/**
|
||||
* 日志目录
|
||||
*/
|
||||
protected $logDir = '';
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('scheduler:run')
|
||||
->setDescription('统一任务调度器,支持多进程并发执行所有定时任务');
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$output->writeln('==========================================');
|
||||
$output->writeln('任务调度器启动');
|
||||
$output->writeln('时间: ' . date('Y-m-d H:i:s'));
|
||||
$output->writeln('==========================================');
|
||||
|
||||
// 检查是否支持 pcntl 扩展
|
||||
if (!function_exists('pcntl_fork')) {
|
||||
$output->writeln('<error>错误:系统不支持 pcntl 扩展,无法使用多进程功能</error>');
|
||||
$output->writeln('<info>提示:将使用单进程顺序执行任务</info>');
|
||||
$this->maxConcurrent = 1;
|
||||
}
|
||||
|
||||
// 加载任务配置
|
||||
$this->tasks = Config::get('task_scheduler', []);
|
||||
if (empty($this->tasks)) {
|
||||
$output->writeln('<error>错误:未找到任务配置</error>');
|
||||
return false;
|
||||
}
|
||||
|
||||
// 设置日志目录
|
||||
$this->logDir = runtime_path() . 'log' . DIRECTORY_SEPARATOR;
|
||||
if (!is_dir($this->logDir)) {
|
||||
mkdir($this->logDir, 0755, true);
|
||||
}
|
||||
|
||||
// 获取当前时间
|
||||
$currentTime = time();
|
||||
$currentMinute = date('i', $currentTime);
|
||||
$currentHour = date('H', $currentTime);
|
||||
$currentDay = date('d', $currentTime);
|
||||
$currentMonth = date('m', $currentTime);
|
||||
$currentWeekday = date('w', $currentTime); // 0=Sunday, 6=Saturday
|
||||
|
||||
$output->writeln("当前时间: {$currentHour}:{$currentMinute}");
|
||||
$output->writeln("已加载 " . count($this->tasks) . " 个任务配置");
|
||||
|
||||
// 筛选需要执行的任务
|
||||
$tasksToRun = [];
|
||||
foreach ($this->tasks as $taskId => $task) {
|
||||
if (!isset($task['enabled']) || !$task['enabled']) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) {
|
||||
$tasksToRun[$taskId] = $task;
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($tasksToRun)) {
|
||||
$output->writeln('<info>当前时间没有需要执行的任务</info>');
|
||||
return true;
|
||||
}
|
||||
|
||||
$output->writeln("找到 " . count($tasksToRun) . " 个需要执行的任务");
|
||||
|
||||
// 执行任务
|
||||
if ($this->maxConcurrent > 1 && function_exists('pcntl_fork')) {
|
||||
$this->executeConcurrent($tasksToRun, $output);
|
||||
} else {
|
||||
$this->executeSequential($tasksToRun, $output);
|
||||
}
|
||||
|
||||
// 清理僵尸进程
|
||||
$this->cleanupZombieProcesses();
|
||||
|
||||
$output->writeln('==========================================');
|
||||
$output->writeln('任务调度器执行完成');
|
||||
$output->writeln('==========================================');
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断任务是否应该执行
|
||||
*
|
||||
* @param string $schedule cron表达式,格式:分钟 小时 日 月 星期
|
||||
* @param int $minute 当前分钟
|
||||
* @param int $hour 当前小时
|
||||
* @param int $day 当前日期
|
||||
* @param int $month 当前月份
|
||||
* @param int $weekday 当前星期
|
||||
* @return bool
|
||||
*/
|
||||
protected function shouldRun($schedule, $minute, $hour, $day, $month, $weekday)
|
||||
{
|
||||
$parts = preg_split('/\s+/', trim($schedule));
|
||||
if (count($parts) < 5) {
|
||||
return false;
|
||||
}
|
||||
|
||||
list($scheduleMinute, $scheduleHour, $scheduleDay, $scheduleMonth, $scheduleWeekday) = $parts;
|
||||
|
||||
// 解析分钟
|
||||
if (!$this->matchCronField($scheduleMinute, $minute)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 解析小时
|
||||
if (!$this->matchCronField($scheduleHour, $hour)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 解析日期
|
||||
if (!$this->matchCronField($scheduleDay, $day)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 解析月份
|
||||
if (!$this->matchCronField($scheduleMonth, $month)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 解析星期(注意:cron中0和7都表示星期日)
|
||||
if ($scheduleWeekday !== '*') {
|
||||
$scheduleWeekday = str_replace('7', '0', $scheduleWeekday);
|
||||
if (!$this->matchCronField($scheduleWeekday, $weekday)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 匹配cron字段
|
||||
*
|
||||
* @param string $field cron字段表达式
|
||||
* @param int $value 当前值
|
||||
* @return bool
|
||||
*/
|
||||
protected function matchCronField($field, $value)
|
||||
{
|
||||
// 通配符
|
||||
if ($field === '*') {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 列表(逗号分隔)
|
||||
if (strpos($field, ',') !== false) {
|
||||
$values = explode(',', $field);
|
||||
foreach ($values as $v) {
|
||||
if ($this->matchCronField(trim($v), $value)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// 范围(如 1-5)
|
||||
if (strpos($field, '-') !== false) {
|
||||
list($start, $end) = explode('-', $field);
|
||||
return $value >= (int)$start && $value <= (int)$end;
|
||||
}
|
||||
|
||||
// 步长(如 */5 或 0-59/5)
|
||||
if (strpos($field, '/') !== false) {
|
||||
$parts = explode('/', $field);
|
||||
$base = $parts[0];
|
||||
$step = (int)$parts[1];
|
||||
|
||||
if ($base === '*') {
|
||||
return $value % $step === 0;
|
||||
} else {
|
||||
// 处理范围步长,如 0-59/5
|
||||
if (strpos($base, '-') !== false) {
|
||||
list($start, $end) = explode('-', $base);
|
||||
if ($value >= (int)$start && $value <= (int)$end) {
|
||||
return ($value - (int)$start) % $step === 0;
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
return $value % $step === 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 精确匹配
|
||||
return (int)$field === $value;
|
||||
}
|
||||
|
||||
/**
|
||||
* 并发执行任务(多进程)
|
||||
*
|
||||
* @param array $tasks 任务列表
|
||||
* @param Output $output 输出对象
|
||||
*/
|
||||
protected function executeConcurrent($tasks, Output $output)
|
||||
{
|
||||
$output->writeln('<info>使用多进程并发执行任务(最大并发数:' . $this->maxConcurrent . ')</info>');
|
||||
|
||||
foreach ($tasks as $taskId => $task) {
|
||||
// 等待可用进程槽
|
||||
while (count($this->runningProcesses) >= $this->maxConcurrent) {
|
||||
$this->waitForProcesses();
|
||||
usleep(100000); // 等待100ms
|
||||
}
|
||||
|
||||
// 检查任务是否已经在运行(防止重复执行)
|
||||
$lockKey = "scheduler_task_lock:{$taskId}";
|
||||
$lockTime = Cache::get($lockKey);
|
||||
if ($lockTime && (time() - $lockTime) < 300) { // 5分钟内不重复执行
|
||||
$output->writeln("<comment>任务 {$taskId} 正在运行中,跳过</comment>");
|
||||
continue;
|
||||
}
|
||||
|
||||
// 创建子进程
|
||||
$pid = pcntl_fork();
|
||||
|
||||
if ($pid == -1) {
|
||||
// 创建进程失败
|
||||
$output->writeln("<error>创建子进程失败:{$taskId}</error>");
|
||||
Log::error("任务调度器:创建子进程失败", ['task' => $taskId]);
|
||||
continue;
|
||||
} elseif ($pid == 0) {
|
||||
// 子进程:执行任务
|
||||
$this->runTask($taskId, $task);
|
||||
exit(0);
|
||||
} else {
|
||||
// 父进程:记录子进程PID
|
||||
$this->runningProcesses[$pid] = [
|
||||
'task_id' => $taskId,
|
||||
'start_time' => time(),
|
||||
];
|
||||
$output->writeln("<info>启动任务:{$taskId} (PID: {$pid})</info>");
|
||||
|
||||
// 设置任务锁
|
||||
Cache::set($lockKey, time(), 600); // 10分钟过期
|
||||
}
|
||||
}
|
||||
|
||||
// 等待所有子进程完成
|
||||
while (!empty($this->runningProcesses)) {
|
||||
$this->waitForProcesses();
|
||||
usleep(500000); // 等待500ms
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 顺序执行任务(单进程)
|
||||
*
|
||||
* @param array $tasks 任务列表
|
||||
* @param Output $output 输出对象
|
||||
*/
|
||||
protected function executeSequential($tasks, Output $output)
|
||||
{
|
||||
$output->writeln('<info>使用单进程顺序执行任务</info>');
|
||||
|
||||
foreach ($tasks as $taskId => $task) {
|
||||
$output->writeln("<info>执行任务:{$taskId}</info>");
|
||||
$this->runTask($taskId, $task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行单个任务
|
||||
*
|
||||
* @param string $taskId 任务ID
|
||||
* @param array $task 任务配置
|
||||
*/
|
||||
protected function runTask($taskId, $task)
|
||||
{
|
||||
$startTime = microtime(true);
|
||||
$logFile = $this->logDir . ($task['log_file'] ?? "scheduler_{$taskId}.log");
|
||||
|
||||
// 确保日志目录存在
|
||||
$logDir = dirname($logFile);
|
||||
if (!is_dir($logDir)) {
|
||||
mkdir($logDir, 0755, true);
|
||||
}
|
||||
|
||||
// 构建命令
|
||||
$thinkPath = root_path() . 'think';
|
||||
$command = "php {$thinkPath} {$task['command']}";
|
||||
if (!empty($task['options'])) {
|
||||
foreach ($task['options'] as $option) {
|
||||
$command .= ' ' . escapeshellarg($option);
|
||||
}
|
||||
}
|
||||
|
||||
// 添加日志重定向
|
||||
$command .= " >> " . escapeshellarg($logFile) . " 2>&1";
|
||||
|
||||
// 记录任务开始
|
||||
$logMessage = "\n" . str_repeat('=', 60) . "\n";
|
||||
$logMessage .= "任务开始执行: {$taskId}\n";
|
||||
$logMessage .= "执行时间: " . date('Y-m-d H:i:s') . "\n";
|
||||
$logMessage .= "命令: {$command}\n";
|
||||
$logMessage .= str_repeat('=', 60) . "\n";
|
||||
file_put_contents($logFile, $logMessage, FILE_APPEND);
|
||||
|
||||
// 执行命令
|
||||
$descriptorspec = [
|
||||
0 => ['file', (PHP_OS_FAMILY === 'Windows' ? 'NUL' : '/dev/null'), 'r'], // stdin
|
||||
1 => ['file', $logFile, 'a'], // stdout
|
||||
2 => ['file', $logFile, 'a'], // stderr
|
||||
];
|
||||
|
||||
$process = @proc_open($command, $descriptorspec, $pipes, root_path());
|
||||
|
||||
if (is_resource($process)) {
|
||||
// 关闭管道
|
||||
if (isset($pipes[0])) @fclose($pipes[0]);
|
||||
if (isset($pipes[1])) @fclose($pipes[1]);
|
||||
if (isset($pipes[2])) @fclose($pipes[2]);
|
||||
|
||||
// 设置超时
|
||||
$timeout = $task['timeout'] ?? 3600;
|
||||
$startWaitTime = time();
|
||||
|
||||
// 等待进程完成或超时
|
||||
while (true) {
|
||||
$status = proc_get_status($process);
|
||||
|
||||
if (!$status['running']) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 检查超时
|
||||
if ((time() - $startWaitTime) > $timeout) {
|
||||
if (function_exists('proc_terminate')) {
|
||||
proc_terminate($process, SIGTERM);
|
||||
// 等待进程终止
|
||||
sleep(2);
|
||||
$status = proc_get_status($process);
|
||||
if ($status['running']) {
|
||||
// 强制终止
|
||||
proc_terminate($process, SIGKILL);
|
||||
}
|
||||
}
|
||||
Log::warning("任务执行超时", [
|
||||
'task' => $taskId,
|
||||
'timeout' => $timeout,
|
||||
]);
|
||||
break;
|
||||
}
|
||||
|
||||
usleep(500000); // 等待500ms
|
||||
}
|
||||
|
||||
// 关闭进程
|
||||
proc_close($process);
|
||||
} else {
|
||||
// 如果 proc_open 失败,尝试直接执行(后台执行)
|
||||
if (PHP_OS_FAMILY === 'Windows') {
|
||||
pclose(popen("start /B " . $command, "r"));
|
||||
} else {
|
||||
exec($command . ' > /dev/null 2>&1 &');
|
||||
}
|
||||
}
|
||||
|
||||
$endTime = microtime(true);
|
||||
$duration = round($endTime - $startTime, 2);
|
||||
|
||||
// 记录任务完成
|
||||
$logMessage = "\n" . str_repeat('=', 60) . "\n";
|
||||
$logMessage .= "任务执行完成: {$taskId}\n";
|
||||
$logMessage .= "完成时间: " . date('Y-m-d H:i:s') . "\n";
|
||||
$logMessage .= "执行时长: {$duration} 秒\n";
|
||||
$logMessage .= str_repeat('=', 60) . "\n";
|
||||
file_put_contents($logFile, $logMessage, FILE_APPEND);
|
||||
|
||||
Log::info("任务执行完成", [
|
||||
'task' => $taskId,
|
||||
'duration' => $duration,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* 等待进程完成
|
||||
*/
|
||||
protected function waitForProcesses()
|
||||
{
|
||||
foreach ($this->runningProcesses as $pid => $info) {
|
||||
$status = 0;
|
||||
$result = pcntl_waitpid($pid, $status, WNOHANG);
|
||||
|
||||
if ($result == $pid || $result == -1) {
|
||||
// 进程已结束
|
||||
unset($this->runningProcesses[$pid]);
|
||||
|
||||
$duration = time() - $info['start_time'];
|
||||
Log::info("子进程执行完成", [
|
||||
'pid' => $pid,
|
||||
'task' => $info['task_id'],
|
||||
'duration' => $duration,
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理僵尸进程
|
||||
*/
|
||||
protected function cleanupZombieProcesses()
|
||||
{
|
||||
if (!function_exists('pcntl_waitpid')) {
|
||||
return;
|
||||
}
|
||||
|
||||
$status = 0;
|
||||
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
|
||||
// 清理僵尸进程
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user