Files
cunkebao_v3/Server/application/job/WorkbenchGroupCreateJob.php

516 lines
20 KiB
PHP
Raw Normal View History

2025-08-29 09:51:00 +08:00
<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupCreate;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
2025-12-10 17:58:08 +08:00
use app\common\model\DeviceWechatLogin as DeviceWechatLoginModel;
2025-08-29 09:51:00 +08:00
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
2025-09-16 09:57:06 +08:00
use app\api\controller\WechatController;
2025-12-10 17:58:08 +08:00
use think\Queue;
2025-08-29 09:51:00 +08:00
/**
* 工作台群创建任务
* Class WorkbenchGroupCreateJob
* @package app\job
*/
class WorkbenchGroupCreateJob
{
/**
 * Maximum number of retries for a failed job.
 * handleJobError() re-queues the job while $job->attempts() does not exceed
 * this value and deletes it afterwards.
 */
const MAX_RETRY_ATTEMPTS = 3;
/**
 * Queue entry point: runs the group-creation task once.
 *
 * On success the job is deleted and the queue lock is released by
 * handleJobSuccess(). Exceptions are delegated to handleJobError(), which
 * releases the lock and either re-queues or deletes the job.
 *
 * @param Job   $job  Queue job instance
 * @param array $data Job payload; the optional keys 'jobId' and 'queueLockKey' are read
 * @return bool true on success, false when an exception was handled
 * @throws \Throwable Non-Exception errors are re-thrown after the lock is released
 */
public function fire(Job $job, $data)
{
    $jobId = $data['jobId'] ?? '';
    $queueLockKey = $data['queueLockKey'] ?? '';

    try {
        $this->logJobStart($jobId, $queueLockKey);
        $this->execute();
        $this->handleJobSuccess($job, $queueLockKey);

        return true;
    } catch (\Exception $e) {
        return $this->handleJobError($e, $job, $queueLockKey);
    } catch (\Throwable $e) {
        // PHP \Error instances (TypeError, ValueError, ...) are not \Exception,
        // so handleJobError() never sees them. Release the queue lock here as
        // well so it is not left behind, then let the queue worker deal with
        // the error itself.
        Log::error('工作台建群任务发生致命错误:' . $e->getMessage());
        if (!empty($queueLockKey)) {
            Cache::rm($queueLockKey);
        }

        throw $e;
    }
}
2025-12-10 17:58:08 +08:00
/**
 * Member type constants (stored in the `memberType` column of
 * workbench_group_create_item).
 */
const MEMBER_TYPE_OWNER = 1; // group-owner member
const MEMBER_TYPE_ADMIN = 2; // administrator
const MEMBER_TYPE_OWNER_FRIEND = 3; // friend of the group owner
const MEMBER_TYPE_ADMIN_FRIEND = 4; // friend of an administrator
/**
 * Status constants (stored in the `status` column of
 * workbench_group_create_item).
 */
const STATUS_PENDING = 0; // waiting to be created
const STATUS_CREATING = 1; // creation in progress
const STATUS_SUCCESS = 2; // created successfully
const STATUS_FAILED = 3; // creation failed
const STATUS_ADMIN_FRIEND_ADDED = 4; // administrator's friends have been invited
2025-08-29 09:51:00 +08:00
/**
 * Run one pass of the group-creation task.
 *
 * For every matching workbench this loads its group-creation config, resolves
 * the owner WeChat id from the configured devices, collects the traffic-pool
 * users that have no successful item record yet, splits them into batches of
 * a randomly chosen size and hands each batch to processBatchUsers().
 *
 * @throws \Exception Logged and re-thrown so that fire() can handle the retry
 */
public function execute()
{
    try {
        // 1. Load the workbenches that have group creation enabled.
        // NOTE(review): the 'id' => 315 condition restricts the task to a single
        // workbench and looks like a debugging leftover; it is kept unchanged
        // here - confirm whether it should be removed.
        $workbenches = Workbench::where(['status' => 1, 'type' => 4, 'isDel' => 0, 'id' => 315])
            ->order('id desc')
            ->select();

        foreach ($workbenches as $workbench) {
            // Load the workbench's group-creation config.
            $config = WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find();
            if (!$config) {
                continue;
            }

            // Decode the JSON-encoded config columns.
            $config['poolGroups'] = json_decode($config['poolGroups'], true);
            $config['devices'] = json_decode($config['devices'], true);
            $config['admins'] = json_decode($config['admins'] ?? '[]', true) ?: [];
            if (empty($config['poolGroups']) || empty($config['devices'])) {
                continue;
            }

            // Resolve the owner WeChat id: the wechatId of the login record with
            // the highest id among the configured devices.
            $groupMember = [];
            $wechatId = Db::name('device_wechat_login')
                ->whereIn('deviceId', $config['devices'])
                ->order('id desc')
                ->value('wechatId');
            if (empty($wechatId)) {
                continue;
            }
            $groupMember[] = $wechatId;

            // Map friend-record id => wechatId for the owner member records.
            $groupMemberWechatId = [];
            $groupMemberId = [];
            foreach ($groupMember as $ownerWechatId) {
                $friends = Db::table('s2_wechat_friend')
                    ->where('ownerWechatId', $ownerWechatId)
                    ->whereIn('wechatId', $groupMember)
                    ->field('id,wechatId')
                    ->select();
                foreach ($friends as $friend) {
                    if (!isset($groupMemberWechatId[$friend['id']])) {
                        $groupMemberWechatId[$friend['id']] = $friend['wechatId'];
                        $groupMemberId[] = $friend['id'];
                    }
                }
            }
            if (empty($groupMemberWechatId)) {
                continue;
            }

            // Traffic-pool users (distinct identifiers of the configured packages).
            $poolItem = Db::name('traffic_source_package_item')
                ->whereIn('packageId', $config['poolGroups'])
                ->group('identifier')
                ->column('identifier');
            if (empty($poolItem)) {
                continue;
            }

            // Users that already have a successful record for this workbench.
            $groupUser = Db::name('workbench_group_create_item')
                ->where('workbenchId', $workbench->id)
                ->where('status', 'in', [self::STATUS_SUCCESS, self::STATUS_ADMIN_FRIEND_ADDED])
                ->whereIn('wechatId', $poolItem)
                ->group('wechatId')
                ->column('wechatId');

            // Users still waiting to be added to a group.
            $joinUser = array_diff($poolItem, $groupUser);
            if (empty($joinUser)) {
                continue;
            }

            // Random group size, minus the owner members (admins are not counted).
            $sizeMin = (int)$config['groupSizeMin'];
            $sizeMax = (int)$config['groupSizeMax'];
            if ($sizeMin > $sizeMax) {
                // mt_rand() rejects min > max; skip instead of failing the whole run.
                Log::warning("建群人数配置无效(groupSizeMin > groupSizeMax),跳过。工作台ID: {$workbench->id}");
                continue;
            }
            $groupRandNum = mt_rand($sizeMin, $sizeMax) - count($groupMember);
            if ($groupRandNum < 1) {
                // A batch size of zero or less would make the batching below never
                // terminate, so such a configuration is skipped.
                Log::warning("建群人数配置过小,跳过。工作台ID: {$workbench->id}");
                continue;
            }

            // Split the waiting users into batches of $groupRandNum.
            $addGroupUser = array_chunk($joinUser, $groupRandNum);

            // Initialise the WebSocket client.
            $toAccountId = '';
            $username = Env::get('api.username2', '');
            $password = Env::get('api.password2', '');
            if (!empty($username) || !empty($password)) {
                $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
            }
            $webSocket = new WebSocketController([
                'userName' => $username,
                'password' => $password,
                'accountId' => $toAccountId,
            ]);

            // Process every batch.
            foreach ($addGroupUser as $batchUsers) {
                $this->processBatchUsers($workbench, $config, $batchUsers, $groupMemberId, $groupMemberWechatId, $groupRandNum, $webSocket);
            }
        }
    } catch (\Exception $e) {
        Log::error("工作台建群任务异常: " . $e->getMessage());
        throw $e;
    }
}
2025-12-10 17:58:08 +08:00
/**
 * Create groups for one batch of traffic-pool users.
 *
 * The batch users that are friends of an owner account are grouped by WeChat
 * account. For each account a group is created from the account's validated
 * admins, its owner member records and up to $groupRandNum of those friends;
 * one item row per member is then inserted, and on success the admins'
 * friends are invited through inviteAdminFriends().
 *
 * @param Workbench           $workbench           Workbench being processed
 * @param array               $config              Group-creation config ('admins', 'groupNameTemplate' are read)
 * @param array               $batchUsers          WeChat ids of this batch (from the traffic pool)
 * @param array               $groupMemberId       Friend-record ids of the owner members
 * @param array               $groupMemberWechatId Map of owner member friend-record id => wechatId
 * @param int                 $groupRandNum        Random group size (admins not counted)
 * @param WebSocketController $webSocket           WebSocket client used for the create/invite commands
 */
protected function processBatchUsers($workbench, $config, $batchUsers, $groupMemberId, $groupMemberWechatId, $groupRandNum, $webSocket)
{
    // 1. Resolve the owner wechat id of every owner member record with a single
    //    query (friend-record id => ownerWechatId). The map is reused in step 4.2.
    $memberOwnerWechatIds = empty($groupMemberId)
        ? []
        : Db::table('s2_wechat_friend')->whereIn('id', $groupMemberId)->column('ownerWechatId', 'id');

    $groupOwnerWechatIds = [];
    foreach ($groupMemberId as $memberId) {
        if (!array_key_exists($memberId, $memberOwnerWechatIds)) {
            continue;
        }
        $ownerWechatId = $memberOwnerWechatIds[$memberId];
        if (!in_array($ownerWechatId, $groupOwnerWechatIds)) {
            $groupOwnerWechatIds[] = $ownerWechatId;
        }
    }

    // Fallback: use the owner members' own wechat ids when nothing was found.
    if (empty($groupOwnerWechatIds)) {
        $groupOwnerWechatIds = array_values(array_unique($groupMemberWechatId));
    }

    // 2. Validate the configured admins: an admin must be a friend of an owner.
    $adminFriendIds = [];
    $adminWechatIds = [];      // admin friend-record id => wechatId
    $adminOwnerWechatIds = []; // admin friend-record id => ownerWechatId
    if (!empty($config['admins'])) {
        $adminFriends = Db::table('s2_wechat_friend')
            ->where('id', 'in', $config['admins'])
            ->field('id,wechatId,ownerWechatId')
            ->select();
        foreach ($adminFriends as $adminFriend) {
            if (in_array($adminFriend['ownerWechatId'], $groupOwnerWechatIds)) {
                $adminFriendIds[] = $adminFriend['id'];
                $adminWechatIds[$adminFriend['id']] = $adminFriend['wechatId'];
                $adminOwnerWechatIds[$adminFriend['id']] = $adminFriend['ownerWechatId'];
            }
        }
    }

    // 3. Pick the batch users that are friends of an owner account.
    $ownerFriendIdsByAccount = [];
    $wechatIds = []; // friend-record id => wechatId for everything put into a group
    $ownerFriends = Db::table('s2_wechat_friend')->alias('f')
        ->join(['s2_wechat_account' => 'a'], 'f.wechatAccountId=a.id')
        ->whereIn('f.wechatId', $batchUsers)
        ->whereIn('a.wechatId', $groupOwnerWechatIds)
        ->where('f.isDeleted', 0)
        ->field('f.id,f.wechatId,a.id as wechatAccountId')
        ->select();

    if (empty($ownerFriends)) {
        Log::warning("未找到群主的好友跳过。工作台ID: {$workbench->id}");
        return;
    }

    // Group those friends by WeChat account.
    foreach ($ownerFriends as $friend) {
        $wechatAccountId = $friend['wechatAccountId'];
        if (!isset($ownerFriendIdsByAccount[$wechatAccountId])) {
            $ownerFriendIdsByAccount[$wechatAccountId] = [];
        }
        $ownerFriendIdsByAccount[$wechatAccountId][] = $friend['id'];
        $wechatIds[$friend['id']] = $friend['wechatId'];
    }

    // 4. Create one group per WeChat account.
    foreach ($ownerFriendIdsByAccount as $wechatAccountId => $ownerFriendIds) {
        $accountWechatId = Db::table('s2_wechat_account')->where('id', $wechatAccountId)->value('wechatId');

        // 4.1 Admins whose friend record belongs to this account (uses the rows
        //     loaded in step 2 instead of re-querying each admin).
        $currentAdminFriendIds = [];
        foreach ($adminFriendIds as $adminFriendId) {
            if ($adminOwnerWechatIds[$adminFriendId] == $accountWechatId) {
                $currentAdminFriendIds[] = $adminFriendId;
                $wechatIds[$adminFriendId] = $adminWechatIds[$adminFriendId];
            }
        }

        // 4.2 Owner member records that belong to this account.
        $currentGroupMemberIds = [];
        foreach ($groupMemberId as $memberId) {
            if (array_key_exists($memberId, $memberOwnerWechatIds)
                && $memberOwnerWechatIds[$memberId] == $accountWechatId
            ) {
                $currentGroupMemberIds[] = $memberId;
                if (!isset($wechatIds[$memberId])) {
                    $wechatIds[$memberId] = $groupMemberWechatId[$memberId] ?? '';
                }
            }
        }

        // 4.3 Cap the number of owner friends at the random group size.
        $limitedOwnerFriendIds = array_slice($ownerFriendIds, 0, $groupRandNum);

        // 4.4 Members of the new group: admins + owner members + owner friends.
        //     De-duplicated because the same friend record can appear in more
        //     than one list; the first occurrence wins, which matches the
        //     admin > owner member > owner friend priority used in 4.8.
        $createFriendIds = array_values(array_unique(
            array_merge($currentAdminFriendIds, $currentGroupMemberIds, $limitedOwnerFriendIds)
        ));
        if (count($createFriendIds) < 2) {
            Log::warning("建群好友数量不足跳过。工作台ID: {$workbench->id}, 微信账号ID: {$wechatAccountId}");
            continue;
        }

        // 4.5 Group name: the template, suffixed with a running number once this
        //     account already has successful groups for the workbench.
        $existingGroupCount = Db::name('workbench_group_create_item')
            ->where('workbenchId', $workbench->id)
            ->where('wechatAccountId', $wechatAccountId)
            ->where('status', self::STATUS_SUCCESS)
            ->group('groupId')
            ->count();
        $chatroomName = $existingGroupCount > 0
            ? $config['groupNameTemplate'] . ($existingGroupCount + 1) . '群'
            : $config['groupNameTemplate'];

        // 4.6 Send the create-group command.
        $createTime = time();
        $createResult = $webSocket->CmdChatroomCreate([
            'chatroomName' => $chatroomName,
            'wechatFriendIds' => $createFriendIds,
            'wechatAccountId' => $wechatAccountId,
        ]);
        $createResultData = json_decode($createResult, true);

        // 4.7 Extract the group id from a successful response; stays 0 otherwise.
        $chatroomId = 0;
        if (!empty($createResultData) && isset($createResultData['code']) && $createResultData['code'] == 200) {
            if (isset($createResultData['data']['chatroomId'])) {
                $chatroomId = $createResultData['data']['chatroomId'];
            } elseif (isset($createResultData['data']['id'])) {
                $chatroomId = $createResultData['data']['id'];
            }
        }

        // 4.8 Record one item row per member of the create request.
        $installData = [];
        foreach ($createFriendIds as $friendId) {
            if (in_array($friendId, $currentAdminFriendIds)) {
                $memberType = self::MEMBER_TYPE_ADMIN;
            } elseif (in_array($friendId, $currentGroupMemberIds)) {
                $memberType = self::MEMBER_TYPE_OWNER;
            } else {
                $memberType = self::MEMBER_TYPE_OWNER_FRIEND;
            }
            $installData[] = [
                'workbenchId' => $workbench->id,
                'friendId' => $friendId,
                'wechatId' => $wechatIds[$friendId] ?? ($groupMemberWechatId[$friendId] ?? ''),
                'groupId' => $chatroomId,
                'wechatAccountId' => $wechatAccountId,
                'status' => $chatroomId > 0 ? self::STATUS_SUCCESS : self::STATUS_CREATING,
                'memberType' => $memberType,
                'retryCount' => 0,
                'chatroomId' => $chatroomId > 0 ? $chatroomId : null,
                'createTime' => $createTime,
            ];
        }
        Db::name('workbench_group_create_item')->insertAll($installData);

        // 5. When the group id is known, invite the admins' friends.
        if ($chatroomId > 0 && !empty($currentAdminFriendIds)) {
            $this->inviteAdminFriends($workbench, $config, $batchUsers, $currentAdminFriendIds, $chatroomId, $wechatAccountId, $wechatIds, $createTime, $webSocket);
        }
    }
}
/**
 * Invite the admins' friends (taken from the current batch) into a group.
 *
 * Looks up which batch users are friends of the given admins, sends one
 * invite command for all of them and inserts an item row per invited friend
 * whose status reflects whether the invite command succeeded.
 *
 * @param Workbench           $workbench       Workbench being processed
 * @param array               $config          Group-creation config (not read by this method)
 * @param array               $batchUsers      WeChat ids of this batch (from the traffic pool)
 * @param array               $adminFriendIds  Friend-record ids of the admins
 * @param int                 $chatroomId      Id of the created group
 * @param int                 $wechatAccountId WeChat account id the group was created with
 * @param array               $wechatIds       Map of friend-record id => wechatId
 * @param int                 $createTime      Timestamp stored on the inserted rows
 * @param WebSocketController $webSocket       WebSocket client used for the invite command
 */
protected function inviteAdminFriends($workbench, $config, $batchUsers, $adminFriendIds, $chatroomId, $wechatAccountId, $wechatIds, $createTime, $webSocket)
{
    // WeChat ids of the admins that are present in the id => wechatId map.
    $adminWechatIds = array_values(array_filter(
        array_map(
            function ($adminFriendId) use ($wechatIds) {
                return $wechatIds[$adminFriendId] ?? null;
            },
            $adminFriendIds
        ),
        function ($wechatId) {
            return $wechatId !== null;
        }
    ));
    if (empty($adminWechatIds)) {
        return;
    }

    // Batch users that are friends of one of the admins.
    // NOTE(review): both `a.wechatId IN (admins)` and `a.id = $wechatAccountId`
    // constrain the same account row - confirm this is the intended filter.
    $poolFriends = Db::table('s2_wechat_friend')->alias('f')
        ->join(['s2_wechat_account' => 'a'], 'f.wechatAccountId=a.id')
        ->whereIn('f.wechatId', $batchUsers)
        ->whereIn('a.wechatId', $adminWechatIds)
        ->where('a.id', $wechatAccountId)
        ->where('f.isDeleted', 0)
        ->field('f.id,f.wechatId')
        ->select();
    if (empty($poolFriends)) {
        Log::info("未找到管理员的好友跳过拉人。工作台ID: {$workbench->id}, 群ID: {$chatroomId}");
        return;
    }

    // Collect the ids to invite and remember their wechat ids.
    $inviteIds = [];
    foreach ($poolFriends as $poolFriend) {
        $inviteIds[] = $poolFriend['id'];
        $wechatIds[$poolFriend['id']] = $poolFriend['wechatId'];
    }

    // Send the invite command and evaluate the response code.
    $response = json_decode(
        $webSocket->CmdChatroomInvite([
            'wechatChatroomId' => $chatroomId,
            'wechatFriendIds' => $inviteIds
        ]),
        true
    );
    $inviteSuccess = !empty($response) && isset($response['code']) && $response['code'] == 200;

    // One row per invited friend; the status mirrors the invite result.
    $rowStatus = $inviteSuccess ? self::STATUS_ADMIN_FRIEND_ADDED : self::STATUS_FAILED;
    $rows = [];
    foreach ($inviteIds as $inviteId) {
        $rows[] = [
            'workbenchId' => $workbench->id,
            'friendId' => $inviteId,
            'wechatId' => $wechatIds[$inviteId] ?? '',
            'groupId' => $chatroomId,
            'wechatAccountId' => $wechatAccountId,
            'status' => $rowStatus,
            'memberType' => self::MEMBER_TYPE_ADMIN_FRIEND,
            'retryCount' => 0,
            'chatroomId' => $chatroomId,
            'createTime' => $createTime,
        ];
    }
    Db::name('workbench_group_create_item')->insertAll($rows);

    if (!$inviteSuccess) {
        Log::warning("管理员好友拉入失败。工作台ID: {$workbench->id}, 群ID: {$chatroomId}");
        return;
    }
    Log::info("管理员好友拉入成功。工作台ID: {$workbench->id}, 群ID: {$chatroomId}, 拉入数量: " . count($inviteIds));
}
2025-08-29 09:51:00 +08:00
/**
 * Decide whether another group may be created right now.
 *
 * Returns false when the current time is outside today's
 * [startTime, endTime] window, when no group ids are given, or when the
 * number of the given groups created inside today's window has reached
 * `maxGroupsPerDay`.
 *
 * @param Workbench $workbench Workbench (not read by this method)
 * @param array|\ArrayAccess $config Config providing 'startTime', 'endTime' and 'maxGroupsPerDay'
 * @param array $groupIds Ids of the wechat_group rows to count
 * @return bool true when creation is allowed
 */
protected function isCreate($workbench, $config, $groupIds = [])
{
    // Today's time window, built from the configured start/end values.
    $day = date('Y-m-d');
    $windowStart = strtotime("{$day} {$config['startTime']}:00");
    $windowEnd = strtotime("{$day} {$config['endTime']}:00");

    // Outside the window, or nothing to count: not allowed.
    $insideWindow = $windowStart <= time() && $windowEnd >= time();
    if (!$insideWindow || empty($groupIds)) {
        return false;
    }

    // Number of the given groups created inside today's window.
    $createdToday = Db::name('wechat_group')
        ->whereIn('id', $groupIds)
        ->whereTime('createTime', 'between', [$windowStart, $windowEnd])
        ->count();

    return !($createdToday >= $config['maxGroupsPerDay']);
}
/**
 * Log that processing of the group-creation job has started.
 *
 * The message previously named the group message push job, which made the
 * log entries of this job indistinguishable from that one.
 *
 * @param string $jobId        Job id taken from the payload
 * @param string $queueLockKey Cache key of the queue lock taken from the payload
 */
protected function logJobStart($jobId, $queueLockKey)
{
    Log::info('开始处理工作台建群任务: ' . json_encode([
        'jobId' => $jobId,
        'queueLockKey' => $queueLockKey,
    ]));
}
/**
 * Finish a successful run: delete the job and release the queue lock.
 *
 * @param Job    $job          Queue job instance
 * @param string $queueLockKey Cache key of the queue lock; may be empty
 */
protected function handleJobSuccess($job, $queueLockKey)
{
    $job->delete();

    // Same guard as handleJobError(): do not ask the cache to remove an empty key.
    if (!empty($queueLockKey)) {
        Cache::rm($queueLockKey);
    }

    Log::info('工作台建群任务执行成功');
}
/**
 * Handle a failed run: log the error, release the queue lock and either
 * re-queue or delete the job.
 *
 * The job is released again with the `queue.failed_delay` delay (default 10)
 * until $job->attempts() exceeds MAX_RETRY_ATTEMPTS; after that it is deleted.
 *
 * @param \Exception $e            Exception raised while running the job
 * @param Job        $job          Queue job instance
 * @param string     $queueLockKey Cache key of the queue lock; may be empty
 * @return bool Always false
 */
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
    Log::error('工作台建群任务异常:' . $e->getMessage());

    if (!empty($queueLockKey)) {
        Cache::rm($queueLockKey);
        Log::info("由于异常释放队列锁: {$queueLockKey}");
    }

    if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
        $job->delete();
    } else {
        $job->release(Config::get('queue.failed_delay', 10));
    }

    return false;
}
}