Files
cunkebao_v3/Server/application/job/WorkbenchGroupCreateJob.php
2025-09-16 09:57:06 +08:00

300 lines
12 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupCreate;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
use app\api\controller\WechatController;
/**
* 工作台群创建任务
* Class WorkbenchGroupCreateJob
* @package app\job
*/
class WorkbenchGroupCreateJob
{
/**
* 最大重试次数
*/
const MAX_RETRY_ATTEMPTS = 3;
/**
* 队列任务处理
* @param Job $job 队列任务
* @param array $data 任务数据
* @return bool
*/
public function fire(Job $job, $data)
{
$jobId = $data['jobId'] ?? '';
$queueLockKey = $data['queueLockKey'] ?? '';
try {
$this->logJobStart($jobId, $queueLockKey);
$this->execute();
$this->handleJobSuccess($job, $queueLockKey);
return true;
} catch (\Exception $e) {
return $this->handleJobError($e, $job, $queueLockKey);
}
}
/**
* 执行任务
* @throws \Exception
*/
public function execute()
{
try {
// 获取所有工作台
$workbenches = Workbench::where(['status' => 1, 'type' => 4, 'isDel' => 0])->order('id desc')->select();
foreach ($workbenches as $workbench) {
// 获取工作台配置
$config = WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find();
if (!$config) {
continue;
}
$config['poolGroups'] = json_decode($config['poolGroups'], true);
$config['devices'] = json_decode($config['devices'], true);
if (empty($config['poolGroups']) || empty($config['devices'])) {
continue;
}
//群主及内部成员
$groupMember = Db::name('device_wechat_login')->alias('dwl')
->join(['s2_wechat_account' => 'a'], 'dwl.wechatId = a.wechatId')
->whereIn('dwl.deviceId', $config['devices'])
->group('a.id')
->column('a.wechatId');
if (empty($groupMember)) {
continue;
}
$groupMemberWechatId = Db::table('s2_wechat_friend')
->where('ownerWechatId', $groupMember[0])
->whereIn('wechatId', $groupMember)
->column('id,wechatId');
if (empty($groupMemberWechatId)) {
continue;
}
$groupMemberId = array_keys($groupMemberWechatId);
//流量池用户
$poolItem = Db::name('traffic_source_package_item')
->whereIn('packageId', $config['poolGroups'])
->group('identifier')
->column('identifier');
if (empty($poolItem)) {
continue;
}
//群用户
$groupUser = Db::name('workbench_group_create_item')
->where('workbenchId', $workbench->id)
->whereIn('wechatId', $poolItem)
->group('wechatId')
->column('wechatId');
//待入群的用户
$joinUser = array_diff($poolItem, $groupUser);
if (empty($joinUser)) {
continue;
}
//随机群人数
$groupRandNum = mt_rand($config['groupSizeMin'], $config['groupSizeMax']) - count($groupMember);
//待加入用户
$addGroupUser = [];
$totalRows = count($joinUser);
for ($i = 0; $i < $totalRows; $i += $groupRandNum) {
$batchRows = array_slice($joinUser, $i, $groupRandNum);
if (!empty($batchRows)) {
$user = [];
foreach ($batchRows as $row) {
$user[] = $row;
}
$addGroupUser[] = $user;
}
}
foreach ($addGroupUser as $key => $val) {
//判断第一组用户是否满足创建群的条件
$friendIds = Db::name('wechat_friendship')->alias('f')
->join(['s2_wechat_account' => 'a'], 'f.ownerWechatId=a.wechatId')
->where('f.companyId', $workbench->companyId)
->whereIn('f.wechatId', $val)
->group('f.wechatId')
->column('f.id,f.wechatId,a.id as wechatAccountId');
// 整理数组按wechatAccountId分组值为对应的id数组
$groupedFriends = [];
$wechatAccountIds = [];
$wechatIds = [];
foreach ($friendIds as $friend) {
$wechatAccountId = $friend['wechatAccountId'];
if (!in_array($wechatAccountId, $wechatAccountIds)) {
$wechatAccountIds[] = $wechatAccountId;
}
$friendId = $friend['id'];
if (!isset($groupedFriends[$wechatAccountId])) {
$groupedFriends[$wechatAccountId] = [];
}
$groupedFriends[$wechatAccountId][] = $friendId;
$wechatIds[$friendId] = $friend['wechatId'];
}
//==================== 群相关功能开始 ===========================
$toAccountId = '';
$username = Env::get('api.username2', '');
$password = Env::get('api.password2', '');
if (!empty($username) || !empty($password)) {
$toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
}
$webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
//$webSocket = new WebSocketController(['userName' => 'wz_03', 'password' => 'key123456', 'accountId' => 5015]);
//拉人进群 $webSocket->CmdChatroomInvite(['wechatChatroomId' => 830794, 'wechatFriendIds' => [21168549]]);
//修改群名称 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'chatroomName' => 'test111']);
//修改群公告 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'announce' => 'test111']);
//建群 $webSocket->CmdChatroomCreate(['chatroomName' => '聊天测试群', 'wechatFriendIds' => [17453051,17453058],'wechatAccountId' => 300745]);
foreach ($groupedFriends as $wechatAccountId => $friendId) {
//列出所有群
$group = '';
$groupMemberNum = 0;
$groupIds = Db::name('workbench_group_create_item')->where(['workbenchId' => $workbench->id])->group('groupId')->column('groupId');
if (!empty($groupIds)) {
//最新创建的群
$group = Db::name('wechat_group')->where(['wechatAccountId' => $wechatAccountId])->whereIn('id', $groupIds)->order('createTime DESC')->find();
//群用户数量
if (!empty($group)) {
$groupMemberNum = Db::name('wechat_group_member')->where('groupId', $group['id'])->count();
}
}
//拉群或者建群
$wechatFriendIds = array_merge($friendId, $groupMemberId);
if ($groupMemberNum == 0 || (count($wechatFriendIds) + $groupMemberNum) >= $groupRandNum) {
if (count($groupIds) > 0) {
$chatroomName = $config['groupNameTemplate'] . count($groupIds) + 1 . '群';
} else {
$chatroomName = $config['groupNameTemplate'];
}
$webSocket->CmdChatroomCreate(['chatroomName' => $chatroomName, 'wechatFriendIds' => $wechatFriendIds,'wechatAccountId' => $wechatAccountId]);
} else {
$webSocket->CmdChatroomInvite(['wechatChatroomId' => $group['id'], 'wechatFriendIds' => $wechatFriendIds]);
}
$installData = [];
//记录进群人员
foreach ($wechatFriendIds as $v) {
$installData[] = [
'workbenchId' => $workbench->id,
'friendId' => $v,
'wechatId' => !empty($wechatIds[$v]) ? $wechatIds[$v] : $groupMemberWechatId[$v],
'groupId' => 0,
'wechatAccountId' => $wechatAccountId,
'createTime' => time(),
];
}
Db::name('workbench_group_create_item')->insertAll($installData);
}
}
}
} catch (\Exception $e) {
Log::error("消息群发任务异常: " . $e->getMessage());
throw $e;
}
}
/**
* 获取设备列表
* @param Workbench $workbench 工作台
* @param WorkbenchGroupPush $config 配置
* @return array|bool
*/
protected function isCreate($workbench, $config, $groupIds = [])
{
// 检查发送间隔新逻辑根据startTime、endTime、maxPerDay动态计算
$today = date('Y-m-d');
$startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
$endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');
// 如果时间不符,则跳过
if ($startTimestamp > time() || $endTimestamp < time() || empty($groupIds)) {
return false;
}
// 查询今日建群数量
$count = Db::name('wechat_group')
->whereIn('id', $groupIds)
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
->count();
if ($count >= $config['maxGroupsPerDay']) {
return false;
}
return true;
}
/**
* 记录任务开始
* @param string $jobId
* @param string $queueLockKey
*/
protected function logJobStart($jobId, $queueLockKey)
{
Log::info('开始处理工作台消息群发任务: ' . json_encode([
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
]));
}
/**
* 处理任务成功
* @param Job $job
* @param string $queueLockKey
*/
protected function handleJobSuccess($job, $queueLockKey)
{
$job->delete();
Cache::rm($queueLockKey);
Log::info('工作台消息群发任务执行成功');
}
/**
* 处理任务错误
* @param \Exception $e
* @param Job $job
* @param string $queueLockKey
* @return bool
*/
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
Log::error('工作台消息群发任务异常:' . $e->getMessage());
if (!empty($queueLockKey)) {
Cache::rm($queueLockKey);
Log::info("由于异常释放队列锁: {$queueLockKey}");
}
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
$job->delete();
} else {
$job->release(Config::get('queue.failed_delay', 10));
}
return false;
}
}