2025-08-29 09:51:00 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
|
|
namespace app\job;
|
|
|
|
|
|
|
|
|
|
|
|
use app\api\controller\WebSocketController;
|
|
|
|
|
|
use app\cunkebao\model\Workbench;
|
|
|
|
|
|
use app\cunkebao\model\WorkbenchGroupCreate;
|
|
|
|
|
|
use app\api\model\WechatFriendModel as WechatFriend;
|
|
|
|
|
|
use app\api\model\WechatMomentsModel as WechatMoments;
|
|
|
|
|
|
use think\facade\Log;
|
|
|
|
|
|
use think\facade\Env;
|
|
|
|
|
|
use think\Db;
|
|
|
|
|
|
use think\queue\Job;
|
|
|
|
|
|
use think\facade\Cache;
|
|
|
|
|
|
use think\facade\Config;
|
|
|
|
|
|
use app\api\controller\MomentsController as Moments;
|
|
|
|
|
|
use Workerman\Lib\Timer;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 工作台群创建任务
|
|
|
|
|
|
* Class WorkbenchGroupCreateJob
|
|
|
|
|
|
* @package app\job
|
|
|
|
|
|
*/
|
|
|
|
|
|
class WorkbenchGroupCreateJob
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 最大重试次数
|
|
|
|
|
|
*/
|
|
|
|
|
|
const MAX_RETRY_ATTEMPTS = 3;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 队列任务处理
|
|
|
|
|
|
* @param Job $job 队列任务
|
|
|
|
|
|
* @param array $data 任务数据
|
|
|
|
|
|
* @return bool
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function fire(Job $job, $data)
|
|
|
|
|
|
{
|
|
|
|
|
|
$jobId = $data['jobId'] ?? '';
|
|
|
|
|
|
$queueLockKey = $data['queueLockKey'] ?? '';
|
|
|
|
|
|
try {
|
|
|
|
|
|
$this->logJobStart($jobId, $queueLockKey);
|
|
|
|
|
|
$this->execute();
|
|
|
|
|
|
$this->handleJobSuccess($job, $queueLockKey);
|
|
|
|
|
|
return true;
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
|
|
return $this->handleJobError($e, $job, $queueLockKey);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 执行任务
|
|
|
|
|
|
* @throws \Exception
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function execute()
|
|
|
|
|
|
{
|
|
|
|
|
|
try {
|
|
|
|
|
|
// 获取所有工作台
|
|
|
|
|
|
$workbenches = Workbench::where(['status' => 1, 'type' => 4, 'isDel' => 0])->order('id desc')->select();
|
|
|
|
|
|
foreach ($workbenches as $workbench) {
|
|
|
|
|
|
// 获取工作台配置
|
|
|
|
|
|
$config = WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find();
|
|
|
|
|
|
if (!$config) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
$config['poolGroups'] = json_decode($config['poolGroups'], true);
|
|
|
|
|
|
$config['devices'] = json_decode($config['devices'], true);
|
|
|
|
|
|
if (empty($config['poolGroups']) || empty($config['devices'])) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
//群主及内部成员
|
|
|
|
|
|
$groupMember = Db::name('device_wechat_login')->alias('dwl')
|
|
|
|
|
|
->join(['s2_wechat_account' => 'a'], 'dwl.wechatId = a.wechatId')
|
|
|
|
|
|
->whereIn('dwl.deviceId', $config['devices'])
|
|
|
|
|
|
->group('a.id')
|
|
|
|
|
|
->column('a.wechatId');
|
|
|
|
|
|
if (empty($groupMember)) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
$groupMemberWechatId = Db::table('s2_wechat_friend')
|
|
|
|
|
|
->where('ownerWechatId', $groupMember[0])
|
|
|
|
|
|
->whereIn('wechatId', $groupMember)
|
|
|
|
|
|
->column('id,wechatId');
|
|
|
|
|
|
if (empty($groupMemberWechatId)) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
$groupMemberId = array_keys($groupMemberWechatId);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//流量池用户
|
|
|
|
|
|
$poolItem = Db::name('traffic_source_package_item')
|
|
|
|
|
|
->whereIn('packageId', $config['poolGroups'])
|
|
|
|
|
|
->group('identifier')
|
|
|
|
|
|
->column('identifier');
|
|
|
|
|
|
|
|
|
|
|
|
if (empty($poolItem)) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//群用户
|
|
|
|
|
|
$groupUser = Db::name('workbench_group_create_item')
|
|
|
|
|
|
->where('workbenchId', $workbench->id)
|
|
|
|
|
|
->whereIn('wechatId', $poolItem)
|
|
|
|
|
|
->group('wechatId')
|
|
|
|
|
|
->column('wechatId');
|
|
|
|
|
|
|
|
|
|
|
|
//待入群的用户
|
|
|
|
|
|
$joinUser = array_diff($poolItem, $groupUser);
|
|
|
|
|
|
if (empty($joinUser)) {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//随机群人数
|
|
|
|
|
|
$groupRandNum = mt_rand($config['groupSizeMin'], $config['groupSizeMax']) - count($groupMember);
|
|
|
|
|
|
|
|
|
|
|
|
//待加入用户
|
|
|
|
|
|
$addGroupUser = [];
|
|
|
|
|
|
$totalRows = count($joinUser);
|
|
|
|
|
|
for ($i = 0; $i < $totalRows; $i += $groupRandNum) {
|
|
|
|
|
|
$batchRows = array_slice($joinUser, $i, $groupRandNum);
|
|
|
|
|
|
if (!empty($batchRows)) {
|
|
|
|
|
|
$user = [];
|
|
|
|
|
|
foreach ($batchRows as $row) {
|
|
|
|
|
|
$user[] = $row;
|
|
|
|
|
|
}
|
|
|
|
|
|
$addGroupUser[] = $user;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($addGroupUser as $key => $val) {
|
|
|
|
|
|
//判断第一组用户是否满足创建群的条件
|
|
|
|
|
|
$friendIds = Db::name('wechat_friendship')->alias('f')
|
|
|
|
|
|
->join(['s2_wechat_account' => 'a'], 'f.ownerWechatId=a.wechatId')
|
|
|
|
|
|
->where('f.companyId', $workbench->companyId)
|
|
|
|
|
|
->whereIn('f.wechatId', $val)
|
|
|
|
|
|
->group('f.wechatId')
|
|
|
|
|
|
->column('f.id,f.wechatId,a.id as wechatAccountId');
|
2025-09-08 17:24:44 +08:00
|
|
|
|
|
2025-08-29 09:51:00 +08:00
|
|
|
|
// 整理数组:按wechatAccountId分组,值为对应的id数组
|
|
|
|
|
|
$groupedFriends = [];
|
|
|
|
|
|
$wechatAccountIds = [];
|
|
|
|
|
|
$wechatIds = [];
|
|
|
|
|
|
foreach ($friendIds as $friend) {
|
|
|
|
|
|
$wechatAccountId = $friend['wechatAccountId'];
|
|
|
|
|
|
if (!in_array($wechatAccountId, $wechatAccountIds)) {
|
|
|
|
|
|
$wechatAccountIds[] = $wechatAccountId;
|
|
|
|
|
|
}
|
|
|
|
|
|
$friendId = $friend['id'];
|
|
|
|
|
|
if (!isset($groupedFriends[$wechatAccountId])) {
|
|
|
|
|
|
$groupedFriends[$wechatAccountId] = [];
|
|
|
|
|
|
}
|
|
|
|
|
|
$groupedFriends[$wechatAccountId][] = $friendId;
|
|
|
|
|
|
$wechatIds[$friendId] = $friend['wechatId'];
|
|
|
|
|
|
}
|
|
|
|
|
|
//==================== 群相关功能开始 ===========================
|
|
|
|
|
|
$toAccountId = '';
|
|
|
|
|
|
$username = Env::get('api.username2', '');
|
|
|
|
|
|
$password = Env::get('api.password2', '');
|
|
|
|
|
|
if (!empty($username) || !empty($password)) {
|
|
|
|
|
|
$toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
|
|
|
|
|
|
}
|
|
|
|
|
|
$webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
|
|
|
|
|
|
//拉人进群 $webSocket->CmdChatroomInvite(['wechatChatroomId' => 830794, 'wechatFriendIds' => [21168549]]);
|
|
|
|
|
|
//修改群名称 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'chatroomName' => 'test111']);
|
|
|
|
|
|
//修改群公告 $webSocket->CmdChatroomModifyInfo(['wechatChatroomId' => 830794, 'wechatAccountId' => 300745,'announce' => 'test111']);
|
|
|
|
|
|
//建群 $webSocket->CmdChatroomCreate(['chatroomName' => '聊天测试群', 'wechatFriendIds' => [17453051,17453058],'wechatAccountId' => 300745]);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($groupedFriends as $wechatAccountId => $friendId) {
|
|
|
|
|
|
//列出所有群
|
|
|
|
|
|
$group = '';
|
|
|
|
|
|
$groupMemberNum = 0;
|
|
|
|
|
|
$groupIds = Db::name('workbench_group_create_item')->where(['workbenchId' => $workbench->id])->group('groupId')->column('groupId');
|
|
|
|
|
|
if (!empty($groupIds)) {
|
|
|
|
|
|
//最新创建的群
|
|
|
|
|
|
$group = Db::name('wechat_group')->where(['wechatAccountId' => $wechatAccountId])->whereIn('id', $groupIds)->order('createTime DESC')->find();
|
|
|
|
|
|
//群用户数量
|
|
|
|
|
|
if (!empty($group)) {
|
|
|
|
|
|
$groupMemberNum = Db::name('wechat_group_member')->where('groupId', $group['id'])->count();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//拉群或者建群
|
|
|
|
|
|
$wechatFriendIds = array_merge($friendId, $groupMemberId);
|
2025-09-08 17:24:44 +08:00
|
|
|
|
|
2025-08-29 09:51:00 +08:00
|
|
|
|
if ($groupMemberNum == 0 || (count($wechatFriendIds) + $groupMemberNum) >= $groupRandNum) {
|
|
|
|
|
|
if (count($groupIds) > 0) {
|
|
|
|
|
|
$chatroomName = $config['groupNameTemplate'] . count($groupIds) + 1 . '群';
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$chatroomName = $config['groupNameTemplate'];
|
|
|
|
|
|
}
|
|
|
|
|
|
$webSocket->CmdChatroomCreate(['chatroomName' => $chatroomName, 'wechatFriendIds' => $wechatFriendIds,'wechatAccountId' => $wechatAccountId]);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$webSocket->CmdChatroomInvite(['wechatChatroomId' => $group['id'], 'wechatFriendIds' => $wechatFriendIds]);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
$installData = [];
|
|
|
|
|
|
|
|
|
|
|
|
//记录进群人员
|
|
|
|
|
|
foreach ($wechatFriendIds as $v) {
|
|
|
|
|
|
$installData[] = [
|
|
|
|
|
|
'workbenchId' => $workbench->id,
|
|
|
|
|
|
'friendId' => $v,
|
|
|
|
|
|
'wechatId' => !empty($wechatIds[$v]) ? $wechatIds[$v] : $groupMemberWechatId[$v],
|
2025-09-08 17:24:44 +08:00
|
|
|
|
'groupId' => 0,
|
2025-08-29 09:51:00 +08:00
|
|
|
|
'wechatAccountId' => $wechatAccountId,
|
|
|
|
|
|
'createTime' => time(),
|
|
|
|
|
|
];
|
|
|
|
|
|
}
|
|
|
|
|
|
Db::name('workbench_group_create_item')->insertAll($installData);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
|
|
Log::error("消息群发任务异常: " . $e->getMessage());
|
|
|
|
|
|
throw $e;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取设备列表
|
|
|
|
|
|
* @param Workbench $workbench 工作台
|
|
|
|
|
|
* @param WorkbenchGroupPush $config 配置
|
|
|
|
|
|
* @return array|bool
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function isCreate($workbench, $config, $groupIds = [])
|
|
|
|
|
|
{
|
|
|
|
|
|
// 检查发送间隔(新逻辑:根据startTime、endTime、maxPerDay动态计算)
|
|
|
|
|
|
$today = date('Y-m-d');
|
|
|
|
|
|
$startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
|
|
|
|
|
|
$endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');
|
|
|
|
|
|
|
|
|
|
|
|
// 如果时间不符,则跳过
|
|
|
|
|
|
if ($startTimestamp > time() || $endTimestamp < time() || empty($groupIds)) {
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 查询今日建群数量
|
|
|
|
|
|
$count = Db::name('wechat_group')
|
|
|
|
|
|
->whereIn('id', $groupIds)
|
|
|
|
|
|
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
|
|
|
|
|
|
->count();
|
|
|
|
|
|
if ($count >= $config['maxGroupsPerDay']) {
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
return true;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 记录任务开始
|
|
|
|
|
|
* @param string $jobId
|
|
|
|
|
|
* @param string $queueLockKey
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function logJobStart($jobId, $queueLockKey)
|
|
|
|
|
|
{
|
|
|
|
|
|
Log::info('开始处理工作台消息群发任务: ' . json_encode([
|
|
|
|
|
|
'jobId' => $jobId,
|
|
|
|
|
|
'queueLockKey' => $queueLockKey
|
|
|
|
|
|
]));
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理任务成功
|
|
|
|
|
|
* @param Job $job
|
|
|
|
|
|
* @param string $queueLockKey
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function handleJobSuccess($job, $queueLockKey)
|
|
|
|
|
|
{
|
|
|
|
|
|
$job->delete();
|
|
|
|
|
|
Cache::rm($queueLockKey);
|
|
|
|
|
|
Log::info('工作台消息群发任务执行成功');
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理任务错误
|
|
|
|
|
|
* @param \Exception $e
|
|
|
|
|
|
* @param Job $job
|
|
|
|
|
|
* @param string $queueLockKey
|
|
|
|
|
|
* @return bool
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function handleJobError(\Exception $e, $job, $queueLockKey)
|
|
|
|
|
|
{
|
|
|
|
|
|
Log::error('工作台消息群发任务异常:' . $e->getMessage());
|
|
|
|
|
|
|
|
|
|
|
|
if (!empty($queueLockKey)) {
|
|
|
|
|
|
Cache::rm($queueLockKey);
|
|
|
|
|
|
Log::info("由于异常释放队列锁: {$queueLockKey}");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
|
|
|
|
|
|
$job->delete();
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$job->release(Config::get('queue.failed_delay', 10));
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|