update task
This commit is contained in:
@@ -46,171 +46,22 @@ class TaskServer extends Server
|
||||
|
||||
$adapter = new ChuKeBaoAdapter();
|
||||
|
||||
|
||||
// 只在一个进程里开这个定时器,处理指定任务
|
||||
// 在一个进程里处理获客任务添加后的相关逻辑
|
||||
if ($current_worker_id == self::PROCESS_COUNT - 1) {
|
||||
|
||||
// todo 封装为 handleFriendAddTaskWithStatusIsCreated() ; 重复代码进一步抽象
|
||||
Timer::add(60, function () use($adapter) {
|
||||
|
||||
$tasks = Db::name('task_customer')
|
||||
->where('status', 1)
|
||||
->limit(50)
|
||||
->select();
|
||||
|
||||
|
||||
if ($tasks) {
|
||||
foreach ($tasks as $task) {
|
||||
|
||||
$task_id = $task['task_id'];
|
||||
|
||||
$task_info = $adapter->getCustomerAcquisitionTask($task_id);
|
||||
|
||||
if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (empty($task['processed_wechat_ids'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$weChatIds = explode(',', $task['processed_wechat_ids']);
|
||||
|
||||
$passedWeChatId = '';
|
||||
|
||||
foreach ($weChatIds as $wechatId) {
|
||||
|
||||
// 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3
|
||||
// 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?)
|
||||
if ($adapter->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) {
|
||||
$passedWeChatId = $wechatId;
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if ($passedWeChatId && !empty($task_info['msgConf'])) {
|
||||
|
||||
// 直接发消息,同时更新状态为 4(已通过-已发消息)
|
||||
$wechatFriendRecord = $adapter->getWeChatAccoutIdAndFriendIdByWeChatIdAndFriendPhone($passedWeChatId, $task['phone']);
|
||||
|
||||
$msgConf = is_string($task_info['msgConf']) ? json_decode($task_info['msgConf'], 1) : $task_info['msgConf'];
|
||||
|
||||
$wechatFriendRecord && $adapter->sendMsgToFriend($wechatFriendRecord['id'], $wechatFriendRecord['wechatAccountId'], $msgConf);
|
||||
|
||||
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 4, 'updated_at' => time()]);
|
||||
|
||||
} else {
|
||||
|
||||
foreach ($weChatIds as $wechatId) {
|
||||
|
||||
// 查询执行状态
|
||||
$latestFriendTask = $adapter->getLatestFriendTaskByPhoneAndWeChatId($task['phone'], $wechatId);
|
||||
if (empty($latestFriendTask)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
// 已经执行成功的话,直接break,同时更新对应task_customer的状态为2(添加成功)
|
||||
if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 1) {
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 2, 'updated_at' => time()]);
|
||||
break;
|
||||
}
|
||||
|
||||
// todo 判断处理执行失败的情况 status=2,根据 extra 的描述去处理;-- 可以先直接更新为失败,然后 extra =》fail_reason -- 因为有专门的任务会处理失败的
|
||||
if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 2) {
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 3, 'fail_reason' => $latestFriendTask['extra'] ?? '未知原因', 'updated_at' => time()]);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
$adapter->handleCustomerTaskWithStatusIsCreated();
|
||||
});
|
||||
}
|
||||
|
||||
// 3个进程处理获客新任务
|
||||
if ($current_worker_id < self::PROCESS_COUNT - 1) {
|
||||
|
||||
Timer::add(1, function () use ($current_worker_id, $process_count_for_status_0, $adapter) {
|
||||
|
||||
$tasks = Db::name('task_customer')
|
||||
->where('status', 0)
|
||||
->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}")
|
||||
->limit(50)
|
||||
->select();
|
||||
if ($tasks) {
|
||||
|
||||
foreach ($tasks as $task) {
|
||||
|
||||
$task_id = $task['task_id'];
|
||||
|
||||
$task_info = $adapter->getCustomerAcquisitionTask($task_id);
|
||||
|
||||
if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
$wechatIdAccountIdMap = $adapter->getWeChatIdsAccountIdsMapByDeviceIds($task_info['reqConf']['devices']);
|
||||
if (empty($wechatIdAccountIdMap)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$friendAddTaskCreated = false;
|
||||
|
||||
foreach ($wechatIdAccountIdMap as $wechatId => $accountId) {
|
||||
|
||||
|
||||
// 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑
|
||||
if ($adapter->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) {
|
||||
$task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号
|
||||
break;
|
||||
}
|
||||
|
||||
// 判断时间间隔\时间段和最后一次的状态
|
||||
$canCreateFriendAddTask = $adapter->checkIfCanCreateFriendAddTask($wechatId, $task_info['reqConf']);
|
||||
if (empty($canCreateFriendAddTask)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 判断24h内加的好友数量,friend_task 先固定10个人 getLast24hAddedFriendsCount
|
||||
$last24hAddedFriendsCount = $adapter->getLast24hAddedFriendsCount($wechatId);
|
||||
if ($last24hAddedFriendsCount >= 10) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断
|
||||
|
||||
// 创建好友添加任务, 对接触客宝
|
||||
$conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name']]);
|
||||
$adapter->createFriendAddTask($accountId, $task['phone'], $conf);
|
||||
$friendAddTaskCreated = true;
|
||||
$task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => $friendAddTaskCreated ? 1 : 3, 'fail_reason' => $friendAddTaskCreated ? '' : '所有账号不可添加', 'updated_at' => time()]); // ~~不用管,回头再添加再判断即可~~
|
||||
// 失败一定是另一个进程/定时器在检查的
|
||||
|
||||
}
|
||||
}
|
||||
$adapter->handleCustomerTaskWithStatusIsNew($current_worker_id, $process_count_for_status_0);
|
||||
});
|
||||
}
|
||||
|
||||
// 更多其他后台任务
|
||||
// ......
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use think\facade\Log;
|
||||
use app\api\controller\FriendTaskController;
|
||||
use app\common\service\AuthService;
|
||||
use app\api\controller\WebSocketController;
|
||||
use Workerman\Lib\Timer;
|
||||
|
||||
class Adapter implements WeChatServiceInterface
|
||||
{
|
||||
@@ -134,6 +135,7 @@ class Adapter implements WeChatServiceInterface
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 发送微信朋友圈
|
||||
* @param string $deviceId 设备ID
|
||||
@@ -147,10 +149,161 @@ class Adapter implements WeChatServiceInterface
|
||||
return true;
|
||||
}
|
||||
|
||||
// sendMsgToFriend 要处理计划任务
|
||||
public function handleCustomerTaskWithStatusIsNew(int $current_worker_id, int $process_count_for_status_0)
|
||||
{
|
||||
$tasks = Db::name('task_customer')
|
||||
->where('status', 0)
|
||||
->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}")
|
||||
->limit(50)
|
||||
->select();
|
||||
if ($tasks) {
|
||||
|
||||
foreach ($tasks as $task) {
|
||||
|
||||
$task_id = $task['task_id'];
|
||||
|
||||
$task_info = $this->getCustomerAcquisitionTask($task_id);
|
||||
|
||||
if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
$wechatIdAccountIdMap = $this->getWeChatIdsAccountIdsMapByDeviceIds($task_info['reqConf']['devices']);
|
||||
if (empty($wechatIdAccountIdMap)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$friendAddTaskCreated = false;
|
||||
|
||||
foreach ($wechatIdAccountIdMap as $wechatId => $accountId) {
|
||||
|
||||
|
||||
// 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑
|
||||
if ($this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) {
|
||||
$task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号
|
||||
break;
|
||||
}
|
||||
|
||||
// 判断时间间隔\时间段和最后一次的状态
|
||||
$canCreateFriendAddTask = $this->checkIfCanCreateFriendAddTask($wechatId, $task_info['reqConf']);
|
||||
if (empty($canCreateFriendAddTask)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 判断24h内加的好友数量,friend_task 先固定10个人 getLast24hAddedFriendsCount
|
||||
$last24hAddedFriendsCount = $this->getLast24hAddedFriendsCount($wechatId);
|
||||
if ($last24hAddedFriendsCount >= 10) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断
|
||||
|
||||
// 创建好友添加任务, 对接触客宝
|
||||
$conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name']]);
|
||||
$this->createFriendAddTask($accountId, $task['phone'], $conf);
|
||||
$friendAddTaskCreated = true;
|
||||
$task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => $friendAddTaskCreated ? 1 : 3, 'fail_reason' => $friendAddTaskCreated ? '' : '所有账号不可添加', 'updated_at' => time()]); // ~~不用管,回头再添加再判断即可~~
|
||||
// 失败一定是另一个进程/定时器在检查的
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 处理添加中的获客任务, only run in workerman process!
|
||||
public function handleCustomerTaskWithStatusIsCreated()
|
||||
{
|
||||
|
||||
$tasks = Db::name('task_customer')
|
||||
->where('status', 1)
|
||||
->limit(50)
|
||||
->select();
|
||||
|
||||
if (empty($tasks)) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($tasks as $task) {
|
||||
|
||||
$task_id = $task['task_id'];
|
||||
|
||||
$task_info = $this->getCustomerAcquisitionTask($task_id);
|
||||
|
||||
if (empty($task_info['status']) || empty($task_info['reqConf']) || empty($task_info['reqConf']['devices'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (empty($task['processed_wechat_ids'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$weChatIds = explode(',', $task['processed_wechat_ids']);
|
||||
|
||||
$passedWeChatId = '';
|
||||
|
||||
foreach ($weChatIds as $wechatId) {
|
||||
|
||||
// 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3
|
||||
// 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?)
|
||||
if ($this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'])) {
|
||||
$passedWeChatId = $wechatId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ($passedWeChatId && !empty($task_info['msgConf'])) {
|
||||
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 4, 'updated_at' => time()]);
|
||||
|
||||
$wechatFriendRecord = $this->getWeChatAccoutIdAndFriendIdByWeChatIdAndFriendPhone($passedWeChatId, $task['phone']);
|
||||
|
||||
$msgConf = is_string($task_info['msgConf']) ? json_decode($task_info['msgConf'], 1) : $task_info['msgConf'];
|
||||
|
||||
$wechatFriendRecord && $this->sendMsgToFriend($wechatFriendRecord['id'], $wechatFriendRecord['wechatAccountId'], $msgConf);
|
||||
|
||||
} else {
|
||||
|
||||
foreach ($weChatIds as $wechatId) {
|
||||
|
||||
// 查询执行状态
|
||||
$latestFriendTask = $this->getLatestFriendTaskByPhoneAndWeChatId($task['phone'], $wechatId);
|
||||
if (empty($latestFriendTask)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 已经执行成功的话,直接break,同时更新对应task_customer的状态为2(添加成功)
|
||||
if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 1) {
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 2, 'updated_at' => time()]);
|
||||
break;
|
||||
}
|
||||
|
||||
// todo 判断处理执行失败的情况 status=2,根据 extra 的描述去处理;-- 可以先直接更新为失败,然后 extra =》fail_reason -- 因为有专门的任务会处理失败的
|
||||
if (isset($latestFriendTask['status']) && $latestFriendTask['status'] == 2) {
|
||||
Db::name('task_customer')
|
||||
->where('id', $task['id'])
|
||||
->update(['status' => 3, 'fail_reason' => $latestFriendTask['extra'] ?? '未知原因', 'updated_at' => time()]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 发微信个人消息
|
||||
public function sendMsgToFriend(int $friendId, int $wechatAccountId, array $msgConf)
|
||||
{
|
||||
// todo 直接发消息,同时更新状态为 4(已通过-已发消息) application/api/controller/WebSocketController.php sendPersonal
|
||||
// 消息拼接 msgType(1:文本 3:图片 43:视频 47:动图表情包(gif、其他表情包) 49:小程序/其他:图文、文件)
|
||||
// 当前,type 为文本、图片、动图表情包的时候,content为string, 其他情况为对象 {type: 'file/link/...', url: '', title: '', thunmbPath: '', desc: ''}
|
||||
// $result = [
|
||||
@@ -162,18 +315,58 @@ class Adapter implements WeChatServiceInterface
|
||||
// "wechatChatroomId" => 0,
|
||||
// "wechatFriendId" => $dataArray['wechatFriendId'],
|
||||
// ];
|
||||
$wsController = new WebSocketController();
|
||||
$wsController->sendPersonal([
|
||||
'wechatFriendId' => $friendId,
|
||||
'wechatAccountId' => $wechatAccountId,
|
||||
'msgConf' => $msgConf,
|
||||
$wsController = new WebSocketController([
|
||||
'userName' => $this->config['username'],
|
||||
'password' => $this->config['password']
|
||||
]);
|
||||
|
||||
$gap = 0;
|
||||
foreach ($msgConf['content'] as $content) {
|
||||
$msgType = 0;
|
||||
|
||||
if ($content['type'] == 'text') {
|
||||
$msgType = 1;
|
||||
}
|
||||
if ($content['type'] == 'image') {
|
||||
$msgType = 3;
|
||||
}
|
||||
if ($content['type'] == 'sticker') {
|
||||
$msgType = 47;
|
||||
}
|
||||
if ($content['type'] == 'video') {
|
||||
$msgType = 43;
|
||||
}
|
||||
|
||||
if ($msgType == 0) {
|
||||
$msgType = 49;
|
||||
}
|
||||
|
||||
if ($gap) {
|
||||
Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content) {
|
||||
$wsController->sendPersonal([
|
||||
'wechatFriendId' => $friendId,
|
||||
'wechatAccountId' => $wechatAccountId,
|
||||
'msgType' => $msgType,
|
||||
'content' => $content['detail'],
|
||||
]);
|
||||
}, [], false);
|
||||
} else {
|
||||
$wsController->sendPersonal([
|
||||
'wechatFriendId' => $friendId,
|
||||
'wechatAccountId' => $wechatAccountId,
|
||||
'msgType' => $msgType,
|
||||
'content' => $content['detail'],
|
||||
]);
|
||||
}
|
||||
|
||||
!empty($msgConf['send_gap']) && $gap += $msgConf['send_gap'];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// getCustomerAcquisitionTask
|
||||
public function getCustomerAcquisitionTask($id) {
|
||||
public function getCustomerAcquisitionTask($id)
|
||||
{
|
||||
|
||||
// 先读取缓存
|
||||
$task_info = cache('task_info_' . $id);
|
||||
@@ -196,38 +389,19 @@ class Adapter implements WeChatServiceInterface
|
||||
public function checkIfIsWeChatFriendByPhone(string $wxId, string $phone): bool
|
||||
{
|
||||
if (empty($wxId) || empty($phone)) {
|
||||
// Avoid queries with empty essential parameters.
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// The SQL hint provided is:
|
||||
// SELECT ownerWechatId, phone, passTime, createTime
|
||||
// FROM `s2_wechat_friend`
|
||||
// WHERE ownerWechatId = '您的微信ID' -- Corresponds to $wxId
|
||||
// AND phone LIKE 'phone%' -- Corresponds to $phone . '%'
|
||||
// ORDER BY createTime DESC;
|
||||
|
||||
// $friendRecord = Db::table('s2_wechat_friend')
|
||||
// ->where('ownerWechatId', $wxId)
|
||||
// ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone
|
||||
// ->order('createTime', 'desc') // Order by creation time as hinted
|
||||
// ->find(); // Fetches the first matching record or null
|
||||
// $friendRecord = Db::table('s2_wechat_friend')
|
||||
$id = Db::table('s2_wechat_friend')
|
||||
->where('ownerWechatId', $wxId)
|
||||
->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone
|
||||
->order('createTime', 'desc') // Order by creation time as hinted
|
||||
// ->column('id');
|
||||
->where('phone', 'like', $phone . '%')
|
||||
->order('createTime', 'desc')
|
||||
->value('id');
|
||||
|
||||
// If a record is found, $friendRecord will not be empty.
|
||||
// return !empty($friendRecord);
|
||||
return (bool)$id;
|
||||
} catch (\Exception $e) {
|
||||
// Log the exception for diagnostics.
|
||||
Log::error("Error in checkIfIsWeChatFriendByPhone (wxId: {$wxId}, phone: {$phone}): " . $e->getMessage());
|
||||
// Return false in case of an error, indicating not a friend or unable to determine.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -240,11 +414,10 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
return Db::table('s2_wechat_friend')
|
||||
->where('ownerWechatId', $wechatId)
|
||||
->where('phone', 'like', $phone . '%')
|
||||
->field('id,wechatAccountId,passTime,createTime')
|
||||
->find();
|
||||
|
||||
->where('ownerWechatId', $wechatId)
|
||||
->where('phone', 'like', $phone . '%')
|
||||
->field('id,wechatAccountId,passTime,createTime')
|
||||
->find();
|
||||
}
|
||||
|
||||
// 判断是否已添加某手机号为好友并返回添加时间
|
||||
@@ -255,15 +428,10 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
try {
|
||||
// $passTime = Db::table('s2_wechat_friend')
|
||||
// ->where('ownerWechatId', $wxId)
|
||||
// ->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone
|
||||
// ->order('createTime', 'desc') // Order by creation time as hinted
|
||||
// ->value('passTime');
|
||||
$record = Db::table('s2_wechat_friend')
|
||||
->where('ownerWechatId', $wxId)
|
||||
->where('phone', 'like', $phone . '%') // Match phone numbers starting with $phone
|
||||
->field('id,createTime,passTime') // Order by creation time as hinted
|
||||
->where('phone', 'like', $phone . '%')
|
||||
->field('id,createTime,passTime')
|
||||
->find();
|
||||
|
||||
return $record['passTime'] ?? $record['createTime'] ?? 0;
|
||||
@@ -340,7 +508,7 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
}
|
||||
|
||||
// getLatestFriendTaskByPhoneAndWeChatId
|
||||
// 获取某微信最后一条添加好友任务
|
||||
public function getLatestFriendTaskByPhoneAndWeChatId(string $phone, string $wechatId): array
|
||||
{
|
||||
if (empty($phone) || empty($wechatId)) {
|
||||
@@ -384,13 +552,11 @@ class Adapter implements WeChatServiceInterface
|
||||
return false;
|
||||
}
|
||||
|
||||
// conf['allow_add_time_between']
|
||||
if (!empty($conf['allow_add_time_between']) && count($conf['allow_add_time_between']) == 2) {
|
||||
$currentTime = date('H:i');
|
||||
$startTime = $conf['allow_add_time_between'][0];
|
||||
$endTime = $conf['allow_add_time_between'][1];
|
||||
|
||||
// If current time is NOT between start and end time, return false
|
||||
|
||||
if ($currentTime >= $startTime && $currentTime <= $endTime) {
|
||||
return true;
|
||||
} else {
|
||||
@@ -399,36 +565,23 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
if (isset($record['status'])) {
|
||||
// if ($record['status'] == 1) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
if ($record['status'] == 2) {
|
||||
|
||||
// todo 判断$record['extra'] 是否包含文字: 操作过于频繁;如果包含判断 updateTime 是否已经超过72min,updateTime是10位时间戳;如果包含指定文字且时间未超过72min,return false
|
||||
// 判断$record['extra'] 是否包含文字: 操作过于频繁;如果包含判断 updateTime 是否已经超过72min,updateTime是10位时间戳;如果包含指定文字且时间未超过72min,return false
|
||||
if (isset($record['extra']) && strpos($record['extra'], '操作过于频繁') !== false) {
|
||||
$updateTime = isset($record['updateTime']) ? (int)$record['updateTime'] : 0;
|
||||
$now = time();
|
||||
$diff = $now - $updateTime;
|
||||
|
||||
// Check if less than 72 minutes (72 * 60 = 4320 seconds) have passed
|
||||
// if ($diff < 72 * 60) {
|
||||
|
||||
if ($diff < 24 * 60 * 60) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// $createTime = $record['createTime'];
|
||||
// $now = time();
|
||||
// $diff = $now - $createTime;
|
||||
// if ($diff > 10 * 60) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
return true;
|
||||
return true;
|
||||
}
|
||||
|
||||
// 获取触客宝系统的客服微信账号id,用于后续微信相关操作
|
||||
@@ -454,16 +607,12 @@ class Adapter implements WeChatServiceInterface
|
||||
->where('deviceAlive', 1)
|
||||
->where('wechatAlive', 1)
|
||||
->where('wechatId', 'in', $wechatIds)
|
||||
// ->field('id')
|
||||
->field('id,wechatId')
|
||||
// ->select();
|
||||
->column('id', 'wechatId');
|
||||
|
||||
return $records;
|
||||
}
|
||||
|
||||
// getWeChatIdsByDeviceIds
|
||||
// public function getWeChatIdsByDeviceIds(array $deviceIds): array
|
||||
public function getWeChatIdsAccountIdsMapByDeviceIds(array $deviceIds): array
|
||||
{
|
||||
if (empty($deviceIds)) {
|
||||
@@ -472,16 +621,15 @@ class Adapter implements WeChatServiceInterface
|
||||
$records = Db::table('s2_wechat_account')
|
||||
->where('deviceAlive', 1)
|
||||
->where('currentDeviceId', 'in', $deviceIds)
|
||||
// ->field('id,wechatId,currentDeviceId')
|
||||
->field('id,wechatId')
|
||||
// ->select();
|
||||
->column('id,wechatId');
|
||||
return $records;
|
||||
}
|
||||
|
||||
// addFriendTaskApi
|
||||
public function addFriendTaskApi(int $wechatAccountId, string $phone, string $message, string $remark, array $labels, $authorization = '') {
|
||||
|
||||
// 触客宝添加好友API
|
||||
public function addFriendTaskApi(int $wechatAccountId, string $phone, string $message, string $remark, array $labels, $authorization = '')
|
||||
{
|
||||
|
||||
$authorization = $authorization ?: AuthService::getSystemAuthorization();
|
||||
|
||||
if (empty($authorization)) {
|
||||
@@ -492,10 +640,6 @@ class Adapter implements WeChatServiceInterface
|
||||
];
|
||||
}
|
||||
|
||||
// $friendTaskController = new FriendTaskController();
|
||||
// $friendTaskController->addFriendTask($wechatAccountId, $phone, $reqConf);
|
||||
|
||||
// todo 调用 application/api/controller/FriendTaskController.php: addFriendTask()
|
||||
$params = [
|
||||
'phone' => $phone,
|
||||
'message' => $message,
|
||||
@@ -513,31 +657,24 @@ class Adapter implements WeChatServiceInterface
|
||||
'Content-Type' => 'application/json',
|
||||
'client' => 'system'
|
||||
];
|
||||
// 如果有授权信息,添加到请求头
|
||||
// if (!empty($authorization)) {
|
||||
// $headers['Authorization'] = 'bearer ' . $authorization;
|
||||
// }
|
||||
|
||||
$headers['Authorization'] = 'bearer ' . $authorization;
|
||||
try {
|
||||
// 发送请求
|
||||
$response = $client->request('POST', 'api/AddFriendByPhoneTask/add', [
|
||||
'headers' => $headers,
|
||||
'json' => $params, // Guzzle 会自动将数组转换为 JSON
|
||||
'json' => $params,
|
||||
]);
|
||||
|
||||
// 获取状态码
|
||||
|
||||
$statusCode = $response->getStatusCode();
|
||||
|
||||
// 获取响应体并解析 JSON
|
||||
|
||||
$body = $response->getBody()->getContents();
|
||||
$result = json_decode($body, true);
|
||||
|
||||
|
||||
// 返回结果,包含状态码和响应体
|
||||
return [
|
||||
'status_code' => $statusCode,
|
||||
'body' => $result
|
||||
];
|
||||
|
||||
} catch (RequestException $e) {
|
||||
// 处理请求异常,可以获取错误响应
|
||||
if ($e->hasResponse()) {
|
||||
@@ -550,7 +687,7 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
$body = $e->getResponse()->getBody()->getContents();
|
||||
$result = json_decode($body, true);
|
||||
|
||||
|
||||
return [
|
||||
'status_code' => $statusCode,
|
||||
'body' => $result,
|
||||
@@ -559,7 +696,7 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
Log::error("Error in addFriendTaskApi (wechatAccountId: {$wechatAccountId}, phone: {$phone}, message: {$message}, remark: {$remark}, labels: " . json_encode($labels) . "): " . $e->getMessage());
|
||||
|
||||
|
||||
// 没有响应的异常
|
||||
return [
|
||||
'status_code' => 0,
|
||||
@@ -578,52 +715,34 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
}
|
||||
|
||||
// createFriendAddTask $accountId, $task['phone'], $task_info['reqConf'] -hello_msg,remark_type
|
||||
// public function createFriendAddTask(int $wechatAccountId, string $phone, array $conf): bool
|
||||
// 创建添加好友任务/执行添加
|
||||
public function createFriendAddTask(int $wechatAccountId, string $phone, array $conf)
|
||||
{
|
||||
if (empty($wechatAccountId) || empty($phone) || empty($conf)) {
|
||||
// return false;
|
||||
return;
|
||||
}
|
||||
|
||||
// $remark = '';
|
||||
// if (isset($conf['remark_type']) && $conf['remark_type'] == 'phone') {
|
||||
// $remark = $phone . '-' . $conf['task_name'] ?? '获客';
|
||||
// } else {
|
||||
|
||||
// }
|
||||
$remark = $phone . '-' . $conf['task_name'] ?? '获客';
|
||||
|
||||
$tags = [];
|
||||
if (!empty($conf['tags'])) {
|
||||
if (!empty($conf['tags'])) {
|
||||
if (is_array($conf['tags'])) {
|
||||
$tags = $conf['tags'];
|
||||
}
|
||||
}
|
||||
|
||||
if (strpos($conf['tags'], ',') !== false) {
|
||||
$tags = explode(',', $conf['tags']);
|
||||
}
|
||||
}
|
||||
|
||||
// $res = $this->addFriendTaskApi($wechatAccountId, $phone, $conf['hello_msg'] ?? '你好', $remark, $conf['tags'] ?? []);
|
||||
$this->addFriendTaskApi($wechatAccountId, $phone, $conf['hello_msg'] ?? '你好', $remark, $tags);
|
||||
|
||||
// if ($res['status_code']) {
|
||||
|
||||
// }
|
||||
|
||||
|
||||
|
||||
// return true;
|
||||
|
||||
}
|
||||
|
||||
/* TODO: 以上方法待实现,基于/参考 application/api/controller/WebSocketController.php 去实现;以下同步脚本用的方法转移到其他类 */
|
||||
|
||||
|
||||
// NOTE: run in background; 5min 同步一次
|
||||
// todo: 后续经过`s2_`表,直接对接三方的api去sync
|
||||
public function syncFriendship()
|
||||
{
|
||||
$sql = "INSERT INTO ck_wechat_friendship(id,wechatId,tags,memo,ownerWechatId,createTime,updateTime,deleteTime,companyId)
|
||||
@@ -742,25 +861,19 @@ class Adapter implements WeChatServiceInterface
|
||||
->alias('a')
|
||||
->join(['s2_device' => 'd'], 'd.imei = a.imei')
|
||||
->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId')
|
||||
// ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
|
||||
->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as updateTime')
|
||||
->cursor();
|
||||
|
||||
// $insertData = [];
|
||||
// $batchSize = 500; // Insert in batches for better performance
|
||||
|
||||
foreach ($cursor as $item) {
|
||||
|
||||
if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['companyId'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// $exists = Db::connect()->table('ck_device_wechat_login')
|
||||
$exists = Db::table('ck_device_wechat_login')
|
||||
->where('deviceId', $item['deviceId'])
|
||||
->where('wechatId', $item['wechatId'])
|
||||
->where('companyId', $item['companyId'])
|
||||
// ->where('createTime', $item['createTime'])
|
||||
->find();
|
||||
|
||||
if ($exists) {
|
||||
@@ -774,20 +887,8 @@ class Adapter implements WeChatServiceInterface
|
||||
Db::table('ck_device_wechat_login')->insert($item);
|
||||
}
|
||||
|
||||
// $insertData[] = $item;
|
||||
|
||||
// if (count($insertData) >= $batchSize) {
|
||||
// Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE
|
||||
// $insertData = []; // Reset for next batch
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
// Insert any remaining data
|
||||
// if (!empty($insertData)) {
|
||||
// Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE
|
||||
// }
|
||||
|
||||
return true;
|
||||
} catch (\Exception $e) {
|
||||
Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
|
||||
@@ -851,7 +952,6 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 同步/更新微信客服信息到ck_wechat_customer表
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user