群创建 代码优化

This commit is contained in:
wong
2025-08-29 09:51:00 +08:00
parent e050da41e5
commit 26e519bec0
6 changed files with 570 additions and 82 deletions

View File

@@ -200,14 +200,18 @@ class WebSocketController extends BaseController
* @param array $data 消息数据
* @return array
*/
protected function sendMessage($data)
protected function sendMessage($data,$receive = true)
{
$this->checkConnection();
try {
$this->client->send(json_encode($data));
$response = $this->client->receive();
return json_decode($response, true);
if ($receive){
$response = $this->client->receive();
return json_decode($response, true);
}else{
return ['code' => 200, 'msg' => '成功'];
}
} catch (\Exception $e) {
Log::error("发送消息失败:" . $e->getMessage());
$this->reconnect();
@@ -787,57 +791,6 @@ class WebSocketController extends BaseController
}
/**
* 邀请好友入群
* @param array $data 请求参数
* @return string JSON响应
*/
public function CmdChatroomInvite($data = [])
{
try {
// 参数验证
if (empty($data)) {
return json_encode(['code' => 400, 'msg' => '参数缺失']);
}
// 验证必要参数
if (empty($data['wechatChatroomId'])) {
return json_encode(['code' => 400, 'msg' => '群ID不能为空']);
}
if (empty($data['wechatFriendId'])) {
return json_encode(['code' => 400, 'msg' => '好友ID不能为空']);
}
if (!is_array($data['wechatFriendId'])) {
return json_encode(['code' => 400, 'msg' => '好友数据格式必须为数组']);
}
if (empty($data['wechatAccountId'])) {
return json_encode(['code' => 400, 'msg' => '微信账号ID不能为空']);
}
// 构建请求参数
$params = [
"cmdType" => "CmdChatroomInvite",
"seq" => time(),
"wechatChatroomId" => $data['wechatChatroomId'],
"wechatFriendId" => $data['wechatFriendId'],
"wechatAccountId" => $data['wechatAccountId']
];
// 记录请求日志
Log::info('邀请好友入群请求:' . json_encode($params, 256));
$message = $this->sendMessage($params);
return json_encode(['code' => 200, 'msg' => '邀请成功', 'data' => $message]);
} catch (\Exception $e) {
// 记录错误日志
Log::error('邀请好友入群异常:' . $e->getMessage());
// 返回错误响应
return json_encode(['code' => 500, 'msg' => '邀请好友入群异常:' . $e->getMessage()]);
}
}
/**
* 添加群好友
* @param $data
@@ -887,4 +840,162 @@ class WebSocketController extends BaseController
return json_encode(['code' => 500, 'msg' => '添加群好友异常:' . $e->getMessage()]);
}
}
/**
* 创建群聊
* @param array $data 请求参数
* @return string JSON响应
*/
public function CmdChatroomCreate($data = [])
{
try {
// 参数验证
if (empty($data)) {
return json_encode(['code' => 400, 'msg' => '参数缺失']);
}
// 验证必要参数
if (empty($data['chatroomName'])) {
return json_encode(['code' => 400, 'msg' => '群名称不能为空']);
}
if (empty($data['wechatFriendIds']) || !is_array($data['wechatFriendIds'])) {
return json_encode(['code' => 400, 'msg' => '好友ID列表不能为空且必须为数组']);
}
if (count($data['wechatFriendIds']) < 2) {
return json_encode(['code' => 400, 'msg' => '创建群聊至少需要2个好友']);
}
if (empty($data['wechatAccountId'])) {
return json_encode(['code' => 400, 'msg' => '微信账号ID不能为空']);
}
// 构建请求参数
$params = [
"cmdType" => "CmdChatroomCreate",
"seq" => time(),
"wechatAccountId" => $data['wechatAccountId'],
"chatroomName" => $data['chatroomName'],
"wechatFriendIds" => $data['wechatFriendIds']
];
// 记录请求日志
Log::info('创建群聊请求:' . json_encode($params, 256));
$message = $this->sendMessage($params,false);
return json_encode(['code' => 200, 'msg' => '群聊创建成功', 'data' => $message]);
} catch (\Exception $e) {
// 记录错误日志
Log::error('创建群聊异常:' . $e->getMessage());
// 返回错误响应
return json_encode(['code' => 500, 'msg' => '创建群聊异常:' . $e->getMessage()]);
}
}
/**
* 邀请好友入群
* @param array $data 请求参数
* @return string JSON响应
*/
public function CmdChatroomInvite($data = [])
{
try {
// 参数验证
if (empty($data)) {
return json_encode(['code' => 400, 'msg' => '参数缺失']);
}
// 验证必要参数
if (empty($data['wechatChatroomId'])) {
return json_encode(['code' => 400, 'msg' => '群ID不能为空']);
}
if (empty($data['wechatFriendIds'])) {
return json_encode(['code' => 400, 'msg' => '好友ID不能为空']);
}
if (!is_array($data['wechatFriendIds'])) {
return json_encode(['code' => 400, 'msg' => '好友数据格式必须为数组']);
}
// 构建请求参数
$params = [
"cmdType" => "CmdChatroomInvite",
"seq" => time(),
"wechatChatroomId" => $data['wechatChatroomId'],
"wechatFriendIds" => $data['wechatFriendIds']
];
// 记录请求日志
Log::info('邀请好友入群请求:' . json_encode($params, 256));
$message = $this->sendMessage($params,false);
return json_encode(['code' => 200, 'msg' => '邀请成功', 'data' => $message]);
} catch (\Exception $e) {
// 记录错误日志
Log::error('邀请好友入群异常:' . $e->getMessage());
// 返回错误响应
return json_encode(['code' => 500, 'msg' => '邀请好友入群异常:' . $e->getMessage()]);
}
}
/**
* 修改群信息(群昵称和群公告)
* @param array $data 请求参数
* @return string JSON响应
*/
public function CmdChatroomModifyInfo($data = [])
{
try {
// 参数验证
if (empty($data)) {
return json_encode(['code' => 400, 'msg' => '参数缺失']);
}
// 验证必要参数
if (empty($data['wechatChatroomId'])) {
return json_encode(['code' => 400, 'msg' => '群ID不能为空']);
}
if (empty($data['wechatAccountId'])) {
return json_encode(['code' => 400, 'msg' => '微信账号ID不能为空']);
}
// 检查是否至少提供了一个修改项
if (empty($data['chatroomName']) && !isset($data['announce'])) {
return json_encode(['code' => 400, 'msg' => '请至少提供群昵称或群公告中的一个参数']);
}
if (!empty($data['chatroomName'])) {
$extra = [
"chatroomName" => $data['chatroomName']
];
} else {
$extra = [
"announce" => $data['announce']
];
}
$params = [
"chatroomOperateType" => !empty($data['chatroomName']) ? 6 : 5,
"cmdType" => "CmdChatroomOperate",
"extra" => json_encode($extra,256),
"seq" => time(),
"wechatAccountId" => $data['wechatAccountId'],
"wechatChatroomId" => $data['wechatChatroomId']
];
// 记录请求日志
Log::info('创建群聊请求:' . json_encode($params, 256));
$message = $this->sendMessage($params,false);
return json_encode(['code' => 200, 'msg' => '群聊创建成功', 'data' => $message]);
} catch (\Exception $e) {
Log::error('修改群信息异常: ' . $e->getMessage());
return json_encode(['code' => 500, 'msg' => '修改群信息失败: ' . $e->getMessage()]);
}
}
}

View File

@@ -13,16 +13,12 @@ class WechatChatroomController extends BaseController
* 获取微信群聊列表
* @return \think\response\Json
*/
public function getlist($pageIndex = '',$pageSize = '',$isInner = false, $isDel = '')
public function getlist($data = [],$isInner = false, $isDel = '')
{
// 获取授权token
$authorization = trim($this->request->header('authorization', $this->authorization));
$authorization = $this->authorization;
if (empty($authorization)) {
if($isInner){
return json_encode(['code'=>500,'msg'=>'缺少授权信息']);
}else{
return errorJson('缺少授权信息');
}
}
try {
@@ -36,15 +32,15 @@ class WechatChatroomController extends BaseController
// 构建请求参数
$params = [
'keyword' => $this->request->param('keyword', ''),
'wechatAccountKeyword' => $this->request->param('wechatAccountKeyword', ''),
'isDeleted' => $this->request->param('isDeleted', $isDeleted),
'allotAccountId' => $this->request->param('allotAccountId', ''),
'groupId' => $this->request->param('groupId', ''),
'wechatChatroomId' => $this->request->param('wechatChatroomId', 0),
'memberKeyword' => $this->request->param('memberKeyword', ''),
'pageIndex' => !empty($pageIndex) ? $pageIndex : input('pageIndex', 0),
'pageSize' => !empty($pageSize) ? $pageSize : input('pageSize', 20)
'keyword' => $data['keyword'] ?? '',
'wechatAccountKeyword' => $data['wechatAccountKeyword'] ?? '',
'isDeleted' => $data['isDeleted'] ?? $isDeleted ,
'allotAccountId' => $data['allotAccountId'] ?? '',
'groupId' => $data['groupId'] ?? '',
'wechatChatroomId' => $data['wechatChatroomId'] ?? '',
'memberKeyword' => $data['memberKeyword'] ?? '',
'pageIndex' => $data['pageIndex'] ?? 1,
'pageSize' => $data['pageSize'] ?? 20
];
// 设置请求头

View File

@@ -27,12 +27,14 @@ return [
'allotrule:autocreate' => 'app\command\AutoCreateAllotRulesCommand', // 自动创建分配规则 √
'content:collect' => 'app\command\ContentCollectCommand', // 内容采集任务 √
'moments:collect' => 'app\command\WechatMomentsCommand', // 朋友圈采集任务
'workbench:autoLike' => 'app\command\WorkbenchAutoLikeCommand', // 工作台自动点赞任务
'workbench:moments' => 'app\command\WorkbenchMomentsCommand', // 工作台朋友圈同步任务
'sync:wechatData' => 'app\command\SyncWechatDataToCkbTask', // 同步微信数据到存客宝
'sync:allFriends' => 'app\command\SyncAllFriendsCommand', // 同步所有在线好友
'workbench:trafficDistribute' => 'app\command\WorkbenchTrafficDistributeCommand', // 工作台流量分发任务
'workbench:groupPush' => 'app\command\WorkbenchGroupPushCommand', // 工作台群组同步任务
'switch:friends' => 'app\command\SwitchFriendsCommand',
'call-recording:list' => 'app\command\CallRecordingListCommand', // 通话记录列表 √
'sync:wechatData' => 'app\command\SyncWechatDataToCkbTask', // 同步微信数据到存客宝
'sync:allFriends' => 'app\command\SyncAllFriendsCommand', // 同步所有在线好友
'workbench:autoLike' => 'app\command\WorkbenchAutoLikeCommand', // 工作台自动点赞任务
'workbench:moments' => 'app\command\WorkbenchMomentsCommand', // 工作台朋友圈同步任务
'workbench:trafficDistribute' => 'app\command\WorkbenchTrafficDistributeCommand', // 工作台流量分发任务
'workbench:groupPush' => 'app\command\WorkbenchGroupPushCommand', // 工作台群推送任务
'workbench:groupCreate' => 'app\command\WorkbenchGroupCreateCommand', // 工作台群创建任务
];

View File

@@ -0,0 +1,76 @@
<?php
namespace app\command;
use app\job\WorkbenchGroupCreateJob;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\console\input\Option;
use think\facade\Log;
use think\Queue;
use think\facade\Cache;
class WorkbenchGroupCreateCommand extends Command
{
// 队列名称
protected $queueName = 'workbench_groupCreate';
protected function configure()
{
$this->setName('workbench:groupCreate')
->setDescription('工作台群创建同步任务队列')
->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID用于区分不同实例', date('YmdHis') . rand(1000, 9999));
}
protected function execute(Input $input, Output $output)
{
$output->writeln('开始处理工作台群创建同步任务...');
try {
// 获取任务ID
$jobId = $input->getOption('jobId');
$output->writeln('任务ID: ' . $jobId);
// 检查队列是否已经在运行
$queueLockKey = "queue_lock:{$this->queueName}";
Cache::rm($queueLockKey);
if (Cache::get($queueLockKey)) {
$output->writeln("队列 {$this->queueName} 已经在运行中,跳过执行");
Log::warning("队列 {$this->queueName} 已经在运行中,跳过执行");
return false;
}
// 设置队列运行锁有效期1小时
Cache::set($queueLockKey, $jobId, 3600);
$output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时");
// 将任务添加到队列
$this->addToQueue($jobId, $queueLockKey);
$output->writeln('工作台群发同步任务已添加到队列');
} catch (\Exception $e) {
Log::error('工作台群发同步任务添加失败:' . $e->getMessage());
$output->writeln('工作台群发同步任务添加失败:' . $e->getMessage());
return false;
}
return true;
}
/**
* 添加任务到队列
* @param string $jobId 任务ID
* @param string $queueLockKey 队列锁键名
*/
public function addToQueue($jobId = '', $queueLockKey = '')
{
$data = [
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
];
// 添加到队列,设置任务名为 workbench_groupCreate
Queue::push(WorkbenchGroupCreateJob::class, $data, $this->queueName);
}
}

View File

@@ -248,7 +248,7 @@ class WorkbenchController extends Controller
if (!empty($item->momentsSync)) {
$item->config = $item->momentsSync;
$item->config->devices = json_decode($item->config->devices, true);
$item->config->contentLibraries = json_decode($item->config->contentLibraries, true);
$item->config->contentGroups = json_decode($item->config->contentLibraries, true);
//同步记录
$sendNum = Db::name('workbench_moments_sync_item')->where(['workbenchId' => $item->id])->count();
$item->syncCount = $sendNum;
@@ -257,15 +257,14 @@ class WorkbenchController extends Controller
// 获取内容库名称
if (!empty($item->config->contentLibraries)) {
$libraryNames = ContentLibrary::whereIn('id', $item->config->contentLibraries)
->column('name');
$item->config->contentLibraryNames = $libraryNames;
if (!empty($item->config->contentGroups)) {
$libraryNames = ContentLibrary::where('id', 'in', $item->config->contentGroups)->select();
$item->config->contentGroupsOptions = $libraryNames;
} else {
$item->config->contentLibraryNames = [];
$item->config->contentGroupsOptions = [];
}
}
unset($item->momentsSync, $item->moments_sync);
unset($item->momentsSync, $item->moments_sync,$item->config->contentLibraries);
break;
case self::TYPE_GROUP_PUSH:
if (!empty($item->groupPush)) {

View File

@@ -0,0 +1,304 @@
<?php
namespace app\job;
use app\api\controller\WebSocketController;
use app\cunkebao\model\Workbench;
use app\cunkebao\model\WorkbenchGroupCreate;
use app\api\model\WechatFriendModel as WechatFriend;
use app\api\model\WechatMomentsModel as WechatMoments;
use think\facade\Log;
use think\facade\Env;
use think\Db;
use think\queue\Job;
use think\facade\Cache;
use think\facade\Config;
use app\api\controller\MomentsController as Moments;
use Workerman\Lib\Timer;
/**
* 工作台群创建任务
* Class WorkbenchGroupCreateJob
* @package app\job
*/
class WorkbenchGroupCreateJob
{
/**
* 最大重试次数
*/
const MAX_RETRY_ATTEMPTS = 3;
/**
* 队列任务处理
* @param Job $job 队列任务
* @param array $data 任务数据
* @return bool
*/
public function fire(Job $job, $data)
{
$jobId = $data['jobId'] ?? '';
$queueLockKey = $data['queueLockKey'] ?? '';
try {
$this->logJobStart($jobId, $queueLockKey);
$this->execute();
$this->handleJobSuccess($job, $queueLockKey);
return true;
} catch (\Exception $e) {
return $this->handleJobError($e, $job, $queueLockKey);
}
}
/**
* 执行任务
* @throws \Exception
*/
public function execute()
{
try {
// 获取所有工作台
$workbenches = Workbench::where(['status' => 1, 'type' => 4, 'isDel' => 0])->order('id desc')->select();
foreach ($workbenches as $workbench) {
// 获取工作台配置
$config = WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find();
if (!$config) {
continue;
}
$config['poolGroups'] = json_decode($config['poolGroups'], true);
$config['devices'] = json_decode($config['devices'], true);
if (empty($config['poolGroups']) || empty($config['devices'])) {
continue;
}
//群主及内部成员
$groupMember = Db::name('device_wechat_login')->alias('dwl')
->join(['s2_wechat_account' => 'a'], 'dwl.wechatId = a.wechatId')
->whereIn('dwl.deviceId', $config['devices'])
->group('a.id')
->column('a.wechatId');
if (empty($groupMember)) {
continue;
}
$groupMemberWechatId = Db::table('s2_wechat_friend')
->where('ownerWechatId', $groupMember[0])
->whereIn('wechatId', $groupMember)
->column('id,wechatId');
if (empty($groupMemberWechatId)) {
continue;
}
$groupMemberId = array_keys($groupMemberWechatId);
//流量池用户
$poolItem = Db::name('traffic_source_package_item')
->whereIn('packageId', $config['poolGroups'])
->group('identifier')
->column('identifier');
if (empty($poolItem)) {
continue;
}
//群用户
$groupUser = Db::name('workbench_group_create_item')
->where('workbenchId', $workbench->id)
->whereIn('wechatId', $poolItem)
->group('wechatId')
->column('wechatId');
//待入群的用户
$joinUser = array_diff($poolItem, $groupUser);
if (empty($joinUser)) {
continue;
}
//随机群人数
$groupRandNum = mt_rand($config['groupSizeMin'], $config['groupSizeMax']) - count($groupMember);
//待加入用户
$addGroupUser = [];
$totalRows = count($joinUser);
for ($i = 0; $i < $totalRows; $i += $groupRandNum) {
$batchRows = array_slice($joinUser, $i, $groupRandNum);
if (!empty($batchRows)) {
$user = [];
foreach ($batchRows as $row) {
$user[] = $row;
}
$addGroupUser[] = $user;
}
}
foreach ($addGroupUser as $key => $val) {
//判断第一组用户是否满足创建群的条件
$friendIds = Db::name('wechat_friendship')->alias('f')
->join(['s2_wechat_account' => 'a'], 'f.ownerWechatId=a.wechatId')
->where('f.companyId', $workbench->companyId)
->whereIn('f.wechatId', $val)
->group('f.wechatId')
->column('f.id,f.wechatId,a.id as wechatAccountId');
// 整理数组按wechatAccountId分组值为对应的id数组
$groupedFriends = [];
$wechatAccountIds = [];
$wechatIds = [];
foreach ($friendIds as $friend) {
$wechatAccountId = $friend['wechatAccountId'];
if (!in_array($wechatAccountId, $wechatAccountIds)) {
$wechatAccountIds[] = $wechatAccountId;
}
$friendId = $friend['id'];
if (!isset($groupedFriends[$wechatAccountId])) {
$groupedFriends[$wechatAccountId] = [];
}
$groupedFriends[$wechatAccountId][] = $friendId;
$wechatIds[$friendId] = $friend['wechatId'];
}
//==================== 群相关功能开始 ===========================
$toAccountId = '';
$username = Env::get('api.username2', '');
$password = Env::get('api.password2', '');
if (!empty($username) || !empty($password)) {
$toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
}
$webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
//拉人进群 $webSocket->CmdChatroomInvite(['wechatChatroomId' => 830794, 'wechatFriendIds' => [21168549]]);
//修改群名称 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'chatroomName' => 'test111']);
//修改群公告 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'announce' => 'test111']);
//建群 $webSocket->CmdChatroomCreate(['chatroomName' => '聊天测试群', 'wechatFriendIds' => [17453051,17453058],'wechatAccountId' => 300745]);
foreach ($groupedFriends as $wechatAccountId => $friendId) {
//列出所有群
$group = '';
$groupMemberNum = 0;
$groupIds = Db::name('workbench_group_create_item')->where(['workbenchId' => $workbench->id])->group('groupId')->column('groupId');
if (!empty($groupIds)) {
//最新创建的群
$group = Db::name('wechat_group')->where(['wechatAccountId' => $wechatAccountId])->whereIn('id', $groupIds)->order('createTime DESC')->find();
//群用户数量
if (!empty($group)) {
$groupMemberNum = Db::name('wechat_group_member')->where('groupId', $group['id'])->count();
}
}
exit_data($group);
//拉群或者建群
$wechatFriendIds = array_merge($friendId, $groupMemberId);
if ($groupMemberNum == 0 || (count($wechatFriendIds) + $groupMemberNum) >= $groupRandNum) {
if (count($groupIds) > 0) {
$chatroomName = $config['groupNameTemplate'] . count($groupIds) + 1 . '群';
} else {
$chatroomName = $config['groupNameTemplate'];
}
$webSocket->CmdChatroomCreate(['chatroomName' => $chatroomName, 'wechatFriendIds' => $wechatFriendIds,'wechatAccountId' => $wechatAccountId]);
} else {
$webSocket->CmdChatroomInvite(['wechatChatroomId' => $group['id'], 'wechatFriendIds' => $wechatFriendIds]);
}
$installData = [];
//记录进群人员
foreach ($wechatFriendIds as $v) {
$installData[] = [
'workbenchId' => $workbench->id,
'friendId' => $v,
'wechatId' => !empty($wechatIds[$v]) ? $wechatIds[$v] : $groupMemberWechatId[$v],
'groupId' => $group['id'],
'wechatAccountId' => $wechatAccountId,
'createTime' => time(),
];
}
Db::name('workbench_group_create_item')->insertAll($installData);
}
}
}
} catch (\Exception $e) {
Log::error("消息群发任务异常: " . $e->getMessage());
throw $e;
}
}
/**
* 获取设备列表
* @param Workbench $workbench 工作台
* @param WorkbenchGroupPush $config 配置
* @return array|bool
*/
protected function isCreate($workbench, $config, $groupIds = [])
{
// 检查发送间隔新逻辑根据startTime、endTime、maxPerDay动态计算
$today = date('Y-m-d');
$startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
$endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');
// 如果时间不符,则跳过
if ($startTimestamp > time() || $endTimestamp < time() || empty($groupIds)) {
return false;
}
// 查询今日建群数量
$count = Db::name('wechat_group')
->whereIn('id', $groupIds)
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
->count();
if ($count >= $config['maxGroupsPerDay']) {
return false;
}
return true;
}
/**
* 记录任务开始
* @param string $jobId
* @param string $queueLockKey
*/
protected function logJobStart($jobId, $queueLockKey)
{
Log::info('开始处理工作台消息群发任务: ' . json_encode([
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
]));
}
/**
* 处理任务成功
* @param Job $job
* @param string $queueLockKey
*/
protected function handleJobSuccess($job, $queueLockKey)
{
$job->delete();
Cache::rm($queueLockKey);
Log::info('工作台消息群发任务执行成功');
}
/**
* 处理任务错误
* @param \Exception $e
* @param Job $job
* @param string $queueLockKey
* @return bool
*/
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
Log::error('工作台消息群发任务异常:' . $e->getMessage());
if (!empty($queueLockKey)) {
Cache::rm($queueLockKey);
Log::info("由于异常释放队列锁: {$queueLockKey}");
}
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
$job->delete();
} else {
$job->release(Config::get('queue.failed_delay', 10));
}
return false;
}
}