通讯录导入功能提交
This commit is contained in:
@@ -419,9 +419,17 @@ class DeviceController extends BaseController
|
||||
$response = handleApiResponse($result);
|
||||
|
||||
if(empty($response)){
|
||||
return successJson([], '操作成功');
|
||||
if($isInner){
|
||||
return json_encode(['code'=>200,'msg'=>'更新设备联系人成功' ]);
|
||||
}else{
|
||||
return successJson([],'更新设备联系人失败:通讯录不能为空' );
|
||||
}
|
||||
}else{
|
||||
return errorJson([],$response);
|
||||
if($isInner){
|
||||
return json_encode(['code'=>200,'msg'=> $response ]);
|
||||
}else{
|
||||
return successJson([],$response );
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
if($isInner){
|
||||
|
||||
@@ -37,4 +37,5 @@ return [
|
||||
'workbench:trafficDistribute' => 'app\command\WorkbenchTrafficDistributeCommand', // 工作台流量分发任务
|
||||
'workbench:groupPush' => 'app\command\WorkbenchGroupPushCommand', // 工作台群推送任务
|
||||
'workbench:groupCreate' => 'app\command\WorkbenchGroupCreateCommand', // 工作台群创建任务
|
||||
'workbench:import-contact' => 'app\command\WorkbenchImportContactCommand', // 工作台通讯录导入任务
|
||||
];
|
||||
|
||||
125
Server/application/command/WorkbenchImportContactCommand.php
Normal file
125
Server/application/command/WorkbenchImportContactCommand.php
Normal file
@@ -0,0 +1,125 @@
|
||||
<?php
|
||||
|
||||
namespace app\command;
|
||||
|
||||
use app\job\WorkbenchImportContactJob;
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\facade\Log;
|
||||
use think\facade\Cache;
|
||||
use think\Queue;
|
||||
|
||||
/**
|
||||
* 工作台通讯录导入命令
|
||||
* Class WorkbenchImportContactCommand
|
||||
* @package app\command
|
||||
*/
|
||||
class WorkbenchImportContactCommand extends Command
|
||||
{
|
||||
/**
|
||||
* 配置命令
|
||||
*/
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('workbench:import-contact')
|
||||
->setDescription('执行工作台通讯录导入任务');
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行命令
|
||||
* @param Input $input
|
||||
* @param Output $output
|
||||
* @return int
|
||||
*/
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$output->writeln('开始执行工作台通讯录导入任务...');
|
||||
|
||||
try {
|
||||
// 检查是否有任务正在执行
|
||||
$lockKey = 'workbench_import_contact_lock';
|
||||
if (Cache::has($lockKey)) {
|
||||
$output->writeln('通讯录导入任务正在执行中,跳过本次执行');
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 设置执行锁,防止重复执行
|
||||
Cache::set($lockKey, time(), 3600); // 1小时锁定时间
|
||||
|
||||
// 生成任务ID
|
||||
$jobId = 'workbench_import_contact_' . date('YmdHis') . '_' . mt_rand(1000, 9999);
|
||||
|
||||
// 准备任务数据
|
||||
$jobData = [
|
||||
'jobId' => $jobId,
|
||||
'queueLockKey' => $lockKey,
|
||||
'executeTime' => time()
|
||||
];
|
||||
// 判断是否使用队列
|
||||
if ($this->shouldUseQueue()) {
|
||||
// 推送到队列
|
||||
Queue::push(WorkbenchImportContactJob::class, $jobData, 'workbench_import_contact');
|
||||
$output->writeln("通讯录导入任务已推送到队列,任务ID: {$jobId}");
|
||||
} else {
|
||||
// 直接执行
|
||||
$job = new WorkbenchImportContactJob();
|
||||
$result = $job->execute();
|
||||
|
||||
// 释放锁
|
||||
Cache::rm($lockKey);
|
||||
|
||||
if ($result !== false) {
|
||||
$output->writeln('通讯录导入任务执行成功');
|
||||
} else {
|
||||
$output->writeln('通讯录导入任务执行失败');
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (\Exception $e) {
|
||||
// 释放锁
|
||||
Cache::rm($lockKey ?? '');
|
||||
|
||||
$errorMsg = '通讯录导入任务执行异常: ' . $e->getMessage();
|
||||
$output->writeln($errorMsg);
|
||||
Log::error($errorMsg);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否应该使用队列
|
||||
* @return bool
|
||||
*/
|
||||
protected function shouldUseQueue()
|
||||
{
|
||||
// 检查队列配置是否启用
|
||||
$queueConfig = config('queue');
|
||||
if (empty($queueConfig) || !isset($queueConfig['default'])) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查队列连接是否可用
|
||||
try {
|
||||
$connection = $queueConfig['connections'][$queueConfig['default']] ?? [];
|
||||
if (empty($connection)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 如果是数据库队列,检查表是否存在
|
||||
if ($connection['type'] === 'database') {
|
||||
$tableName = $connection['table'] ?? 'jobs';
|
||||
$exists = \think\Db::query("SHOW TABLES LIKE '{$tableName}'");
|
||||
return !empty($exists);
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (\Exception $e) {
|
||||
Log::warning('队列检查失败,将使用同步执行: ' . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
399
Server/application/job/WorkbenchImportContactJob.php
Normal file
399
Server/application/job/WorkbenchImportContactJob.php
Normal file
@@ -0,0 +1,399 @@
|
||||
<?php
|
||||
|
||||
namespace app\job;
|
||||
|
||||
use app\api\controller\DeviceController;
|
||||
use app\cunkebao\model\Workbench;
|
||||
use app\cunkebao\model\WorkbenchImportContact;
|
||||
use think\facade\Log;
|
||||
use think\facade\Env;
|
||||
use think\Db;
|
||||
use think\queue\Job;
|
||||
use think\facade\Cache;
|
||||
use think\facade\Config;
|
||||
|
||||
/**
|
||||
* 工作台通讯录导入任务
|
||||
* Class WorkbenchImportContactJob
|
||||
* @package app\job
|
||||
*/
|
||||
class WorkbenchImportContactJob
|
||||
{
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
const MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* 队列任务处理
|
||||
* @param Job $job 队列任务
|
||||
* @param array $data 任务数据
|
||||
* @return bool
|
||||
*/
|
||||
public function fire(Job $job, $data)
|
||||
{
|
||||
$jobId = $data['jobId'] ?? '';
|
||||
$queueLockKey = $data['queueLockKey'] ?? '';
|
||||
try {
|
||||
$this->logJobStart($jobId, $queueLockKey);
|
||||
$this->execute();
|
||||
$this->handleJobSuccess($job, $queueLockKey);
|
||||
return true;
|
||||
} catch (\Exception $e) {
|
||||
return $this->handleJobError($e, $job, $queueLockKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function execute()
|
||||
{
|
||||
try {
|
||||
// 获取所有启用的通讯录导入工作台
|
||||
$workbenches = Workbench::where(['status' => 1, 'type' => 6, 'isDel' => 0])->order('id desc')->select();
|
||||
foreach ($workbenches as $workbench) {
|
||||
// 获取工作台配置
|
||||
$config = WorkbenchImportContact::where('workbenchId', $workbench->id)->find();
|
||||
if (!$config) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 判断是否需要导入
|
||||
$shouldImport = $this->shouldImport($workbench, $config);
|
||||
if (!$shouldImport) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取需要导入的设备列表
|
||||
$devices = $this->getDeviceList($workbench, $config);
|
||||
if (empty($devices)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取通讯录数据
|
||||
$contactData = $this->getContactFromDatabase($workbench, $config);
|
||||
if (empty($contactData)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 执行通讯录导入
|
||||
$this->importContactToDevices($workbench, $config, $devices, $contactData);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error("通讯录导入任务异常: " . $e->getMessage());
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 导入通讯录到设备
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchImportContact $config
|
||||
* @param array $devices
|
||||
* @param array $contactData
|
||||
*/
|
||||
public function importContactToDevices($workbench, $config, $devices, $contactData)
|
||||
{
|
||||
$deviceController = new DeviceController();
|
||||
|
||||
// 根据设备数量平分通讯录数据
|
||||
$deviceCount = count($devices);
|
||||
if ($deviceCount == 0) {
|
||||
Log::warning("没有可用设备进行通讯录导入");
|
||||
return;
|
||||
}
|
||||
|
||||
$contactCount = count($contactData);
|
||||
if ($contactCount == 0) {
|
||||
Log::warning("没有通讯录数据需要导入");
|
||||
return;
|
||||
}
|
||||
|
||||
// 计算每个设备分配的联系人数量
|
||||
$contactsPerDevice = ceil($contactCount / $deviceCount);
|
||||
foreach ($devices as $index => $device) {
|
||||
try {
|
||||
// 计算当前设备的联系人数据范围
|
||||
$startIndex = $index * $contactsPerDevice;
|
||||
$endIndex = min($startIndex + $contactsPerDevice, $contactCount);
|
||||
|
||||
// 如果起始索引超出范围,跳过
|
||||
if ($startIndex >= $contactCount) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取当前设备的联系人数据片段
|
||||
$deviceContactData = array_slice($contactData, $startIndex, $endIndex - $startIndex);
|
||||
|
||||
if (empty($deviceContactData)) {
|
||||
continue;
|
||||
}
|
||||
// 准备联系人数据
|
||||
$contactJson = $this->formatContactData($deviceContactData, $config);
|
||||
|
||||
// 调用设备控制器的导入联系人方法
|
||||
$result = $deviceController->importContact([
|
||||
'deviceId' => $device['deviceId'],
|
||||
'contactJson' => $contactJson,
|
||||
'clearContact' => $config['clearContact'] ?? false
|
||||
], true);
|
||||
|
||||
$resultData = json_decode($result, true);
|
||||
|
||||
// 记录导入历史
|
||||
$this->recordImportHistory($workbench, $device, $deviceContactData);
|
||||
|
||||
if ($resultData['code'] == 200) {
|
||||
Log::info("设备 {$device['deviceId']} 通讯录导入成功,导入联系人数量: " . count($deviceContactData));
|
||||
} else {
|
||||
Log::error("设备 {$device['deviceId']} 通讯录导入失败: " . ($resultData['msg'] ?? '未知错误'));
|
||||
}
|
||||
|
||||
// 添加延迟,避免频繁请求
|
||||
if ($config['importInterval'] ?? 0 > 0) {
|
||||
sleep($config['importInterval']);
|
||||
}
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error("设备 {$device['deviceId']} 通讯录导入异常: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 格式化联系人数据
|
||||
* @param array $contactData
|
||||
* @param WorkbenchImportContact $config
|
||||
* @return array|string
|
||||
*/
|
||||
protected function formatContactData($contactData, $config)
|
||||
{
|
||||
$remarkType = $config['remarkType'] ?? 0;
|
||||
$remark = $config['remark'] ?? '';
|
||||
|
||||
// 根据remarkType添加备注
|
||||
$suffix = '';
|
||||
switch ($remarkType) {
|
||||
case 0:
|
||||
// 不添加备注
|
||||
$suffix = '';
|
||||
break;
|
||||
case 1:
|
||||
// 添加年月日
|
||||
$suffix = date('Ymd') . '_';
|
||||
break;
|
||||
case 2:
|
||||
// 添加月日
|
||||
$suffix = date('md') . '_';
|
||||
break;
|
||||
case 3:
|
||||
// 自定义备注
|
||||
$suffix = $remark . '_';
|
||||
break;
|
||||
default:
|
||||
$suffix = '';
|
||||
break;
|
||||
}
|
||||
// 返回数组格式
|
||||
$contacts = [];
|
||||
foreach ($contactData as $contact) {
|
||||
$name = !empty($contact['name']) ? trim($contact['name']) : trim($contact['phone']);
|
||||
if (!empty($suffix)) {
|
||||
$name = $suffix . $name;
|
||||
}
|
||||
$contacts[] = [
|
||||
'name' => $name,
|
||||
'phone' => trim($contact['phone'])
|
||||
];
|
||||
}
|
||||
return $contacts;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录导入历史
|
||||
* @param Workbench $workbench
|
||||
* @param array $device
|
||||
* @param array $contactData
|
||||
* @param array $result
|
||||
*/
|
||||
protected function recordImportHistory($workbench, $device, $contactData)
|
||||
{
|
||||
$data = [];
|
||||
foreach ($contactData as $v){
|
||||
$data[] = [
|
||||
'workbenchId' => $workbench->id,
|
||||
'deviceId' => $device['deviceId'],
|
||||
'packageId' => !empty($v['packageId']) ? $v['packageId'] : 0,
|
||||
'poolId' => !empty($v['id']) ? $v['id'] : 0,
|
||||
'createTime' => time(),
|
||||
];
|
||||
}
|
||||
Db::name('workbench_import_contact_item')->insertAll($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备列表
|
||||
* @param Workbench $workbench 工作台
|
||||
* @param WorkbenchImportContact $config 配置
|
||||
* @return array
|
||||
*/
|
||||
protected function getDeviceList($workbench, $config)
|
||||
{
|
||||
$deviceIds = json_decode($config['deviceId'], true);
|
||||
if (empty($deviceIds)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// 从数据库获取设备信息
|
||||
$devices = Db::table('s2_device')
|
||||
->whereIn('id', $deviceIds)
|
||||
->where('isDeleted', 0)
|
||||
->where('alive', 1) // 只选择在线设备
|
||||
->field('id as deviceId, imei, nickname')
|
||||
->select();
|
||||
|
||||
return $devices;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否需要导入
|
||||
* @param Workbench $workbench 工作台
|
||||
* @param WorkbenchImportContact $config 配置
|
||||
* @return bool
|
||||
*/
|
||||
protected function shouldImport($workbench, $config)
|
||||
{
|
||||
// 检查导入间隔
|
||||
$today = date('Y-m-d');
|
||||
$startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00');
|
||||
$endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00');
|
||||
// 如果不在指定时间范围内,则跳过
|
||||
if ($startTimestamp > time() || $endTimestamp < time()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$maxPerDay = $config['num'];
|
||||
if ($maxPerDay <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 查询今日已导入次数
|
||||
$count = Db::name('workbench_import_contact_item')
|
||||
->where('workbenchId', $workbench->id)
|
||||
->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp])
|
||||
->count();
|
||||
|
||||
if ($count >= $maxPerDay) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 计算导入间隔
|
||||
$totalSeconds = $endTimestamp - $startTimestamp;
|
||||
$interval = floor($totalSeconds / $maxPerDay);
|
||||
$nextImportTime = $startTimestamp + $count * $interval;
|
||||
|
||||
if (time() < $nextImportTime) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 从数据库读取通讯录
|
||||
* @param WorkbenchImportContact $config
|
||||
* @return array
|
||||
*/
|
||||
protected function getContactFromDatabase($workbench,$config)
|
||||
{
|
||||
$pools = json_decode($config['pools'], true);
|
||||
$deviceIds = json_decode($config['deviceId'], true);
|
||||
if (empty($pools) || empty($deviceIds)) {
|
||||
return false;
|
||||
}
|
||||
$deviceNum = count($deviceIds);
|
||||
$contactNum = $deviceNum * $config['num'];
|
||||
if (empty($contactNum)) {
|
||||
return false;
|
||||
}
|
||||
//过滤已删除的数据
|
||||
$packageIds = Db::name('traffic_source_package')
|
||||
->where(['isDel' => 0])
|
||||
->whereIn('id', $pools)
|
||||
->column('id');
|
||||
|
||||
if (empty($packageIds)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$data = Db::name('traffic_source_package_item')->alias('tpi')
|
||||
->join('traffic_pool tp', 'tp.identifier = tpi.identifier')
|
||||
->join('traffic_source ts', 'ts.identifier = tpi.identifier','left')
|
||||
->join('workbench_import_contact_item wici', 'wici.poolId = tp.id AND wici.workbenchId = '.$workbench->id,'left')
|
||||
->where('tp.mobile', '>',0)
|
||||
->where('wici.id','null')
|
||||
->whereIn('tpi.packageId',$packageIds)
|
||||
->field('tp.id,tpi.packageId,tp.mobile as phone,ts.name')
|
||||
->order('tp.id DESC')
|
||||
->group('tpi.identifier')
|
||||
->limit($contactNum)
|
||||
->select();
|
||||
return $data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录任务开始
|
||||
* @param string $jobId
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function logJobStart($jobId, $queueLockKey)
|
||||
{
|
||||
Log::info('开始处理工作台通讯录导入任务: ' . json_encode([
|
||||
'jobId' => $jobId,
|
||||
'queueLockKey' => $queueLockKey
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务成功
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function handleJobSuccess($job, $queueLockKey)
|
||||
{
|
||||
$job->delete();
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info('工作台通讯录导入任务执行成功');
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务错误
|
||||
* @param \Exception $e
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
* @return bool
|
||||
*/
|
||||
protected function handleJobError(\Exception $e, $job, $queueLockKey)
|
||||
{
|
||||
Log::error('工作台通讯录导入任务异常:' . $e->getMessage());
|
||||
|
||||
if (!empty($queueLockKey)) {
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info("由于异常释放队列锁: {$queueLockKey}");
|
||||
}
|
||||
|
||||
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
|
||||
$job->delete();
|
||||
} else {
|
||||
$job->release(Config::get('queue.failed_delay', 10));
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user