2025-05-30 18:04:10 +08:00
< ? php
namespace app\job ;
use think\queue\Job ;
use think\facade\Log ;
use think\Queue ;
use think\facade\Config ;
use think\facade\Cache ;
use app\cunkebao\model\Workbench ;
use app\cunkebao\model\WorkbenchTrafficConfig ;
use think\Db ;
2025-06-04 11:38:43 +08:00
use app\api\controller\AutomaticAssign ;
2025-05-30 18:04:10 +08:00
class WorkbenchTrafficDistributeJob
{
const MAX_RETRY_ATTEMPTS = 3 ;
public function fire ( Job $job , $data )
{
$jobId = $data [ 'jobId' ] ? ? '' ;
$queueLockKey = $data [ 'queueLockKey' ] ? ? '' ;
try {
$this -> logJobStart ( $jobId , $queueLockKey );
$workbenches = $this -> getActiveWorkbenches ();
if ( empty ( $workbenches )) {
$this -> handleEmptyWorkbenches ( $job , $queueLockKey );
return true ;
}
$this -> processWorkbenches ( $workbenches );
$this -> handleJobSuccess ( $job , $queueLockKey );
return true ;
} catch ( \Exception $e ) {
return $this -> handleJobError ( $e , $job , $queueLockKey );
}
}
protected function getActiveWorkbenches ()
{
return Workbench :: where ([
[ 'status' , '=' , 1 ],
[ 'isDel' , '=' , 0 ],
[ 'type' , '=' , 5 ]
]) -> order ( 'id DESC' ) -> select ();
}
protected function processWorkbenches ( $workbenches )
{
foreach ( $workbenches as $workbench ) {
try {
$this -> processSingleWorkbench ( $workbench );
} catch ( \Exception $e ) {
Log :: error ( " 处理流量分发工作台 { $workbench -> id } 失败: " . $e -> getMessage ());
}
}
}
protected function processSingleWorkbench ( $workbench )
{
$page = 1 ;
2025-07-15 11:06:52 +08:00
$pageSize = 20 ;
2025-06-30 17:31:01 +08:00
2025-05-30 18:04:10 +08:00
$config = WorkbenchTrafficConfig :: where ( 'workbenchId' , $workbench -> id ) -> find ();
if ( ! $config ) {
Log :: error ( " 流量分发工作台 { $workbench -> id } 配置获取失败 " );
return ;
}
2025-05-30 18:12:41 +08:00
// 验证是否在流量分发时间范围内
if ( ! $this -> isTimeRange ( $config ) && $config [ 'timeType' ] == 2 ) {
2025-05-30 18:04:10 +08:00
return ;
2025-05-30 18:12:41 +08:00
}
2025-06-04 11:38:43 +08:00
// 获取当天未超额的可用账号
2025-06-07 17:34:20 +08:00
if ( empty ( $config [ 'account' ])){
Log :: error ( " 流量分发工作台 { $workbench -> id } 未配置分发的客服 " );
return ;
}
$accountIds = json_decode ( $config [ 'account' ], true );
2025-06-04 11:38:43 +08:00
$todayStart = strtotime ( date ( 'Y-m-d 00:00:00' ));
$todayEnd = strtotime ( date ( 'Y-m-d 23:59:59' ));
2025-05-30 18:12:41 +08:00
$accounts = Db :: table ( 's2_company_account' )
2025-06-04 11:38:43 +08:00
-> alias ( 'a' )
-> where ([ 'a.departmentId' => $workbench -> companyId , 'a.status' => 0 ])
2025-06-07 17:34:20 +08:00
-> whereIn ( 'a.id' , $accountIds )
2025-06-04 11:38:43 +08:00
-> whereNotLike ( 'a.userName' , '%_offline%' )
-> whereNotLike ( 'a.userName' , '%_delete%' )
-> leftJoin ( 'workbench_traffic_config_item wti' , " wti.wechatAccountId = a.id AND wti.workbenchId = { $workbench -> id } AND wti.createTime BETWEEN { $todayStart } AND { $todayEnd } " )
2025-06-30 17:31:01 +08:00
-> field ( 'a.id,a.userName,a.realName,a.nickname,COUNT(wti.id) as todayCount' )
2025-06-04 11:38:43 +08:00
-> group ( 'a.id' )
-> having ( 'todayCount <= ' . $config [ 'maxPerDay' ])
2025-05-30 18:04:10 +08:00
-> select ();
2025-05-30 18:12:41 +08:00
$accountNum = count ( $accounts );
2025-06-04 11:38:43 +08:00
if ( $accountNum < 1 ) {
Log :: info ( " 流量分发工作台 { $workbench -> id } 可分配账号少于1个 " );
2025-05-30 18:04:10 +08:00
return ;
}
2025-06-04 11:38:43 +08:00
$automaticAssign = new AutomaticAssign ();
do {
$friends = $this -> getFriendsByLabels ( $workbench , $config , $page , $pageSize );
2025-06-11 15:31:46 +08:00
2025-06-04 11:38:43 +08:00
if ( empty ( $friends ) || count ( $friends ) == 0 ) {
Log :: info ( " 流量分发工作台 { $workbench -> id } 没有可分配的好友 " );
break ;
}
$i = 0 ;
$accountNum = count ( $accounts );
foreach ( $friends as $friend ) {
if ( $accountNum == 0 ) {
Log :: info ( " 流量分发工作台 { $workbench -> id } 所有账号今日分配已满 " );
break 2 ;
}
if ( $i >= $accountNum ) {
$i = 0 ;
}
$account = $accounts [ $i ];
2025-05-30 18:12:41 +08:00
2025-06-04 11:38:43 +08:00
// 如果该账号今天分配的记录数加上本次分配的记录数超过最大限制
if (( $account [ 'todayCount' ] + $pageSize ) >= $config [ 'maxPerDay' ]) {
// 查询该客服账号当天分配记录数
$todayCount = Db :: name ( 'workbench_traffic_config_item' )
-> where ( 'workbenchId' , $workbench -> id )
-> where ( 'wechatAccountId' , $account [ 'id' ])
-> whereBetween ( 'createTime' , [ $todayStart , $todayEnd ])
-> count ();
if ( $todayCount >= $config [ 'maxPerDay' ]) {
unset ( $accounts [ $i ]);
$accounts = array_values ( $accounts );
$accountNum = count ( $accounts );
$i ++ ;
continue ;
}
}
2025-05-30 18:04:10 +08:00
2025-06-04 11:38:43 +08:00
// 执行切换好友命令
2025-07-15 11:06:52 +08:00
$res = $automaticAssign -> allotWechatFriend ([
2025-06-04 11:38:43 +08:00
'wechatFriendId' => $friend [ 'id' ],
'toAccountId' => $account [ 'id' ]
], true );
2025-07-15 11:06:52 +08:00
$res = json_decode ( $res , true );
if ( $res [ 'code' ] == 200 ){
Db :: table ( 's2_wechat_friend' )
-> where ( 'id' , $friend [ 'id' ])
-> update ([
'accountId' => $account [ 'id' ],
'accountUserName' => $account [ 'userName' ],
'accountRealName' => $account [ 'realName' ],
'accountNickname' => $account [ 'nickname' ],
2025-06-30 17:31:01 +08:00
]);
2025-07-15 11:06:52 +08:00
// 写入分配记录表
Db :: name ( 'workbench_traffic_config_item' ) -> insert ([
'workbenchId' => $workbench -> id ,
'deviceId' => $friend [ 'deviceId' ],
'wechatFriendId' => $friend [ 'id' ],
'wechatAccountId' => $account [ 'id' ],
'createTime' => time (),
'exp' => $config [ 'exp' ],
'expTime' => time () + 86400 * $config [ 'exp' ],
]);
Log :: info ( " 流量分发工作台 { $workbench -> id } 好友[ { $friend [ 'id' ] } ]分配给客服[ { $account [ 'id' ] } ] 成功 " );
}
2025-06-04 11:38:43 +08:00
$i ++ ;
}
break ;
$page ++ ;
} while ( true );
Log :: info ( " 流量分发工作台 { $workbench -> id } 执行分发逻辑完成 " );
2025-05-30 18:04:10 +08:00
}
2025-06-04 11:38:43 +08:00
2025-05-30 18:04:10 +08:00
/**
* 检查是否在流量分发时间范围内
* @ param WorkbenchAutoLike $config
* @ return bool
*/
protected function isTimeRange ( $config )
{
$currentTime = date ( 'H:i' );
if ( $currentTime < $config [ 'startTime' ] || $currentTime > $config [ 'endTime' ]) {
Log :: info ( " 当前时间 { $currentTime } 不在流量分发时间范围内 ( { $config [ 'startTime' ] } - { $config [ 'endTime' ] } ) " );
return false ;
}
return true ;
}
/**
* 一次性查出所有包含指定标签数组的好友(支持分页)
2025-05-30 18:12:41 +08:00
* @ param object $workbench 工作台对象
* @ param object $config 配置对象
2025-05-30 18:04:10 +08:00
* @ param int $page 页码
* @ param int $pageSize 每页数量
* @ return array
*/
2025-05-30 18:12:41 +08:00
protected function getFriendsByLabels ( $workbench , $config , $page = 1 , $pageSize = 20 )
2025-05-30 18:04:10 +08:00
{
2025-05-30 18:12:41 +08:00
$labels = [];
if ( ! empty ( $config [ 'pools' ])) {
$labels = is_array ( $config [ 'pools' ]) ? $config [ 'pools' ] : json_decode ( $config [ 'pools' ], true );
}
2025-06-11 15:31:46 +08:00
2025-05-30 18:12:41 +08:00
$devices = [];
if ( ! empty ( $config [ 'devices' ])) {
$devices = is_array ( $config [ 'devices' ]) ? $config [ 'devices' ] : json_decode ( $config [ 'devices' ], true );
}
2025-06-11 15:31:46 +08:00
if ( empty ( $devices )) {
2025-05-30 18:12:41 +08:00
return [];
}
2025-05-30 18:04:10 +08:00
$query = Db :: table ( 's2_wechat_friend' ) -> alias ( 'wf' )
2025-05-30 18:12:41 +08:00
-> join ([ 's2_company_account' => 'sa' ], 'sa.id = wf.accountId' , 'left' )
-> join ([ 's2_wechat_account' => 'wa' ], 'wa.id = wf.wechatAccountId' , 'left' )
2025-07-15 11:06:52 +08:00
-> join ( 'workbench_traffic_config_item wtci' , 'wtci.isRecycle = 0 and wtci.wechatFriendId = wf.id AND wtci.workbenchId = ' . $config [ 'workbenchId' ], 'left' )
2025-05-30 18:12:41 +08:00
-> where ([
[ 'wf.isDeleted' , '=' , 0 ],
2025-06-30 14:02:08 +08:00
[ 'wf.isPassed' , '=' , 1 ],
2025-06-11 15:31:46 +08:00
//['sa.departmentId', '=', $workbench->companyId],
2025-05-30 18:12:41 +08:00
[ 'wtci.id' , 'null' , null ]
])
-> whereIn ( 'wa.currentDeviceId' , $devices )
-> field ( 'wf.id,wf.wechatAccountId,wf.wechatId,wf.labels,sa.userName,wa.currentDeviceId as deviceId' );
2025-06-11 15:31:46 +08:00
2025-06-19 15:14:41 +08:00
2025-06-11 15:31:46 +08:00
if ( ! empty ( $labels )){
$query -> where ( function ( $q ) use ( $labels ) {
foreach ( $labels as $label ) {
$q -> whereOrRaw ( " JSON_CONTAINS(wf.labels, ' \" { $label } \" ') " );
}
});
}
2025-06-04 11:38:43 +08:00
$list = $query -> page ( $page , $pageSize ) -> order ( 'wf.id DESC' ) -> select ();
2025-06-19 15:14:41 +08:00
2025-05-30 18:04:10 +08:00
return $list ;
}
protected function logJobStart ( $jobId , $queueLockKey )
{
Log :: info ( '开始处理流量分发任务: ' . json_encode ([
'jobId' => $jobId ,
'queueLockKey' => $queueLockKey
]));
}
protected function handleJobSuccess ( $job , $queueLockKey )
{
$job -> delete ();
Cache :: rm ( $queueLockKey );
Log :: info ( '流量分发任务执行成功' );
}
protected function handleJobError ( \Exception $e , $job , $queueLockKey )
{
Log :: error ( '流量分发任务异常:' . $e -> getMessage ());
if ( ! empty ( $queueLockKey )) {
Cache :: rm ( $queueLockKey );
Log :: info ( " 由于异常释放队列锁: { $queueLockKey } " );
}
if ( $job -> attempts () > self :: MAX_RETRY_ATTEMPTS ) {
$job -> delete ();
} else {
$job -> release ( Config :: get ( 'queue.failed_delay' , 10 ));
}
return false ;
}
protected function handleEmptyWorkbenches ( Job $job , $queueLockKey )
{
Log :: info ( '没有需要处理的流量分发任务' );
$job -> delete ();
Cache :: rm ( $queueLockKey );
}
}