Files
cunkebao_v3/Server/application/job/WorkbenchGroupPushJob.php
2025-08-07 11:56:04 +08:00

401 lines
14 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupPush;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
/**
 * Workbench group-messaging task: pushes content-library items to WeChat groups.
 * Class WorkbenchGroupPushJob
 * @package app\job
 */
class WorkbenchGroupPushJob
{
/**
 * Maximum number of retry attempts
 */
const MAX_RETRY_ATTEMPTS = 3;
/**
 * Queue entry point: runs the group-push task and settles the job.
 *
 * On success the job is deleted and the queue lock released (handleJobSuccess).
 * On failure the queue lock is released and the job is either released back to
 * the queue or deleted (handleJobError).
 *
 * @param Job   $job  Queue job being processed
 * @param array $data Job payload; the optional keys 'jobId' and 'queueLockKey' are read
 * @return bool true when the task completed, false when it failed
 */
public function fire(Job $job, $data)
{
    $jobId = $data['jobId'] ?? '';
    $queueLockKey = $data['queueLockKey'] ?? '';
    try {
        $this->logJobStart($jobId, $queueLockKey);
        $this->execute();
        $this->handleJobSuccess($job, $queueLockKey);
        return true;
    } catch (\Throwable $e) {
        // Catch \Error too (TypeError, call on null, ...): otherwise the error
        // path is skipped and the queue lock is never released.
        if (!$e instanceof \Exception) {
            // handleJobError() is typed against \Exception, so wrap engine
            // errors while keeping the original as the previous exception.
            $e = new \RuntimeException($e->getMessage(), (int) $e->getCode(), $e);
        }
        return $this->handleJobError($e, $job, $queueLockKey);
    }
}
/**
 * Runs one pass of the group-push task over every enabled workbench.
 *
 * Loads all workbenches with status = 1, type = 3 and isDel = 0 (newest id
 * first). For each one it loads the push configuration, checks whether a push
 * is due, picks the content to send and hands it to sendMsgToGroup().
 * Workbenches without a configuration, not due, or without content are skipped.
 *
 * @throws \Exception any exception raised while processing is logged and rethrown
 */
public function execute()
{
    try {
        $criteria = ['status' => 1, 'type' => 3, 'isDel' => 0];
        $activeWorkbenches = Workbench::where($criteria)->order('id desc')->select();

        foreach ($activeWorkbenches as $item) {
            // Push configuration attached to this workbench
            $pushConfig = WorkbenchGroupPush::where('workbenchId', $item->id)->find();

            // Skip when there is no configuration or no push is due right now
            if (!$pushConfig || !$this->isPush($item, $pushConfig)) {
                continue;
            }

            // Only send when the content libraries yield something
            $contents = $this->getContentLibrary($item, $pushConfig);
            if (!empty($contents)) {
                $this->sendMsgToGroup($item, $pushConfig, $contents);
            }
        }
    } catch (\Exception $failure) {
        Log::error("消息群发任务异常: " . $failure->getMessage());
        throw $failure;
    }
}
/**
 * Sends the given content items to every WeChat group configured for a workbench.
 *
 * For each content item and each target group, the messages are built, sent
 * through WebSocketController::sendCommunity(), and one history row is written
 * to workbench_group_push_item.
 *
 * msgType values (from the original notes): 1 text, 3 image, 43 video,
 * 47 animated sticker / other stickers, 49 mini program / link / file.
 * For text, image and sticker messages "content" is a string; otherwise it is
 * an object such as {type, url, title, thumbPath, desc}.
 *
 * @param Workbench|array          $workbench Workbench; only its 'id' is read
 * @param WorkbenchGroupPush|array $config    Push config; 'groups' is a JSON list of wechat_group ids
 * @param iterable                 $msgConf   Content rows (id, content, contentType, resUrls, urls)
 * @return false|null false when there is no target group; otherwise nothing is returned
 */
public function sendMsgToGroup($workbench, $config, $msgConf)
{
    $groupIds = json_decode($config['groups'], true);
    if (empty($groupIds)) {
        // Nothing configured (or invalid JSON): avoid querying with an empty id list
        return false;
    }
    $groupsData = Db::name('wechat_group')
        ->whereIn('id', $groupIds)
        ->field('id,wechatAccountId,chatroomId,companyId,ownerWechatId')
        ->select();
    if (empty($groupsData)) {
        return false;
    }

    $toAccountId = '';
    $username = Env::get('api.username', '');
    $password = Env::get('api.password', '');
    if (!empty($username) || !empty($password)) {
        $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
    }

    // Open the WebSocket channel used to deliver the messages
    $wsController = new WebSocketController([
        'userName' => $username,
        'password' => $password,
        'accountId' => $toAccountId,
    ]);

    foreach ($msgConf as $content) {
        foreach ($groupsData as $group) {
            // Build per group so a later group never re-sends (or re-records)
            // what was already delivered to an earlier group.
            $messages = $this->buildGroupMessages($content, $group);
            if (empty($messages)) {
                continue;
            }

            foreach ($messages as $message) {
                $wsController->sendCommunity($message);
            }

            // Record the delivery right after sending so a later failure
            // cannot lose the history of what has already gone out.
            Db::name('workbench_group_push_item')->insert([
                'workbenchId' => $workbench['id'],
                'contentId' => $content['id'],
                'groupId' => $group['id'],
                'wechatAccountId' => $group['wechatAccountId'],
                'createTime' => time(),
            ]);
        }
    }
}

/**
 * Builds the list of messages that deliver one content item to one group.
 *
 * @param array $content Content row (content, contentType, resUrls, urls)
 * @param array $group   wechat_group row (id, wechatAccountId)
 * @return array List of message payloads; empty when the item has nothing to send
 */
private function buildGroupMessages($content, $group)
{
    // NOTE(review): the group's row id is sent as wechatChatroomId, while the
    // selected 'chatroomId' column is unused — confirm this is what the API expects.
    $target = [
        'wechatAccountId' => $group['wechatAccountId'],
        'wechatChatroomId' => $group['id'],
    ];
    $messages = [];

    // Text part, sent ahead of any attachment
    if (!empty($content['content'])) {
        $messages[] = ['content' => $content['content'], 'msgType' => 1] + $target;
    }

    switch ($content['contentType']) {
        case 1:
            // Images: one message per image URL
            $images = json_decode($content['resUrls'], true);
            if (!empty($images)) {
                foreach ($images as $image) {
                    $messages[] = ['content' => $image, 'msgType' => 3] + $target;
                }
            }
            break;
        case 2:
            // Link card: only the first link is used
            $links = json_decode($content['urls'], true);
            if (!empty($links[0])) {
                $link = $links[0];
                $messages[] = [
                    'content' => [
                        'desc' => '',
                        'thumbPath' => $link['image'] ?? '',
                        'title' => $link['desc'] ?? '',
                        'type' => 'link',
                        'url' => $link['url'] ?? '',
                    ],
                    'msgType' => 49,
                ] + $target;
            }
            break;
        case 3:
            // Video: only the first entry is used; skip when there is none
            // instead of sending an empty video message.
            $videos = json_decode($content['urls'], true);
            if (!empty($videos[0])) {
                $messages[] = ['content' => $videos[0], 'msgType' => 43] + $target;
            }
            break;
    }

    return $messages;
}
/**
 * Writes one send-history row per device into workbench_group_push_item.
 *
 * All rows created by a single call share the same createTime.
 *
 * @param Workbench $workbench      Workbench; its id is stored as workbenchId
 * @param array     $devices        Rows providing 'deviceId' and 'wechatAccountId'
 * @param array     $contentLibrary Content row; its 'id' is stored as contentId
 */
protected function recordSendHistory($workbench, $devices, $contentLibrary)
{
    $createdAt = time();

    foreach ($devices as $entry) {
        Db::name('workbench_group_push_item')->insert([
            'workbenchId' => $workbench->id,
            'deviceId' => $entry['deviceId'],
            'contentId' => $contentLibrary['id'],
            'wechatAccountId' => $entry['wechatAccountId'],
            'createTime' => $createdAt,
        ]);
    }
}
/**
 * Decides whether a workbench is due for a push right now.
 *
 * Today's window [startTime, endTime] is divided into maxPerDay equal slots.
 * A push is allowed when fewer than maxPerDay history rows exist for the
 * workbench inside the window and the start of the next free slot has been
 * reached. When pushType is empty, the current time must also lie inside the window.
 *
 * NOTE(review): the counter is the number of workbench_group_push_item rows,
 * not the number of push rounds — confirm this matches the intended meaning
 * of maxPerDay.
 *
 * @param Workbench          $workbench Workbench; only its id is read
 * @param WorkbenchGroupPush $config    Config providing startTime, endTime, pushType, maxPerDay
 *                                      (startTime/endTime presumably 'HH:MM' — ':00' is appended)
 * @return bool true when a push should happen now
 */
protected function isPush($workbench, $config)
{
    // Build today's window boundaries from the configured clock times
    $datePrefix = date('Y-m-d') . ' ';
    $windowStart = strtotime($datePrefix . $config['startTime'] . ':00');
    $windowEnd = strtotime($datePrefix . $config['endTime'] . ':00');

    // Outside the window: skip, unless a pushType is set
    $outsideWindow = $windowStart > time() || $windowEnd < time();
    if ($outsideWindow && empty($config['pushType'])) {
        return false;
    }

    $windowLength = $windowEnd - $windowStart;
    if ($windowLength <= 0 || empty($config['maxPerDay'])) {
        return false;
    }

    // Length of one sending slot in seconds
    $slotLength = floor($windowLength / $config['maxPerDay']);

    // Rows already recorded for this workbench inside today's window
    $sentToday = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbench->id)
        ->whereTime('createTime', 'between', [$windowStart, $windowEnd])
        ->count();
    if ($sentToday >= $config['maxPerDay']) {
        return false;
    }

    // Earliest moment at which the next push may happen
    $earliestNext = $windowStart + $sentToday * $slotLength;

    return !(time() < $earliestNext);
}
/**
 * Picks the content items to push next for a workbench.
 *
 * Candidates are non-deleted items of the configured, non-deleted libraries
 * whose sendTime is at most 60 seconds in the future. Items never pushed by
 * this workbench are always preferred. When isLoop == 1 and nothing unsent is
 * left, previously sent items are selected again.
 *
 * @param Workbench          $workbench Workbench; only its id is read
 * @param WorkbenchGroupPush $config    Config providing contentLibraries (JSON id list),
 *                                      pushType, pushOrder and isLoop
 * @return array|bool false when no library is configured; otherwise the selected
 *                    rows (possibly empty)
 */
protected function getContentLibrary($workbench, $config)
{
    $libraryIds = json_decode($config['contentLibraries'], true);
    if (empty($libraryIds)) {
        return false;
    }

    // pushType 1 takes up to 10 items per round, anything else a single item
    $limit = $config['pushType'] == 1 ? 10 : 1;

    // Push order: pushOrder 1 breaks sendTime ties by ascending id, otherwise descending
    $order = $config['pushOrder'] == 1
        ? 'ci.sendTime desc, ci.id asc'
        : 'ci.sendTime desc, ci.id desc';

    // The id is concatenated into a raw JOIN condition below; force it to an integer
    $workbenchId = (int) $workbench->id;

    // Base query: library items LEFT JOINed with this workbench's push history
    $baseQuery = Db::name('content_library')->alias('cl')
        ->join('content_item ci', 'ci.libraryId = cl.id')
        ->join('workbench_group_push_item wgpi', 'wgpi.contentId = ci.id and wgpi.workbenchId = ' . $workbenchId, 'left')
        ->where(['cl.isDel' => 0, 'ci.isDel' => 0])
        ->where('ci.sendTime <= ' . (time() + 60))
        ->whereIn('cl.id', $libraryIds)
        ->field([
            'ci.id',
            'ci.libraryId',
            'ci.contentType',
            'ci.title',
            'ci.content',
            'ci.resUrls',
            'ci.urls',
            'ci.comment',
            'ci.sendTime'
        ]);

    // 1. Items without a history row (never sent) come first in both modes.
    //    Each query runs on a clone because where() mutates the builder.
    $unsentContent = (clone $baseQuery)->where('wgpi.id', 'null')
        ->order($order)
        ->limit(0, $limit)
        ->select();
    if (!empty($unsentContent) || $config['isLoop'] != 1) {
        // Without looping, only unsent content is ever returned
        return $unsentContent;
    }

    // 2. Loop mode: continue from the most recent history row
    $lastSendData = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbenchId)
        ->order('id desc')
        ->find();
    if (empty($lastSendData)) {
        // No history yet (e.g. empty libraries): nothing to loop over. Avoids
        // reading 'contentId' from null.
        return [];
    }

    $sentContent = (clone $baseQuery)
        ->where('wgpi.contentId', '<', $lastSendData['contentId'])
        ->order('wgpi.id ASC')
        ->group('wgpi.contentId')
        ->limit(0, $limit)
        ->select();
    if (!empty($sentContent)) {
        return $sentContent;
    }

    // 3. Nothing below the last sent item: restart from the first history row
    $firstSendData = Db::name('workbench_group_push_item')
        ->where('workbenchId', $workbenchId)
        ->order('id asc')
        ->find();
    if (empty($firstSendData)) {
        return [];
    }

    return (clone $baseQuery)
        ->where('wgpi.contentId', '=', $firstSendData['contentId'])
        ->order('wgpi.id ASC')
        ->group('wgpi.contentId')
        ->limit(0, $limit)
        ->select();
}
/**
 * Writes an info log entry marking the start of the task.
 *
 * The job id and the queue lock key are appended to the message as JSON.
 *
 * @param string $jobId
 * @param string $queueLockKey
 */
protected function logJobStart($jobId, $queueLockKey)
{
    // compact() yields ['jobId' => ..., 'queueLockKey' => ...] in this order
    $context = json_encode(compact('jobId', 'queueLockKey'));

    Log::info('开始处理工作台消息群发任务: ' . $context);
}
/**
 * Settles a successfully processed job: deletes it from the queue, releases
 * the queue lock and logs the success.
 *
 * @param Job    $job
 * @param string $queueLockKey Cache key of the queue lock; may be empty
 */
protected function handleJobSuccess($job, $queueLockKey)
{
    $job->delete();
    // fire() falls back to '' when the payload carries no lock key; skip the
    // cache call in that case, as handleJobError() does.
    if (!empty($queueLockKey)) {
        Cache::rm($queueLockKey);
    }
    Log::info('工作台消息群发任务执行成功');
}
/**
 * Settles a failed job: logs the error, releases the queue lock (when a key
 * was given) and then either retries or drops the job.
 *
 * The job is deleted once attempts() exceeds MAX_RETRY_ATTEMPTS; otherwise it
 * is released back to the queue with the 'queue.failed_delay' delay
 * (10 when not configured).
 *
 * @param \Exception $e            Failure that aborted the task
 * @param Job        $job
 * @param string     $queueLockKey Cache key of the queue lock; may be empty
 * @return bool always false
 */
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
    Log::error('工作台消息群发任务异常:' . $e->getMessage());

    // Free the lock so the next scheduled run is not blocked by this failure
    if (!empty($queueLockKey)) {
        Cache::rm($queueLockKey);
        Log::info("由于异常释放队列锁: {$queueLockKey}");
    }

    $retriesExhausted = $job->attempts() > self::MAX_RETRY_ATTEMPTS;
    if ($retriesExhausted) {
        $job->delete();
        return false;
    }

    $retryDelay = Config::get('queue.failed_delay', 10);
    $job->release($retryDelay);
    return false;
}
}