update script
This commit is contained in:
@@ -8,6 +8,8 @@ use WeChatDeviceApi\Exceptions\ApiException;
|
||||
// use WeChatDeviceApi\Adapters\ChuKeBao\Client as ChuKeBaoApiClient;
|
||||
|
||||
use think\Db;
|
||||
use think\facade\Config;
|
||||
use think\facade\Log;
|
||||
|
||||
class Adapter implements WeChatServiceInterface
|
||||
{
|
||||
@@ -16,26 +18,29 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
public function __construct(array $config = [])
|
||||
{
|
||||
$this->config = $config;
|
||||
|
||||
// $this->config = $config ?: Config::get('wechat_device_api.');
|
||||
$this->config = $config ?: Config::get('wechat_device_api.adapters.ChuKeBao');
|
||||
// $this->config = $config;
|
||||
// $this->apiClient = new ChuKeBaoApiClient($config['api_key'], $config['api_secret'], $config['base_url']);
|
||||
// 校验配置等...
|
||||
if (empty($config['api_key']) || empty($config['username']) || empty($config['password'])) {
|
||||
if (empty($this->config['base_url']) || empty($this->config['username']) || empty($this->config['password'])) {
|
||||
throw new \InvalidArgumentException("ChuKeBao username and password are required.");
|
||||
}
|
||||
}
|
||||
|
||||
public function addFriend(string $deviceId, string $targetWxId): bool
|
||||
{
|
||||
// 1. 构建请求参数 (VendorA 特定的格式)
|
||||
// 1. 构建请求参数 (ChuKeBao 特定的格式)
|
||||
$params = [
|
||||
'device_identifier' => $deviceId,
|
||||
'wechat_user_to_add' => $targetWxId,
|
||||
'username' => $this->config['username'],
|
||||
'password' => $this->config['password'],
|
||||
// ... 其他 VendorA 特定参数
|
||||
// ... 其他 ChuKeBao 特定参数
|
||||
];
|
||||
|
||||
// 2. 调用 VendorA 的 API (例如使用 GuzzleHttp 或 cURL)
|
||||
// 2. 调用 ChuKeBao 的 API (例如使用 GuzzleHttp 或 cURL)
|
||||
// $response = $this->apiClient->post('/friend/add', $params);
|
||||
// 伪代码:
|
||||
$url = $this->config['base_url'] . '/friend/add';
|
||||
@@ -44,16 +49,16 @@ class Adapter implements WeChatServiceInterface
|
||||
// $responseData = json_decode($response->getBody()->getContents(), true);
|
||||
|
||||
// 模拟API调用
|
||||
echo "VendorA: Adding friend {$targetWxId} using device {$deviceId}\n";
|
||||
echo "ChuKeBao: Adding friend {$targetWxId} using device {$deviceId}\n";
|
||||
$responseData = ['code' => 0, 'message' => 'Success']; // 假设的响应
|
||||
|
||||
// 3. 处理响应,转换为标准结果
|
||||
if (!isset($responseData['code'])) {
|
||||
throw new ApiException("VendorA: Invalid API response for addFriend.");
|
||||
throw new ApiException("ChuKeBao: Invalid API response for addFriend.");
|
||||
}
|
||||
|
||||
if ($responseData['code'] !== 0) {
|
||||
throw new ApiException("VendorA: Failed to add friend - " . ($responseData['message'] ?? 'Unknown error'));
|
||||
throw new ApiException("ChuKeBao: Failed to add friend - " . ($responseData['message'] ?? 'Unknown error'));
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -61,17 +66,17 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
public function likeMoment(string $deviceId, string $momentId): bool
|
||||
{
|
||||
echo "VendorA: Liking moment {$momentId} using device {$deviceId}\n";
|
||||
echo "ChuKeBao: Liking moment {$momentId} using device {$deviceId}\n";
|
||||
// 实现 VendorA 的点赞逻辑
|
||||
return true;
|
||||
}
|
||||
|
||||
public function getGroupList(string $deviceId): array
|
||||
{
|
||||
echo "VendorA: Getting group list for device {$deviceId}\n";
|
||||
echo "ChuKeBao: Getting group list for device {$deviceId}\n";
|
||||
// 实现 VendorA 的获取群列表逻辑,并转换数据格式
|
||||
return [
|
||||
['id' => 'group1_va', 'name' => 'VendorA Group 1', 'member_count' => 10],
|
||||
['id' => 'group1_va', 'name' => 'ChuKeBao Group 1', 'member_count' => 10],
|
||||
];
|
||||
}
|
||||
|
||||
@@ -79,19 +84,19 @@ class Adapter implements WeChatServiceInterface
|
||||
{
|
||||
echo "VendorA: Getting friend list for device {$deviceId}\n";
|
||||
return [
|
||||
['id' => 'friend1_va', 'nickname' => 'VendorA Friend 1', 'remark' => 'VA-F1'],
|
||||
['id' => 'friend1_va', 'nickname' => 'ChuKeBao Friend 1', 'remark' => 'VA-F1'],
|
||||
];
|
||||
}
|
||||
|
||||
public function getDeviceInfo(string $deviceId): array
|
||||
{
|
||||
echo "VendorA: Getting device info for device {$deviceId}\n";
|
||||
echo "ChuKeBao: Getting device info for device {$deviceId}\n";
|
||||
return ['id' => $deviceId, 'status' => 'online_va', 'battery' => '80%'];
|
||||
}
|
||||
|
||||
public function bindDeviceToCompany(string $deviceId, string $companyId): bool
|
||||
{
|
||||
echo "VendorA: Binding device {$deviceId} to company {$companyId}\n";
|
||||
echo "ChuKeBao: Binding device {$deviceId} to company {$companyId}\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -103,7 +108,7 @@ class Adapter implements WeChatServiceInterface
|
||||
*/
|
||||
public function getChatroomMemberList(string $deviceId, string $chatroomId): array
|
||||
{
|
||||
echo "VendorA: Getting chatroom member list for device {$deviceId}, chatroom {$chatroomId}\n";
|
||||
echo "ChuKeBao: Getting chatroom member list for device {$deviceId}, chatroom {$chatroomId}\n";
|
||||
return [
|
||||
['id' => 'member1_va', 'nickname' => 'VendorA Member 1', 'avatar' => ''],
|
||||
];
|
||||
@@ -160,7 +165,6 @@ class Adapter implements WeChatServiceInterface
|
||||
|
||||
$offset = 0;
|
||||
$limit = 2000;
|
||||
// $usleepTime = 100000;
|
||||
$usleepTime = 50000;
|
||||
|
||||
do {
|
||||
@@ -179,7 +183,7 @@ class Adapter implements WeChatServiceInterface
|
||||
SELECT
|
||||
wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime
|
||||
FROM
|
||||
s2_wechat_friend GROUP BY wechatId;
|
||||
s2_wechat_friend GROUP BY wechatId
|
||||
ON DUPLICATE KEY UPDATE
|
||||
alias=VALUES(alias),
|
||||
nickname=VALUES(nickname),
|
||||
@@ -200,26 +204,125 @@ class Adapter implements WeChatServiceInterface
|
||||
}
|
||||
|
||||
// syncWechatDeviceLoginLog
|
||||
// public function syncWechatDeviceLoginLog()
|
||||
// {
|
||||
// try {
|
||||
// // 确保使用正确的表名,不要让框架自动添加前缀
|
||||
// Db::connect()->table('s2_wechat_account')
|
||||
// ->alias('a')
|
||||
// ->join(['s2_device' => 'd'], 'd.imei = a.imei')
|
||||
// ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId')
|
||||
// ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
|
||||
// ->chunk(1000, function ($data) {
|
||||
// try {
|
||||
// foreach ($data as $item) {
|
||||
// Log::info("syncWechatDeviceLoginLog: " . json_encode($item));
|
||||
// try {
|
||||
// // 检查所有必要字段是否存在,如果不存在则设置默认值
|
||||
// // if (!isset($item['deviceId']) || !isset($item['wechatId']) ||
|
||||
// // !isset($item['alive']) || !isset($item['companyId']) ||
|
||||
// // !isset($item['createTime'])) {
|
||||
|
||||
// // \think\facade\Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item));
|
||||
|
||||
// // // 为缺失字段设置默认值
|
||||
// // $item['deviceId'] = $item['deviceId'] ?? '';
|
||||
// // $item['wechatId'] = $item['wechatId'] ?? '';
|
||||
// // $item['alive'] = $item['alive'] ?? 0;
|
||||
// // $item['companyId'] = $item['companyId'] ?? 0;
|
||||
// // $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s');
|
||||
|
||||
// // // 如果关键字段仍然为空,则跳过此条记录
|
||||
// // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) {
|
||||
// // continue;
|
||||
// // }
|
||||
// // }
|
||||
// if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// $exists = Db::connect()->table('ck_device_wechat_login')
|
||||
// ->where('deviceId', $item['deviceId'])
|
||||
// ->where('wechatId', $item['wechatId'])
|
||||
// ->where('createTime', $item['createTime'])
|
||||
// ->find();
|
||||
|
||||
// if (!$exists) {
|
||||
// Db::connect()->table('ck_device_wechat_login')->insert($item);
|
||||
// }
|
||||
// } catch (\Exception $e) {
|
||||
// \think\facade\Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString());
|
||||
// continue; // 跳过这条出错的记录,继续处理下一条
|
||||
// }
|
||||
// }
|
||||
// } catch (\Exception $e) {
|
||||
// \think\facade\Log::error("处理批次数据时出错: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
|
||||
// // 不抛出异常,让程序继续处理下一批次数据
|
||||
// }
|
||||
// });
|
||||
// } catch (\Exception $e) {
|
||||
// \think\facade\Log::error("微信好友同步任务异常1: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
|
||||
// // 可以选择重新抛出异常或者返回false
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// return true;
|
||||
// }
|
||||
public function syncWechatDeviceLoginLog()
|
||||
{
|
||||
Db::table('s2_wechat_account')
|
||||
->alias('a')
|
||||
->join('s2_device d', 'd.imei = a.imei')
|
||||
->join('s2_company_account c', 'c.id = d.currentAccountId')
|
||||
->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
|
||||
->chunk(1000, function ($data) {
|
||||
foreach ($data as $item) {
|
||||
$exists = Db::table('ck_device_wechat_login')
|
||||
try {
|
||||
// 确保使用正确的表名,不要让框架自动添加前缀
|
||||
$cursor = Db::connect()->table('s2_wechat_account')
|
||||
->alias('a')
|
||||
->join(['s2_device' => 'd'], 'd.imei = a.imei')
|
||||
->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId')
|
||||
->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
|
||||
->cursor();
|
||||
|
||||
foreach ($cursor as $item) {
|
||||
try {
|
||||
// 检查所有必要字段是否存在,如果不存在则设置默认值
|
||||
if (
|
||||
!isset($item['deviceId']) || !isset($item['wechatId']) ||
|
||||
!isset($item['alive']) || !isset($item['companyId']) ||
|
||||
!isset($item['createTime'])
|
||||
) {
|
||||
|
||||
Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item));
|
||||
|
||||
// 为缺失字段设置默认值
|
||||
$item['deviceId'] = $item['deviceId'] ?? '';
|
||||
$item['wechatId'] = $item['wechatId'] ?? '';
|
||||
$item['alive'] = $item['alive'] ?? 0;
|
||||
$item['companyId'] = $item['companyId'] ?? 0;
|
||||
$item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s');
|
||||
|
||||
// 如果关键字段仍然为空,则跳过此条记录
|
||||
if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
$exists = Db::connect()->table('ck_device_wechat_login')
|
||||
->where('deviceId', $item['deviceId'])
|
||||
->where('wechatId', $item['wechatId'])
|
||||
->where('createTime', $item['createTime'])
|
||||
->find();
|
||||
|
||||
if (!$exists) {
|
||||
Db::table('ck_device_wechat_login')->insert($item);
|
||||
Db::connect()->table('ck_device_wechat_login')->insert($item);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString());
|
||||
continue; // 跳过这条出错的记录,继续处理下一条
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (\Exception $e) {
|
||||
Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -230,48 +333,8 @@ class Adapter implements WeChatServiceInterface
|
||||
* @param int $batchSize 每批处理的数据量
|
||||
* @return int 影响的行数
|
||||
*/
|
||||
// public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000)
|
||||
// {
|
||||
// // 1. 获取数据总量
|
||||
// $total = Db::table('s2_wechat_friend')
|
||||
// ->group('wechatId')
|
||||
// ->count();
|
||||
|
||||
// // 2. 计算需要处理的批次
|
||||
// $batchCount = ceil($total / $batchSize);
|
||||
// $affectedRows = 0;
|
||||
|
||||
// // 3. 分批处理
|
||||
// for ($i = 0; $i < $batchCount; $i++) {
|
||||
// $offset = $i * $batchSize;
|
||||
|
||||
// // 分批查询SQL,使用LIMIT控制每次获取的数据量
|
||||
// $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
|
||||
// SELECT wechatId identifier, wechatId, phone
|
||||
// FROM (
|
||||
// SELECT wechatId, phone
|
||||
// FROM `s2_wechat_friend`
|
||||
// GROUP BY wechatId
|
||||
// LIMIT {$offset}, {$batchSize}
|
||||
// ) AS temp";
|
||||
|
||||
// $affectedRows += Db::execute($sql);
|
||||
|
||||
// // 释放内存
|
||||
// if ($i % 5 == 0) {
|
||||
// Db::clear();
|
||||
// gc_collect_cycles();
|
||||
// }
|
||||
// }
|
||||
|
||||
// return $affectedRows;
|
||||
// }
|
||||
|
||||
|
||||
|
||||
public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000)
|
||||
{
|
||||
// 1. 先获取去重后的wechatId清单并建立临时索引
|
||||
Db::execute("CREATE TEMPORARY TABLE IF NOT EXISTS temp_wechat_ids (
|
||||
wechatId VARCHAR(64) PRIMARY KEY
|
||||
) ENGINE=MEMORY");
|
||||
@@ -281,26 +344,15 @@ class Adapter implements WeChatServiceInterface
|
||||
// 批量插入去重的wechatId
|
||||
Db::execute("INSERT INTO temp_wechat_ids SELECT DISTINCT wechatId FROM s2_wechat_friend");
|
||||
|
||||
// 2. 获取临时表的数据总量
|
||||
$total = Db::table('temp_wechat_ids')->count();
|
||||
|
||||
// 3. 计算需要处理的批次
|
||||
$batchCount = ceil($total / $batchSize);
|
||||
$affectedRows = 0;
|
||||
|
||||
// 4. 开始事务处理批量数据
|
||||
try {
|
||||
for ($i = 0; $i < $batchCount; $i++) {
|
||||
$offset = $i * $batchSize;
|
||||
|
||||
// 使用临时表和JOIN来提高查询效率
|
||||
// $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
|
||||
// SELECT t.wechatId AS identifier, t.wechatId, f.phone AS mobile
|
||||
// FROM (
|
||||
// SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize}
|
||||
// ) AS t
|
||||
// LEFT JOIN s2_wechat_friend f ON t.wechatId = f.wechatId
|
||||
// GROUP BY t.wechatId";
|
||||
$sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
|
||||
SELECT t.wechatId AS identifier, t.wechatId,
|
||||
(SELECT phone FROM s2_wechat_friend
|
||||
@@ -312,38 +364,19 @@ class Adapter implements WeChatServiceInterface
|
||||
$currentAffected = Db::execute($sql);
|
||||
$affectedRows += $currentAffected;
|
||||
|
||||
// 更频繁地释放内存
|
||||
// if ($i % 2 == 1) {
|
||||
if ($i % 5 == 0) {
|
||||
// Db::clear();
|
||||
gc_collect_cycles();
|
||||
}
|
||||
|
||||
// 添加短暂休眠,但不要太长以免影响总体执行时间
|
||||
// 20-50毫秒通常是个不错的平衡点
|
||||
usleep(30000); // 30毫秒
|
||||
|
||||
// 如果批处理大小很大或者当前批影响行数很多,可以适当增加休眠时间
|
||||
// if ($currentAffected > 1000) {
|
||||
// usleep(20000); // 额外增加20毫秒
|
||||
// }
|
||||
|
||||
// 输出进度日志
|
||||
// $progress = round(($i + 1) / $batchCount * 100, 2);
|
||||
// \think\facade\Log::info("Traffic pool sync progress: {$progress}% completed. Rows affected in this batch: {$currentAffected}");
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
\think\facade\Log::error("Error in traffic pool sync: " . $e->getMessage());
|
||||
// 删除临时表
|
||||
// Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
|
||||
throw $e;
|
||||
} finally {
|
||||
Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
|
||||
}
|
||||
|
||||
// 5. 清理临时表
|
||||
// Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
|
||||
|
||||
return $affectedRows;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user