好友迁移 + 定时器优化

This commit is contained in:
wong
2026-01-13 15:22:19 +08:00
parent 3e8b607948
commit 443ef6ad2d
6 changed files with 477 additions and 87 deletions

View File

@@ -22,7 +22,8 @@ class CheckUnreadMessageCommand extends Command
{
$this->setName('check:unread-message')
->setDescription('检查未读/未回复消息并自动迁移好友')
->addOption('minutes', 'm', \think\console\input\Option::VALUE_OPTIONAL, '未读/未回复分钟数默认30分钟', 30);
->addOption('minutes', 'm', \think\console\input\Option::VALUE_OPTIONAL, '未读/未回复分钟数默认30分钟', 30)
->addOption('page-size', 'p', \think\console\input\Option::VALUE_OPTIONAL, '每页处理数量默认100条', 100);
}
protected function execute(Input $input, Output $output)
@@ -32,11 +33,16 @@ class CheckUnreadMessageCommand extends Command
$minutes = 30;
}
$output->writeln("开始检查未读/未回复消息(超过{$minutes}分钟)...");
$pageSize = intval($input->getOption('page-size'));
if ($pageSize <= 0) {
$pageSize = 100;
}
$output->writeln("开始检查未读/未回复消息(超过{$minutes}分钟,每页处理{$pageSize}条)...");
try {
$friendTransferService = new FriendTransferService();
$result = $friendTransferService->checkAndTransferUnreadOrUnrepliedFriends($minutes);
$result = $friendTransferService->checkAndTransferUnreadOrUnrepliedFriends($minutes, $pageSize);
$output->writeln("检查完成:");
$output->writeln(" 总计需要迁移的好友数:{$result['total']}");

View File

@@ -29,7 +29,7 @@ class TaskSchedulerCommand extends Command
/**
* 最大并发进程数
*/
protected $maxConcurrent = 10;
protected $maxConcurrent = 20;
/**
* 当前运行的进程数
@@ -61,23 +61,48 @@ class TaskSchedulerCommand extends Command
$this->maxConcurrent = 1;
}
// 加载任务配置(优先使用框架配置,其次直接引入配置文件,避免加载失败)
// 加载任务配置
// 方法1尝试通过框架配置加载
$this->tasks = Config::get('task_scheduler', []);
// 如果通过 Config 没有读到,再尝试直接 include 配置文件
// 方法2如果框架配置没有直接加载配置文件
if (empty($this->tasks)) {
// 项目根目录为基准查找 config/task_scheduler.php
$configFile = dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php';
if (is_file($configFile)) {
$config = include $configFile;
if (is_array($config) && !empty($config)) {
$this->tasks = $config;
// 获取项目根目录
if (!defined('ROOT_PATH')) {
define('ROOT_PATH', dirname(__DIR__, 2));
}
// 尝试多个可能的路径
$possiblePaths = [
ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
__DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php',
];
foreach ($possiblePaths as $configFile) {
if (is_file($configFile)) {
$output->writeln("<info>找到配置文件:{$configFile}</info>");
$config = include $configFile;
if (is_array($config) && !empty($config)) {
$this->tasks = $config;
break;
} else {
$output->writeln("<error>配置文件返回的不是数组或为空:{$configFile}</error>");
}
}
}
}
if (empty($this->tasks)) {
$output->writeln('<error>错误未找到任务配置task_scheduler,请检查 config/task_scheduler.php 是否存在且返回数组</error>');
$output->writeln('<error>错误未找到任务配置task_scheduler</error>');
$output->writeln('<error>请检查以下位置:</error>');
$output->writeln('<error>1. config/task_scheduler.php 文件是否存在</error>');
$output->writeln('<error>2. 文件是否返回有效的数组</error>');
$output->writeln('<error>3. 文件权限是否正确</error>');
if (defined('ROOT_PATH')) {
$output->writeln('<error>项目根目录:' . ROOT_PATH . '</error>');
$output->writeln('<error>期望配置文件:' . ROOT_PATH . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php</error>');
}
return false;
}
@@ -104,16 +129,24 @@ class TaskSchedulerCommand extends Command
// 筛选需要执行的任务
$tasksToRun = [];
$enabledCount = 0;
$disabledCount = 0;
foreach ($this->tasks as $taskId => $task) {
if (!isset($task['enabled']) || !$task['enabled']) {
$disabledCount++;
continue;
}
$enabledCount++;
if ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) {
$tasksToRun[$taskId] = $task;
$output->writeln("<info>任务 {$taskId} 符合执行条件schedule: {$task['schedule']}</info>");
}
}
$output->writeln("已启用任务数: {$enabledCount},已禁用任务数: {$disabledCount}");
if (empty($tasksToRun)) {
$output->writeln('<info>当前时间没有需要执行的任务</info>');
return true;
@@ -266,9 +299,36 @@ class TaskSchedulerCommand extends Command
// 检查任务是否已经在运行(防止重复执行)
$lockKey = "scheduler_task_lock:{$taskId}";
$lockTime = Cache::get($lockKey);
if ($lockTime && (time() - $lockTime) < 300) { // 5分钟内不重复执行
$output->writeln("<comment>任务 {$taskId} 正在运行中,跳过</comment>");
continue;
// 如果锁存在,检查进程是否真的在运行
if ($lockTime) {
$lockPid = Cache::get("scheduler_task_pid:{$taskId}");
if ($lockPid) {
// 检查进程是否真的在运行
if (function_exists('posix_kill')) {
// 使用 posix_kill(pid, 0) 检查进程是否存在0信号不杀死进程只检查
if (@posix_kill($lockPid, 0)) {
$output->writeln("<comment>任务 {$taskId} 正在运行中PID: {$lockPid}),跳过</comment>");
continue;
} else {
// 进程不存在,清除锁
Cache::rm($lockKey);
Cache::rm("scheduler_task_pid:{$taskId}");
}
} else {
// 如果没有 posix_kill使用时间判断2分钟内不重复执行
if ((time() - $lockTime) < 120) {
$output->writeln("<comment>任务 {$taskId} 可能在运行中2分钟内执行过跳过</comment>");
continue;
}
}
} else {
// 如果没有PID记录使用时间判断2分钟内不重复执行
if ((time() - $lockTime) < 120) {
$output->writeln("<comment>任务 {$taskId} 可能在运行中2分钟内执行过跳过</comment>");
continue;
}
}
}
// 创建子进程
@@ -291,8 +351,9 @@ class TaskSchedulerCommand extends Command
];
$output->writeln("<info>启动任务:{$taskId} (PID: {$pid})</info>");
// 设置任务锁
// 设置任务锁和PID
Cache::set($lockKey, time(), 600); // 10分钟过期
Cache::set("scheduler_task_pid:{$taskId}", $pid, 600); // 保存PID10分钟过期
}
}
@@ -337,37 +398,51 @@ class TaskSchedulerCommand extends Command
}
// 构建命令
// 使用项目根目录下的 think 脚本(同命令行 php think
if (!defined('ROOT_PATH')) {
define('ROOT_PATH', dirname(__DIR__, 2));
// 使用指定的网站目录作为执行目录
$executionPath = '/www/wwwroot/mckb_quwanzhi_com/Server';
// 获取 PHP 可执行文件路径
$phpPath = PHP_BINARY ?: 'php';
// 获取 think 脚本路径(使用执行目录)
$thinkPath = $executionPath . DIRECTORY_SEPARATOR . 'think';
// 检查 think 文件是否存在
if (!is_file($thinkPath)) {
$errorMsg = "错误think 文件不存在:{$thinkPath}";
Log::error($errorMsg);
file_put_contents($logFile, $errorMsg . "\n", FILE_APPEND);
return;
}
$thinkPath = ROOT_PATH . DIRECTORY_SEPARATOR . 'think';
$command = "php {$thinkPath} {$task['command']}";
// 构建命令(使用绝对路径,确保在 Linux 上能正确执行)
$command = escapeshellarg($phpPath) . ' ' . escapeshellarg($thinkPath) . ' ' . escapeshellarg($task['command']);
if (!empty($task['options'])) {
foreach ($task['options'] as $option) {
$command .= ' ' . escapeshellarg($option);
}
}
// 添加日志重定向
// 添加日志重定向(在后台执行)
$command .= " >> " . escapeshellarg($logFile) . " 2>&1";
// 记录任务开始
$logMessage = "\n" . str_repeat('=', 60) . "\n";
$logMessage .= "任务开始执行: {$taskId}\n";
$logMessage .= "执行时间: " . date('Y-m-d H:i:s') . "\n";
$logMessage .= "执行目录: {$executionPath}\n";
$logMessage .= "命令: {$command}\n";
$logMessage .= str_repeat('=', 60) . "\n";
file_put_contents($logFile, $logMessage, FILE_APPEND);
// 执行命令
// 执行命令使用指定的执行目录Linux 环境)
$descriptorspec = [
0 => ['file', (PHP_OS_FAMILY === 'Windows' ? 'NUL' : '/dev/null'), 'r'], // stdin
0 => ['file', '/dev/null', 'r'], // stdin
1 => ['file', $logFile, 'a'], // stdout
2 => ['file', $logFile, 'a'], // stderr
];
$process = @proc_open($command, $descriptorspec, $pipes, ROOT_PATH);
$process = @proc_open($command, $descriptorspec, $pipes, $executionPath);
if (is_resource($process)) {
// 关闭管道
@@ -412,12 +487,8 @@ class TaskSchedulerCommand extends Command
// 关闭进程
proc_close($process);
} else {
// 如果 proc_open 失败,尝试直接执行(后台执行
if (PHP_OS_FAMILY === 'Windows') {
pclose(popen("start /B " . $command, "r"));
} else {
exec($command . ' > /dev/null 2>&1 &');
}
// 如果 proc_open 失败,使用 exec 在后台执行Linux 环境
exec("cd " . escapeshellarg($executionPath) . " && " . $command . ' > /dev/null 2>&1 &');
}
$endTime = microtime(true);
@@ -448,12 +519,17 @@ class TaskSchedulerCommand extends Command
if ($result == $pid || $result == -1) {
// 进程已结束
$taskId = $info['task_id'];
unset($this->runningProcesses[$pid]);
// 清除任务锁和PID
Cache::rm("scheduler_task_lock:{$taskId}");
Cache::rm("scheduler_task_pid:{$taskId}");
$duration = time() - $info['start_time'];
Log::info("子进程执行完成", [
'pid' => $pid,
'task' => $info['task_id'],
'task' => $taskId,
'duration' => $duration,
]);
}

View File

@@ -3,7 +3,6 @@
namespace app\common\service;
use app\api\controller\AutomaticAssign;
use app\api\controller\AccountController;
use think\Db;
use think\facade\Log;
@@ -44,20 +43,12 @@ class FriendTransferService
}
// 获取同部门的在线账号列表
$accountController = new AccountController();
$accountController->getlist([
'pageIndex' => 0,
'pageSize' => 100,
'departmentId' => $accountData['departmentId']
]);
$accountIds = Db::table('s2_company_account')
->where([
'departmentId' => $accountData['departmentId'],
'alive' => 1
])
->column('id');
if (empty($accountIds)) {
return [
'success' => false,
@@ -157,82 +148,321 @@ class FriendTransferService
}
}
/**
* 批量迁移好友到其他账号(按账号分组处理)
* @param array $friends 好友列表,格式:[['friendId' => int, 'accountId' => int], ...]
* @param int $currentAccountId 当前账号ID
* @param string $reason 迁移原因
* @return array ['transferred' => int, 'failed' => int]
*/
public function transferFriendsBatch($friends, $currentAccountId, $reason = '')
{
$transferred = 0;
$failed = 0;
if (empty($friends)) {
return ['transferred' => 0, 'failed' => 0];
}
try {
// 获取当前账号的部门信息
$accountData = Db::table('s2_company_account')->where('id', $currentAccountId)->find();
if (empty($accountData)) {
Log::error("批量迁移失败当前账号不存在账号ID={$currentAccountId}");
return ['transferred' => 0, 'failed' => count($friends)];
}
// 获取同部门的在线账号列表
$accountIds = Db::table('s2_company_account')
->where([
'departmentId' => $accountData['departmentId'],
'alive' => 1
])
->column('id');
if (empty($accountIds)) {
Log::warning("批量迁移失败没有可用的在线账号账号ID={$currentAccountId}");
return ['transferred' => 0, 'failed' => count($friends)];
}
// 排除当前账号,选择其他账号
$availableAccountIds = array_filter($accountIds, function($id) use ($currentAccountId) {
return $id != $currentAccountId;
});
if (empty($availableAccountIds)) {
Log::warning("批量迁移失败没有其他可用的在线账号账号ID={$currentAccountId}");
return ['transferred' => 0, 'failed' => count($friends)];
}
// 随机选择一个目标账号(同一批次使用同一个目标账号)
$availableAccountIds = array_values($availableAccountIds);
$randomKey = array_rand($availableAccountIds, 1);
$toAccountId = $availableAccountIds[$randomKey];
// 获取目标账号信息
$toAccountData = Db::table('s2_company_account')->where('id', $toAccountId)->find();
if (empty($toAccountData)) {
Log::error("批量迁移失败目标账号不存在账号ID={$toAccountId}");
return ['transferred' => 0, 'failed' => count($friends)];
}
// 批量获取好友信息
$friendIds = array_column($friends, 'friendId');
$friendList = Db::table('s2_wechat_friend')
->where('id', 'in', $friendIds)
->select();
$friendMap = [];
foreach ($friendList as $friend) {
$friendMap[$friend['id']] = $friend;
}
// 批量执行迁移
$automaticAssign = new AutomaticAssign();
$updateData = [];
foreach ($friends as $friendItem) {
$wechatFriendId = $friendItem['friendId'];
if (!isset($friendMap[$wechatFriendId])) {
$failed++;
Log::warning("批量迁移失败好友不存在好友ID={$wechatFriendId}");
continue;
}
$friend = $friendMap[$wechatFriendId];
// 如果好友当前账号不在可用账号列表中,或者需要迁移到其他账号
$needTransfer = !in_array($friend['accountId'], $accountIds) || $currentAccountId != $friend['accountId'];
if ($needTransfer) {
// 执行迁移
$result = $automaticAssign->allotWechatFriend([
'wechatFriendId' => $wechatFriendId,
'toAccountId' => $toAccountId
], true);
$resultData = json_decode($result, true);
if (isset($resultData['code']) && $resultData['code'] == 200) {
// 收集需要更新的数据
$updateData[] = [
'id' => $wechatFriendId,
'accountId' => $toAccountId,
'accountUserName' => $toAccountData['userName'],
'accountRealName' => $toAccountData['realName'],
'accountNickname' => $toAccountData['nickname'],
];
$transferred++;
} else {
$errorMsg = isset($resultData['msg']) ? $resultData['msg'] : '迁移失败';
$failed++;
Log::warning("批量迁移失败好友ID={$wechatFriendId},错误:{$errorMsg}");
}
} else {
// 无需迁移
$transferred++;
}
}
// 批量更新好友的账号信息
if (!empty($updateData)) {
foreach ($updateData as $data) {
Db::table('s2_wechat_friend')
->where('id', $data['id'])
->update([
'accountId' => $data['accountId'],
'accountUserName' => $data['accountUserName'],
'accountRealName' => $data['accountRealName'],
'accountNickname' => $data['accountNickname'],
]);
}
$logMessage = "批量迁移成功账号ID={$currentAccountId},共" . count($updateData) . "个好友迁移到账号{$toAccountId}";
if (!empty($reason)) {
$logMessage .= ",原因:{$reason}";
}
Log::info($logMessage);
}
return [
'transferred' => $transferred,
'failed' => $failed
];
} catch (\Exception $e) {
Log::error("批量迁移异常账号ID={$currentAccountId},错误:" . $e->getMessage());
return [
'transferred' => $transferred,
'failed' => count($friends) - $transferred
];
}
}
/**
* 检查并迁移未读或未回复的好友
* @param int $unreadMinutes 未读分钟数默认30分钟
* @param int $pageSize 每页处理数量默认100
* @return array ['total' => int, 'transferred' => int, 'failed' => int]
*/
public function checkAndTransferUnreadOrUnrepliedFriends($unreadMinutes = 30)
public function checkAndTransferUnreadOrUnrepliedFriends($unreadMinutes = 30, $pageSize = 100)
{
$total = 0;
$transferred = 0;
$failed = 0;
try {
$timeThreshold = time() - ($unreadMinutes * 60);
$currentTime = time();
$timeThreshold = $currentTime - ($unreadMinutes * 60); // 超过指定分钟数的时间点
$last24Hours = $currentTime - (24 * 60 * 60); // 近24小时的时间点
// 确保每页数量合理
$pageSize = max(1, min(1000, intval($pageSize)));
// 查询需要迁移的好友
// 条件:最后一条消息是用户发送的消息isSend=0且超过指定分钟数且客服在这之后没有回复
// 条件:以消息表为主表查询近24小时内的消息
// 1. 最后一条消息是用户发送的消息isSend=0
// 2. 消息时间在近24小时内
// 3. 消息时间超过指定分钟数默认30分钟
// 4. 在这条用户消息之后,客服没有发送任何回复
// 即用户发送了消息但客服超过30分钟没有回复需要迁移给其他客服处理
// 使用子查询找到每个好友的最后一条消息
// SQL逻辑说明
// 1. 找到每个好友的最后一条消息通过MAX(id)
// 2. 最后一条消息必须是用户发送的isSend=0即客服接收的消息
// 3. 这条消息的时间超过30分钟前wm.wechatTime <= timeThreshold
// SQL逻辑说明以消息表为主表
// 1. 从消息表开始筛选近24小时内的用户消息isSend=0
// 2. 找到每个好友的最后一条用户消息通过MAX(id)
// 3. 这条消息的时间超过指定分钟数wm.wechatTime <= timeThreshold
// 4. 在这条用户消息之后客服没有发送任何回复NOT EXISTS isSend=1的消息
// 5. 满足以上条件的好友说明客服超过30分钟未回复需要迁移给其他客服
$sql = "
SELECT DISTINCT
wf.id as friendId,
wf.accountId,
wm.wechatAccountId,
wm.wechatTime,
wm.id as lastMessageId
FROM s2_wechat_friend wf
// 5. 关联好友表,确保好友未删除且已分配账号
// 先统计总数
$countSql = "
SELECT COUNT(DISTINCT wm.wechatFriendId) as total
FROM s2_wechat_message wm
INNER JOIN (
SELECT wechatFriendId, MAX(id) as maxId
FROM s2_wechat_message
WHERE type = 1
AND isSend = 0 -- 用户发送的消息
AND wechatTime >= ? -- 近24小时内的消息
GROUP BY wechatFriendId
) last_msg ON wf.id = last_msg.wechatFriendId
INNER JOIN s2_wechat_message wm ON wm.id = last_msg.maxId
WHERE wf.isDeleted = 0
AND wm.type = 1
) last_msg ON wm.id = last_msg.maxId
INNER JOIN s2_wechat_friend wf ON wf.id = wm.wechatFriendId
WHERE wm.type = 1
AND wm.isSend = 0 -- 最后一条消息是用户发送的(客服接收的)
AND wm.wechatTime >= ? -- 近24小时内的消息
AND wm.wechatTime <= ? -- 超过指定时间默认30分钟
AND wf.isDeleted = 0
AND wf.accountId IS NOT NULL
AND NOT EXISTS (
-- 检查在这条用户消息之后,是否有客服的回复
SELECT 1
FROM s2_wechat_message
WHERE wechatFriendId = wf.id
WHERE wechatFriendId = wm.wechatFriendId
AND type = 1
AND isSend = 1 -- 客服发送的消息
AND wechatTime > wm.wechatTime -- 在用户消息之后
)
AND wf.accountId IS NOT NULL
";
$friends = Db::query($sql, [$timeThreshold]);
$total = count($friends);
$countResult = Db::query($countSql, [$last24Hours, $last24Hours, $timeThreshold]);
$total = isset($countResult[0]['total']) ? intval($countResult[0]['total']) : 0;
Log::info("开始检查未读/未回复好友,共找到 {$total} 个需要迁移的好友");
foreach ($friends as $friend) {
$result = $this->transferFriend(
$friend['friendId'],
$friend['accountId'],
"消息未读或未回复超过{$unreadMinutes}分钟"
);
if ($result['success']) {
$transferred++;
} else {
$failed++;
Log::warning("好友迁移失败好友ID={$friend['friendId']},原因:{$result['message']}");
}
if ($total == 0) {
Log::info("未找到需要迁移的未读/未回复好友近24小时内");
return [
'total' => 0,
'transferred' => 0,
'failed' => 0
];
}
Log::info("开始检查未读/未回复好友近24小时内共找到 {$total} 个需要迁移的好友,将分页处理(每页{$pageSize}条)");
// 分页处理
$page = 1;
$processed = 0;
do {
$offset = ($page - 1) * $pageSize;
$sql = "
SELECT DISTINCT
wf.id as friendId,
wf.accountId,
wm.wechatAccountId,
wm.wechatTime,
wm.id as lastMessageId
FROM s2_wechat_message wm
INNER JOIN (
SELECT wechatFriendId, MAX(id) as maxId
FROM s2_wechat_message
WHERE type = 1
AND isSend = 0 -- 用户发送的消息
AND wechatTime >= ? -- 近24小时内的消息
GROUP BY wechatFriendId
) last_msg ON wm.id = last_msg.maxId
INNER JOIN s2_wechat_friend wf ON wf.id = wm.wechatFriendId
WHERE wm.type = 1
AND wm.isSend = 0 -- 最后一条消息是用户发送的(客服接收的)
AND wm.wechatTime >= ? -- 近24小时内的消息
AND wm.wechatTime <= ? -- 超过指定时间默认30分钟
AND wf.isDeleted = 0
AND wf.accountId IS NOT NULL
AND NOT EXISTS (
-- 检查在这条用户消息之后,是否有客服的回复
SELECT 1
FROM s2_wechat_message
WHERE wechatFriendId = wm.wechatFriendId
AND type = 1
AND isSend = 1 -- 客服发送的消息
AND wechatTime > wm.wechatTime -- 在用户消息之后
)
ORDER BY wf.accountId ASC, wm.id ASC
LIMIT ? OFFSET ?
";
$friends = Db::query($sql, [$last24Hours, $last24Hours, $timeThreshold, $pageSize, $offset]);
$currentPageCount = count($friends);
if ($currentPageCount == 0) {
break;
}
Log::info("处理第 {$page} 页,本页 {$currentPageCount} 条记录");
// 按 accountId 分组
$friendsByAccount = [];
foreach ($friends as $friend) {
$accountId = $friend['accountId'];
if (!isset($friendsByAccount[$accountId])) {
$friendsByAccount[$accountId] = [];
}
$friendsByAccount[$accountId][] = $friend;
}
// 按账号分组批量处理
foreach ($friendsByAccount as $accountId => $accountFriends) {
$batchResult = $this->transferFriendsBatch(
$accountFriends,
$accountId,
"消息未读或未回复超过{$unreadMinutes}分钟"
);
$transferred += $batchResult['transferred'];
$failed += $batchResult['failed'];
$processed += count($accountFriends);
Log::info("账号 {$accountId} 批量迁移完成:成功{$batchResult['transferred']},失败{$batchResult['failed']},共" . count($accountFriends) . "个好友");
}
$page++;
// 每处理一页后记录进度
Log::info("已处理 {$processed}/{$total} 条记录,成功:{$transferred},失败:{$failed}");
} while ($currentPageCount == $pageSize && $processed < $total);
Log::info("未读/未回复好友迁移完成:总计{$total},成功{$transferred},失败{$failed}");
return [