场景获客及时获取群好友
This commit is contained in:
@@ -6,6 +6,7 @@ use think\facade\Cache;
|
||||
use think\facade\Env;
|
||||
use WeChatDeviceApi\Contracts\WeChatServiceInterface;
|
||||
use WeChatDeviceApi\Exceptions\ApiException;
|
||||
|
||||
// 如果有 Client.php
|
||||
// use WeChatDeviceApi\Adapters\ChuKeBao\Client as ChuKeBaoApiClient;
|
||||
use GuzzleHttp\Client;
|
||||
@@ -23,6 +24,7 @@ use Workerman\Lib\Timer;
|
||||
class Adapter implements WeChatServiceInterface
|
||||
{
|
||||
protected $config;
|
||||
|
||||
// protected $apiClient; // 如果使用 VendorAApiClient
|
||||
|
||||
public function __construct(array $config = [])
|
||||
@@ -154,8 +156,8 @@ class Adapter implements WeChatServiceInterface
|
||||
public function handleCustomerTaskWithStatusIsNew(int $current_worker_id, int $process_count_for_status_0)
|
||||
{
|
||||
$task = Db::name('customer_acquisition_task')
|
||||
->where(['status' => 1,'deleteTime' => 0])
|
||||
// ->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}")
|
||||
->where(['status' => 1, 'deleteTime' => 0])
|
||||
->whereRaw("id % $process_count_for_status_0 = {$current_worker_id}")
|
||||
->order('id desc')
|
||||
->select();
|
||||
|
||||
@@ -168,11 +170,11 @@ class Adapter implements WeChatServiceInterface
|
||||
$reqConf = json_decode($item['reqConf'], true);
|
||||
$device = $reqConf['device'] ?? [];
|
||||
$deviceCount = count($device);
|
||||
if ($deviceCount <= 0){
|
||||
if ($deviceCount <= 0) {
|
||||
continue;
|
||||
}
|
||||
$tasks = Db::name('task_customer')
|
||||
->where(['status'=> 0,'task_id'=>$item['id']])
|
||||
->where(['status' => 0, 'task_id' => $item['id']])
|
||||
->order('id DESC')
|
||||
->limit($deviceCount)
|
||||
->select();
|
||||
@@ -198,7 +200,7 @@ class Adapter implements WeChatServiceInterface
|
||||
foreach ($wechatIdAccountIdMap as $accountId => $wechatId) {
|
||||
// 是否已经是好友的判断,如果已经是好友,直接break; 但状态还是维持1,让另外一个进程处理发消息的逻辑
|
||||
$wechatTags = json_decode($task['tags'], true);
|
||||
$isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'],$task['siteTags']);
|
||||
$isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone'], $task['siteTags']);
|
||||
if (!empty($isFriend)) {
|
||||
$friendAddTaskCreated = true;
|
||||
$task['processed_wechat_ids'] = $task['processed_wechat_ids'] . ',' . $wechatId; // 处理失败任务用,用于过滤已处理的微信号
|
||||
@@ -220,13 +222,13 @@ class Adapter implements WeChatServiceInterface
|
||||
// 采取乐观尝试的策略,假设第一个可以添加的人可以添加成功的; 回头再另外一个任务进程去判断
|
||||
|
||||
// 创建好友添加任务, 对接触客宝
|
||||
$tags = array_merge($task_info['tagConf']['customTags'],$task_info['tagConf']['scenarioTags']);
|
||||
if (!empty($wechatTags)){
|
||||
$tags = array_merge($tags,$wechatTags);
|
||||
$tags = array_merge($task_info['tagConf']['customTags'], $task_info['tagConf']['scenarioTags']);
|
||||
if (!empty($wechatTags)) {
|
||||
$tags = array_merge($tags, $wechatTags);
|
||||
}
|
||||
$tags = array_unique($tags);
|
||||
$tags = array_values($tags);
|
||||
$conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name'],'tags' => $tags]);
|
||||
$conf = array_merge($task_info['reqConf'], ['task_name' => $task_info['name'], 'tags' => $tags]);
|
||||
|
||||
|
||||
$this->createFriendAddTask($accountId, $task['phone'], $conf);
|
||||
@@ -249,13 +251,12 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 处理添加中的获客任务, only run in workerman process!
|
||||
public function handleCustomerTaskWithStatusIsCreated()
|
||||
{
|
||||
|
||||
$tasks = Db::name('task_customer')
|
||||
->whereIn('status', [1,2])
|
||||
->whereIn('status', [1, 2])
|
||||
->where('updateTime', '>=', (time() - 86400 * 3))
|
||||
->limit(50)
|
||||
->order('updateTime DESC')
|
||||
@@ -280,20 +281,19 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
$weChatIds = explode(',', $task['processed_wechat_ids']);
|
||||
$passedWeChatId = '';
|
||||
|
||||
foreach ($weChatIds as $wechatId) {
|
||||
|
||||
// 先是否是好友,如果不是好友,先查询执行状态,看是否还能以及需要换账号继续添加,还是直接更新状态为3
|
||||
// 如果添加成功,先更新为2,然后去发消息(先判断有无消息设置,发消息的log记录?)
|
||||
$isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone']);
|
||||
if ($isFriend) {
|
||||
$passedWeChatId = $wechatId;
|
||||
break;
|
||||
if (!empty($wechatId)) {
|
||||
$isFriend = $this->checkIfIsWeChatFriendByPhone($wechatId, $task['phone']);
|
||||
if ($isFriend) {
|
||||
$passedWeChatId = $wechatId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
if ($passedWeChatId && !empty($task_info['msgConf'])) {
|
||||
|
||||
Db::name('task_customer')
|
||||
@@ -334,6 +334,104 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public function handleCustomerTaskNewUser()
|
||||
{
|
||||
$task = Db::name('customer_acquisition_task')
|
||||
->where(['status' => 1, 'deleteTime' => 0])
|
||||
->whereIn('sceneId', [7])
|
||||
->order('id desc')
|
||||
->select();
|
||||
|
||||
if (empty($task)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach ($task as $item) {
|
||||
$sceneConf = json_decode($item['sceneConf'], true);
|
||||
//群获客
|
||||
if ($item['sceneId'] == 7) {
|
||||
if (!empty($sceneConf['groupSelected']) && is_array($sceneConf['groupSelected'])) {
|
||||
$rows = Db::name('wechat_group_member')->alias('gm')
|
||||
->join('wechat_account wa', 'gm.identifier = wa.wechatId')
|
||||
->where('gm.companyId', $item['companyId'])
|
||||
->whereIn('gm.groupId', $sceneConf['groupSelected'])
|
||||
->group('gm.identifier')
|
||||
->column('wa.id,wa.wechatId,wa.alias,wa.phone');
|
||||
|
||||
|
||||
// 1000条为一组进行批量处理
|
||||
$batchSize = 1000;
|
||||
$totalRows = count($rows);
|
||||
|
||||
for ($i = 0; $i < $totalRows; $i += $batchSize) {
|
||||
$batchRows = array_slice($rows, $i, $batchSize);
|
||||
|
||||
if (!empty($batchRows)) {
|
||||
// 1. 提取当前批次的phone
|
||||
$phones = [];
|
||||
foreach ($batchRows as $row) {
|
||||
if (!empty($row['phone'])) {
|
||||
$phone = !empty($row['phone']);
|
||||
} elseif (!empty($row['alias'])) {
|
||||
$phone = $row['alias'];
|
||||
} else {
|
||||
$phone = $row['wechatId'];
|
||||
}
|
||||
if (!empty($phone)) {
|
||||
$phones[] = $phone;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 批量查询已存在的phone
|
||||
$existingPhones = [];
|
||||
if (!empty($phones)) {
|
||||
$existing = Db::name('task_customer')
|
||||
->where('task_id', $item['id'])
|
||||
->where('phone', 'in', $phones)
|
||||
->field('phone')
|
||||
->select();
|
||||
$existingPhones = array_column($existing, 'phone');
|
||||
}
|
||||
|
||||
// 3. 过滤出新数据,批量插入
|
||||
$newData = [];
|
||||
foreach ($batchRows as $row) {
|
||||
if (!empty($row['phone'])) {
|
||||
$phone = !empty($row['phone']);
|
||||
} elseif (!empty($row['alias'])) {
|
||||
$phone = $row['alias'];
|
||||
} else {
|
||||
$phone = $row['wechatId'];
|
||||
}
|
||||
if (!empty($phone) && !in_array($phone, $existingPhones)) {
|
||||
$newData[] = [
|
||||
'task_id' => $item['id'],
|
||||
'name' => '',
|
||||
'source' => '场景获客_' . $item['name'],
|
||||
'phone' => $phone,
|
||||
'tags' => json_encode([], JSON_UNESCAPED_UNICODE),
|
||||
'siteTags' => json_encode([], JSON_UNESCAPED_UNICODE),
|
||||
'createTime' => time(),
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 批量插入新数据
|
||||
if (!empty($newData)) {
|
||||
Db::name('task_customer')->insertAll($newData);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
exit_data($sceneConf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 发微信个人消息
|
||||
public function sendMsgToFriend(int $friendId, int $wechatAccountId, array $msgConf)
|
||||
{
|
||||
@@ -352,7 +450,7 @@ class Adapter implements WeChatServiceInterface
|
||||
$username = Env::get('api.username', '');
|
||||
$password = Env::get('api.password', '');
|
||||
if (!empty($username) || !empty($password)) {
|
||||
$toAccountId = Db::name('users')->where('account',$username)->value('s2_accountId');
|
||||
$toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
|
||||
}
|
||||
|
||||
// 建立WebSocket
|
||||
@@ -381,7 +479,7 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
case 'file':
|
||||
$msgType = 49;
|
||||
|
||||
|
||||
$detail = [
|
||||
'type' => 'file',
|
||||
'title' => $content['content'][0]['name'],
|
||||
@@ -418,12 +516,12 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
|
||||
if(empty($detail)){
|
||||
if (empty($detail)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($gap) {
|
||||
Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content,$detail) {
|
||||
Timer::add($gap, function () use ($wsController, $friendId, $wechatAccountId, $msgType, $content, $detail) {
|
||||
$wsController->sendPersonal([
|
||||
'wechatFriendId' => $friendId,
|
||||
'wechatAccountId' => $wechatAccountId,
|
||||
@@ -463,7 +561,8 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
// 检查是否是好友关系
|
||||
public function checkIfIsWeChatFriendByPhone(string $wxId, string $phone,string $siteTags): bool
|
||||
|
||||
public function checkIfIsWeChatFriendByPhone($wxId = '', $phone = '', $siteTags = '')
|
||||
{
|
||||
if (empty($wxId) || empty($phone)) {
|
||||
return false;
|
||||
@@ -472,7 +571,7 @@ class Adapter implements WeChatServiceInterface
|
||||
try {
|
||||
$friend = Db::table('s2_wechat_friend')
|
||||
->where('ownerWechatId', $wxId)
|
||||
->where(['isPassed' => 1,'isDeleted' => 0])
|
||||
->where(['isPassed' => 1, 'isDeleted' => 0])
|
||||
->where('phone|alias|wechatId', 'like', $phone . '%')
|
||||
->order('createTime', 'desc')
|
||||
->find();
|
||||
@@ -480,17 +579,17 @@ class Adapter implements WeChatServiceInterface
|
||||
if (!empty($siteTags)) {
|
||||
$siteTags = json_decode($siteTags, true);
|
||||
$siteLabels = json_decode($friend['siteLabels'], true);
|
||||
$tags = array_merge($siteTags,$siteLabels);
|
||||
$tags = array_merge($siteTags, $siteLabels);
|
||||
$tags = array_unique($tags);
|
||||
$tags = array_values($tags);
|
||||
if (empty($tags)){
|
||||
if (empty($tags)) {
|
||||
$tags = [];
|
||||
}
|
||||
$tags = json_encode($tags,256);
|
||||
Db::table('s2_wechat_friend')->where(['id' => $friend['id']])->update(['siteLabels' => $tags,'updateTime' => time()]);
|
||||
$tags = json_encode($tags, 256);
|
||||
Db::table('s2_wechat_friend')->where(['id' => $friend['id']])->update(['siteLabels' => $tags, 'updateTime' => time()]);
|
||||
}
|
||||
return true;
|
||||
}else{
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
@@ -637,8 +736,7 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
//强制请求添加好友的列表
|
||||
$friendController = new FriendTaskController();
|
||||
$friendController->getlist(0,50);
|
||||
|
||||
$friendController->getlist(0, 50);
|
||||
|
||||
|
||||
$record = $this->getLatestFriendTask($wechatId);
|
||||
@@ -751,9 +849,9 @@ class Adapter implements WeChatServiceInterface
|
||||
$friendController = new FriendTaskController();
|
||||
$result = $friendController->addFriendTask($params);
|
||||
$result = json_decode($result, true);
|
||||
if ($result['code'] == 200){
|
||||
if ($result['code'] == 200) {
|
||||
return $result;
|
||||
}else{
|
||||
} else {
|
||||
$authorization = AuthService::getSystemAuthorization(false);
|
||||
return $this->addFriendTaskApi($wechatAccountId, $phone, $message, $remark, $labels, $authorization);
|
||||
}
|
||||
@@ -768,7 +866,7 @@ class Adapter implements WeChatServiceInterface
|
||||
return;
|
||||
}
|
||||
|
||||
switch ($conf['remarkType']){
|
||||
switch ($conf['remarkType']) {
|
||||
case 'phone':
|
||||
$remark = $phone . '-' . $conf['task_name'];
|
||||
break;
|
||||
@@ -776,7 +874,7 @@ class Adapter implements WeChatServiceInterface
|
||||
$remark = '';
|
||||
break;
|
||||
case 'source':
|
||||
$remark = $conf['task_name'];
|
||||
$remark = $conf['task_name'];
|
||||
break;
|
||||
default:
|
||||
$remark = '';
|
||||
@@ -955,7 +1053,7 @@ class Adapter implements WeChatServiceInterface
|
||||
* 大数据量分批处理版本
|
||||
* 适用于数据源非常大的情况,避免一次性加载全部数据到内存
|
||||
* 独立脚本执行,30min 同步一次 和 流量来源的更新一起
|
||||
*
|
||||
*
|
||||
* @param int $batchSize 每批处理的数据量
|
||||
* @return int 影响的行数
|
||||
*/
|
||||
@@ -1009,7 +1107,7 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
/**
|
||||
* 同步/更新微信客服信息到ck_wechat_customer表
|
||||
*
|
||||
*
|
||||
* @param int $batchSize 每批处理的数据量
|
||||
* @return int 影响的行数
|
||||
*/
|
||||
@@ -1151,7 +1249,7 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
/**
|
||||
* 计算客服权重
|
||||
*
|
||||
*
|
||||
* @param array $basic 基础信息
|
||||
* @param array $activity 活跃信息
|
||||
* @param array $friendShip 好友关系信息
|
||||
@@ -1213,7 +1311,7 @@ class Adapter implements WeChatServiceInterface
|
||||
/**
|
||||
* 同步设备信息到ck_device表
|
||||
* 数据量不大,仅同步一次所有设备
|
||||
*
|
||||
*
|
||||
* @return int 影响的行数
|
||||
*/
|
||||
public function syncDevice()
|
||||
@@ -1390,5 +1488,4 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user