Files
cunkebao_v3/Server/application/job/WorkbenchGroupPushJob.php
2025-11-07 15:25:50 +08:00

653 lines
23 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupPush;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
use app\cunkebao\controller\WorkbenchController;
/**
* 工作台消息群发任务
* Class WorkbenchGroupPushJob
* @package app\job
*/
class WorkbenchGroupPushJob
{
/**
* 最大重试次数
*/
const MAX_RETRY_ATTEMPTS = 3;
/**
* 队列任务处理
* @param Job $job 队列任务
* @param array $data 任务数据
* @return bool
*/
public function fire(Job $job, $data)
{
$jobId = $data['jobId'] ?? '';
$queueLockKey = $data['queueLockKey'] ?? '';
try {
$this->logJobStart($jobId, $queueLockKey);
$this->execute();
$this->handleJobSuccess($job, $queueLockKey);
return true;
} catch (\Exception $e) {
return $this->handleJobError($e, $job, $queueLockKey);
}
}
/**
* 执行任务
* @throws \Exception
*/
public function execute()
{
try {
// 获取所有工作台
$workbenches = Workbench::where(['status' => 1, 'type' => 3, 'isDel' => 0,'id' => 256])->order('id desc')->select();
foreach ($workbenches as $workbench) {
// 获取工作台配置
$config = WorkbenchGroupPush::where('workbenchId', $workbench->id)->find();
if (!$config) {
continue;
}
//判断是否推送
$isPush = $this->isPush($workbench, $config);
if (empty($isPush)) {
continue;
}
// 获取内容库
$contentLibrary = $this->getContentLibrary($workbench, $config);
if (empty($contentLibrary)) {
continue;
}
// 处理内容发送
$this->sendMsgToGroup($workbench, $config, $contentLibrary);
}
} catch (\Exception $e) {
Log::error("消息群发任务异常: " . $e->getMessage());
throw $e;
}
}
// 发送消息(支持群推送和好友推送)
public function sendMsgToGroup($workbench, $config, $msgConf)
{
// 消息拼接 msgType(1:文本 3:图片 43:视频 47:动图表情包gif、其他表情包 49:小程序/其他:图文、文件)
// 当前type 为文本、图片、动图表情包的时候content为string, 其他情况为对象 {type: 'file/link/...', url: '', title: '', thunmbPath: '', desc: ''}
$targetType = isset($config['targetType']) ? intval($config['targetType']) : 1; // 默认1=群推送
$toAccountId = '';
$username = Env::get('api.username', '');
$password = Env::get('api.password', '');
if (!empty($username) || !empty($password)) {
$toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
}
// 建立WebSocket
$wsController = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
if ($targetType == 1) {
// 群推送
$this->sendToGroups($workbench, $config, $msgConf, $wsController);
} else {
// 好友推送
$this->sendToFriends($workbench, $config, $msgConf, $wsController);
}
}
/**
* 发送群消息
*/
protected function sendToGroups($workbench, $config, $msgConf, $wsController)
{
$groups = json_decode($config['groups'], true);
if (empty($groups)) {
return false;
}
$groupsData = Db::name('wechat_group')->whereIn('id', $groups)->field('id,wechatAccountId,chatroomId,companyId,ownerWechatId')->select();
if (empty($groupsData)) {
return false;
}
foreach ($msgConf as $content) {
$sendData = [];
$sqlData = [];
foreach ($groupsData as $group) {
// msgType(1:文本 3:图片 43:视频 47:动图表情包gif、其他表情包 49:小程序/其他:图文、文件)
$sqlData[] = [
'workbenchId' => $workbench['id'],
'contentId' => $content['id'],
'groupId' => $group['id'],
'friendId' => null,
'targetType' => 1,
'wechatAccountId' => $group['wechatAccountId'],
'createTime' => time()
];
// 构建发送数据
$sendData = $this->buildSendData($content, $config, $group['wechatAccountId'], $group['id'], 'group');
if (empty($sendData)) {
continue;
}
//发送消息
foreach ($sendData as $send) {
$wsController->sendCommunity($send);
}
//插入发送记录
Db::name('workbench_group_push_item')->insertAll($sqlData);
}
}
}
/**
* 发送好友消息
*/
protected function sendToFriends($workbench, $config, $msgConf, $wsController)
{
$friends = json_decode($config['friends'], true);
$devices = json_decode($config['devices'] ?? '[]', true);
// 如果好友列表为空,则根据设备查询所有好友
if (empty($friends)) {
if (empty($devices)) {
// 如果没有选择设备,则无法推送
Log::warning('好友推送:未选择设备,无法推送全部好友');
return false;
}
// 根据设备查询所有好友
$friendsData = Db::table('s2_company_account')
->alias('ca')
->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
->join(['s2_wechat_friend' => 'wf'], 'wf.wechatAccountId = wa.id')
->where([
'ca.status' => 0,
'wf.isDeleted' => 0,
'wa.deviceAlive' => 1,
'wa.wechatAlive' => 1
])
->whereIn('wa.currentDeviceId', $devices)
->field('wf.id,wf.wechatAccountId,wf.wechatId,wf.ownerWechatId')
->group('wf.id')
->select();
} else {
// 查询指定的好友信息
$friendsData = Db::table('s2_wechat_friend')
->whereIn('id', $friends)
->where('isDeleted', 0)
->field('id,wechatAccountId,wechatId,ownerWechatId')
->select();
}
if (empty($friendsData)) {
return false;
}
// 获取所有已推送的好友ID列表去重不限制时间范围用于过滤避免重复推送
$sentFriendIds = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', 2)
->column('friendId');
$sentFriendIds = array_filter($sentFriendIds); // 过滤null值
$sentFriendIds = array_unique($sentFriendIds); // 去重
// 获取今日已推送的好友ID列表用于计算今日推送人数
$today = date('Y-m-d');
$todayStartTimestamp = strtotime($today . ' 00:00:00');
$todayEndTimestamp = strtotime($today . ' 23:59:59');
$todaySentFriendIds = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', 2)
->whereTime('createTime', 'between', [$todayStartTimestamp, $todayEndTimestamp])
->column('friendId');
$todaySentFriendIds = array_filter($todaySentFriendIds); // 过滤null值
$todaySentFriendIds = array_unique($todaySentFriendIds); // 去重
// 过滤掉所有已推送的好友(不限制时间范围,避免重复推送)
$friendsData = array_filter($friendsData, function($friend) use ($sentFriendIds) {
return !in_array($friend['id'], $sentFriendIds);
});
if (empty($friendsData)) {
Log::info('好友推送:所有好友都已推送过');
return false;
}
// 重新索引数组
$friendsData = array_values($friendsData);
// 计算剩余可推送人数(基于今日推送人数)
$todaySentCount = count($todaySentFriendIds);
$maxPerDay = intval($config['maxPerDay']);
$remainingCount = $maxPerDay - $todaySentCount;
if ($remainingCount <= 0) {
Log::info('好友推送:今日推送人数已达上限');
return false;
}
// 限制本次推送人数(不超过剩余可推送人数)
$friendsData = array_slice($friendsData, 0, $remainingCount);
// 批量处理每批最多500人
$batchSize = 500;
$batches = array_chunk($friendsData, $batchSize);
foreach ($msgConf as $content) {
foreach ($batches as $batchIndex => $batch) {
$sqlData = [];
foreach ($batch as $friend) {
// 构建发送数据
$sendData = $this->buildSendData($content, $config, $friend['wechatAccountId'], $friend['id'], 'friend');
if (empty($sendData)) {
continue;
}
// 发送个人消息
foreach ($sendData as $send) {
if ($send['msgType'] == 49){
$sendContent = json_encode($send['content'], 256);
} else {
$sendContent = $send['content'];
}
$wsController->sendPersonal([
'wechatFriendId' => $friend['id'],
'wechatAccountId' => $friend['wechatAccountId'],
'msgType' => $send['msgType'],
'content' => $sendContent,
]);
}
// 准备插入发送记录
$sqlData[] = [
'workbenchId' => $workbench['id'],
'contentId' => $content['id'],
'groupId' => null,
'friendId' => $friend['id'],
'targetType' => 2,
'wechatAccountId' => $friend['wechatAccountId'],
'createTime' => time()
];
}
// 批量插入发送记录
if (!empty($sqlData)) {
Db::name('workbench_group_push_item')->insertAll($sqlData);
Log::info("好友推送:第" . ($batchIndex + 1) . "批,推送了" . count($sqlData) . "个好友");
}
// 如果不是最后一批,等待一下再处理下一批(避免一次性推送太多)
if ($batchIndex < count($batches) - 1) {
sleep(1); // 等待1秒
}
}
}
}
/**
* 构建发送数据
*/
protected function buildSendData($content, $config, $wechatAccountId, $targetId, $type = 'group')
{
$sendData = [];
// 内容处理
if (!empty($content['content'])) {
// 京东转链
if (!empty($config['promotionSiteId'])) {
$WorkbenchController = new WorkbenchController();
$jdLink = $WorkbenchController->changeLink($content['content'], $config['promotionSiteId']);
$jdLink = json_decode($jdLink, true);
if ($jdLink['code'] == 200) {
$content['content'] = $jdLink['data'];
}
}
if ($type == 'group') {
$sendData[] = [
'content' => $content['content'],
'msgType' => 1,
'wechatAccountId' => $wechatAccountId,
'wechatChatroomId' => $targetId,
];
} else {
$sendData[] = [
'content' => $content['content'],
'msgType' => 1,
];
}
}
// 根据内容类型处理
switch ($content['contentType']) {
case 1:
// 图片解析
$imgs = json_decode($content['resUrls'], true);
if (!empty($imgs)) {
foreach ($imgs as $img) {
if ($type == 'group') {
$sendData[] = [
'content' => $img,
'msgType' => 3,
'wechatAccountId' => $wechatAccountId,
'wechatChatroomId' => $targetId,
];
} else {
$sendData[] = [
'content' => $img,
'msgType' => 3,
];
}
}
}
break;
case 2:
// 链接解析
$url = json_decode($content['urls'], true);
if (!empty($url[0])) {
$url = $url[0];
$linkContent = [
'desc' => $url['desc'],
'thumbPath' => $url['image'],
'title' => $url['desc'],
'type' => 'link',
'url' => $url['url'],
];
if ($type == 'group') {
$sendData[] = [
'content' => $linkContent,
'msgType' => 49,
'wechatAccountId' => $wechatAccountId,
'wechatChatroomId' => $targetId,
];
} else {
$sendData[] = [
'content' => $linkContent,
'msgType' => 49,
];
}
}
break;
case 3:
// 视频解析
$video = json_decode($content['resUrls'], true);
if (!empty($video)) {
$video = $video[0];
}
if ($type == 'group') {
$sendData[] = [
'content' => $video,
'msgType' => 43,
'wechatAccountId' => $wechatAccountId,
'wechatChatroomId' => $targetId,
];
} else {
$sendData[] = [
'content' => $video,
'msgType' => 43,
];
}
break;
}
return $sendData;
}
/**
* 记录发送历史
* @param Workbench $workbench
* @param array $devices
* @param array $contentLibrary
*/
protected function recordSendHistory($workbench, $devices, $contentLibrary)
{
$now = time();
$data = [];
foreach ($devices as $device) {
$data = [
'workbenchId' => $workbench->id,
'deviceId' => $device['deviceId'],
'contentId' => $contentLibrary['id'],
'wechatAccountId' => $device['wechatAccountId'],
'createTime' => $now,
];
Db::name('workbench_group_push_item')->insert($data);
}
}
/**
* 获取设备列表
* @param Workbench $workbench 工作台
* @param WorkbenchGroupPush $config 配置
* @return array|bool
*/
protected function isPush($workbench, $config)
{
// 检查发送间隔新逻辑根据startTime、endTime、maxPerDay动态计算
$today = date('Y-m-d');
$startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
$endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');
// 如果时间不符,则跳过
if (($startTimestamp > time() || $endTimestamp < time()) && empty($config['pushType'])) {
return false;
}
$totalSeconds = $endTimestamp - $startTimestamp;
if ($totalSeconds <= 0 || empty($config['maxPerDay'])) {
return false;
}
$targetType = isset($config['targetType']) ? intval($config['targetType']) : 1; // 默认1=群推送
if ($targetType == 2) {
// 好友推送maxPerDay表示每日推送人数
// 查询今日已推送的好友ID列表去重仅统计今日
$sentFriendIds = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', 2)
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
->column('friendId');
$sentFriendIds = array_filter($sentFriendIds); // 过滤null值
$count = count(array_unique($sentFriendIds)); // 去重后统计今日推送人数
if ($count >= $config['maxPerDay']) {
return false;
}
// 计算本次同步的最早允许时间(按人数计算间隔)
$interval = floor($totalSeconds / $config['maxPerDay']);
$nextSyncTime = $startTimestamp + $count * $interval;
if (time() < $nextSyncTime) {
return false;
}
} else {
// 群推送maxPerDay表示每日推送次数
$interval = floor($totalSeconds / $config['maxPerDay']);
// 查询今日已同步次数
$count = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', 1)
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
->count();
if ($count >= $config['maxPerDay']) {
return false;
}
// 计算本次同步的最早允许时间
$nextSyncTime = $startTimestamp + $count * $interval;
if (time() < $nextSyncTime) {
return false;
}
}
return true;
}
/**
* 获取内容库
* @param Workbench $workbench 工作台
* @param WorkbenchGroupPush $config 配置
* @return array|bool
*/
protected function getContentLibrary($workbench, $config)
{
$contentids = json_decode($config['contentLibraries'], true);
if (empty($contentids)) {
return false;
}
$targetType = isset($config['targetType']) ? intval($config['targetType']) : 1; // 默认1=群推送
if ($config['pushType'] == 1) {
$limit = 10;
} else {
$limit = 1;
}
//推送顺序
if ($config['pushOrder'] == 1) {
$order = 'ci.sendTime desc, ci.id asc';
} else {
$order = 'ci.sendTime desc, ci.id desc';
}
// 基础查询根据targetType过滤记录
$query = Db::name('content_library')->alias('cl')
->join('content_item ci', 'ci.libraryId = cl.id')
->join('workbench_group_push_item wgpi', 'wgpi.contentId = ci.id and wgpi.workbenchId = ' . $workbench->id . ' and wgpi.targetType = ' . $targetType, 'left')
->where(['cl.isDel' => 0, 'ci.isDel' => 0])
->where('ci.sendTime <= ' . (time() + 60))
->whereIn('cl.id', $contentids)
->field([
'ci.id',
'ci.libraryId',
'ci.contentType',
'ci.title',
'ci.content',
'ci.resUrls',
'ci.urls',
'ci.comment',
'ci.sendTime'
]);
// 复制 query
$query2 = clone $query;
$query3 = clone $query;
// 根据isLoop处理不同的发送逻辑
if ($config['isLoop'] == 1) {
// 可以循环发送只有群推送时才能为1
// 1. 优先获取未发送的内容
$unsentContent = $query->where('wgpi.id', 'null')
->order($order)
->limit(0, $limit)
->select();
if (!empty($unsentContent)) {
return $unsentContent;
}
$lastSendData = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', $targetType)
->order('id desc')
->find();
$fastSendData = Db::name('workbench_group_push_item')
->where('workbenchId', $workbench->id)
->where('targetType', $targetType)
->order('id asc')
->find();
if (empty($lastSendData) || empty($fastSendData)) {
return [];
}
$sentContent = $query2->where('wgpi.contentId', '<', $lastSendData['contentId'])->order('wgpi.id ASC')->group('wgpi.contentId')->limit(0, $limit)->select();
if (empty($sentContent)) {
$sentContent = $query3->where('wgpi.contentId', '=', $fastSendData['contentId'])->order('wgpi.id ASC')->group('wgpi.contentId')->limit(0, $limit)->select();
}
return $sentContent;
} else {
// 不能循环发送只获取未发送的内容好友推送时isLoop=0
$list = $query->where('wgpi.id', 'null')
->order($order)
->limit(0, $limit)
->select();
return $list;
}
}
/**
* 记录任务开始
* @param string $jobId
* @param string $queueLockKey
*/
protected function logJobStart($jobId, $queueLockKey)
{
Log::info('开始处理工作台消息群发任务: ' . json_encode([
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
]));
}
/**
* 处理任务成功
* @param Job $job
* @param string $queueLockKey
*/
protected function handleJobSuccess($job, $queueLockKey)
{
$job->delete();
Cache::rm($queueLockKey);
Log::info('工作台消息群发任务执行成功');
}
/**
* 处理任务错误
* @param \Exception $e
* @param Job $job
* @param string $queueLockKey
* @return bool
*/
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
Log::error('工作台消息群发任务异常:' . $e->getMessage());
if (!empty($queueLockKey)) {
Cache::rm($queueLockKey);
Log::info("由于异常释放队列锁: {$queueLockKey}");
}
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
$job->delete();
} else {
$job->release(Config::get('queue.failed_delay', 10));
}
return false;
}
}