From 12ed5d7983c9eefa6264b2c234b90436eea3976b Mon Sep 17 00:00:00 2001 From: xavier Date: Tue, 13 May 2025 18:12:12 +0800 Subject: [PATCH] update script --- .../Adapters/ChuKeBao/Adapter.php | 227 ++++++++++-------- 1 file changed, 130 insertions(+), 97 deletions(-) diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index 00d82c76..bc4a2a9e 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -8,6 +8,8 @@ use WeChatDeviceApi\Exceptions\ApiException; // use WeChatDeviceApi\Adapters\ChuKeBao\Client as ChuKeBaoApiClient; use think\Db; +use think\facade\Config; +use think\facade\Log; class Adapter implements WeChatServiceInterface { @@ -16,26 +18,29 @@ class Adapter implements WeChatServiceInterface public function __construct(array $config = []) { - $this->config = $config; + + // $this->config = $config ?: Config::get('wechat_device_api.'); + $this->config = $config ?: Config::get('wechat_device_api.adapters.ChuKeBao'); + // $this->config = $config; // $this->apiClient = new ChuKeBaoApiClient($config['api_key'], $config['api_secret'], $config['base_url']); // 校验配置等... - if (empty($config['api_key']) || empty($config['username']) || empty($config['password'])) { + if (empty($this->config['base_url']) || empty($this->config['username']) || empty($this->config['password'])) { throw new \InvalidArgumentException("ChuKeBao username and password are required."); } } public function addFriend(string $deviceId, string $targetWxId): bool { - // 1. 构建请求参数 (VendorA 特定的格式) + // 1. 构建请求参数 (ChuKeBao 特定的格式) $params = [ 'device_identifier' => $deviceId, 'wechat_user_to_add' => $targetWxId, 'username' => $this->config['username'], 'password' => $this->config['password'], - // ... 其他 VendorA 特定参数 + // ... 其他 ChuKeBao 特定参数 ]; - // 2. 调用 VendorA 的 API (例如使用 GuzzleHttp 或 cURL) + // 2. 调用 ChuKeBao 的 API (例如使用 GuzzleHttp 或 cURL) // $response = $this->apiClient->post('/friend/add', $params); // 伪代码: $url = $this->config['base_url'] . '/friend/add'; @@ -44,16 +49,16 @@ class Adapter implements WeChatServiceInterface // $responseData = json_decode($response->getBody()->getContents(), true); // 模拟API调用 - echo "VendorA: Adding friend {$targetWxId} using device {$deviceId}\n"; + echo "ChuKeBao: Adding friend {$targetWxId} using device {$deviceId}\n"; $responseData = ['code' => 0, 'message' => 'Success']; // 假设的响应 // 3. 处理响应,转换为标准结果 if (!isset($responseData['code'])) { - throw new ApiException("VendorA: Invalid API response for addFriend."); + throw new ApiException("ChuKeBao: Invalid API response for addFriend."); } if ($responseData['code'] !== 0) { - throw new ApiException("VendorA: Failed to add friend - " . ($responseData['message'] ?? 'Unknown error')); + throw new ApiException("ChuKeBao: Failed to add friend - " . ($responseData['message'] ?? 'Unknown error')); } return true; @@ -61,17 +66,17 @@ class Adapter implements WeChatServiceInterface public function likeMoment(string $deviceId, string $momentId): bool { - echo "VendorA: Liking moment {$momentId} using device {$deviceId}\n"; + echo "ChuKeBao: Liking moment {$momentId} using device {$deviceId}\n"; // 实现 VendorA 的点赞逻辑 return true; } public function getGroupList(string $deviceId): array { - echo "VendorA: Getting group list for device {$deviceId}\n"; + echo "ChuKeBao: Getting group list for device {$deviceId}\n"; // 实现 VendorA 的获取群列表逻辑,并转换数据格式 return [ - ['id' => 'group1_va', 'name' => 'VendorA Group 1', 'member_count' => 10], + ['id' => 'group1_va', 'name' => 'ChuKeBao Group 1', 'member_count' => 10], ]; } @@ -79,19 +84,19 @@ class Adapter implements WeChatServiceInterface { echo "VendorA: Getting friend list for device {$deviceId}\n"; return [ - ['id' => 'friend1_va', 'nickname' => 'VendorA Friend 1', 'remark' => 'VA-F1'], + ['id' => 'friend1_va', 'nickname' => 'ChuKeBao Friend 1', 'remark' => 'VA-F1'], ]; } public function getDeviceInfo(string $deviceId): array { - echo "VendorA: Getting device info for device {$deviceId}\n"; + echo "ChuKeBao: Getting device info for device {$deviceId}\n"; return ['id' => $deviceId, 'status' => 'online_va', 'battery' => '80%']; } public function bindDeviceToCompany(string $deviceId, string $companyId): bool { - echo "VendorA: Binding device {$deviceId} to company {$companyId}\n"; + echo "ChuKeBao: Binding device {$deviceId} to company {$companyId}\n"; return true; } @@ -103,7 +108,7 @@ class Adapter implements WeChatServiceInterface */ public function getChatroomMemberList(string $deviceId, string $chatroomId): array { - echo "VendorA: Getting chatroom member list for device {$deviceId}, chatroom {$chatroomId}\n"; + echo "ChuKeBao: Getting chatroom member list for device {$deviceId}, chatroom {$chatroomId}\n"; return [ ['id' => 'member1_va', 'nickname' => 'VendorA Member 1', 'avatar' => ''], ]; @@ -160,7 +165,6 @@ class Adapter implements WeChatServiceInterface $offset = 0; $limit = 2000; - // $usleepTime = 100000; $usleepTime = 50000; do { @@ -179,7 +183,7 @@ class Adapter implements WeChatServiceInterface SELECT wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime FROM - s2_wechat_friend GROUP BY wechatId; + s2_wechat_friend GROUP BY wechatId ON DUPLICATE KEY UPDATE alias=VALUES(alias), nickname=VALUES(nickname), @@ -200,26 +204,125 @@ class Adapter implements WeChatServiceInterface } // syncWechatDeviceLoginLog + // public function syncWechatDeviceLoginLog() + // { + // try { + // // 确保使用正确的表名,不要让框架自动添加前缀 + // Db::connect()->table('s2_wechat_account') + // ->alias('a') + // ->join(['s2_device' => 'd'], 'd.imei = a.imei') + // ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') + // ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') + // ->chunk(1000, function ($data) { + // try { + // foreach ($data as $item) { + // Log::info("syncWechatDeviceLoginLog: " . json_encode($item)); + // try { + // // 检查所有必要字段是否存在,如果不存在则设置默认值 + // // if (!isset($item['deviceId']) || !isset($item['wechatId']) || + // // !isset($item['alive']) || !isset($item['companyId']) || + // // !isset($item['createTime'])) { + + // // \think\facade\Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item)); + + // // // 为缺失字段设置默认值 + // // $item['deviceId'] = $item['deviceId'] ?? ''; + // // $item['wechatId'] = $item['wechatId'] ?? ''; + // // $item['alive'] = $item['alive'] ?? 0; + // // $item['companyId'] = $item['companyId'] ?? 0; + // // $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s'); + + // // // 如果关键字段仍然为空,则跳过此条记录 + // // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { + // // continue; + // // } + // // } + // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { + // continue; + // } + + // $exists = Db::connect()->table('ck_device_wechat_login') + // ->where('deviceId', $item['deviceId']) + // ->where('wechatId', $item['wechatId']) + // ->where('createTime', $item['createTime']) + // ->find(); + + // if (!$exists) { + // Db::connect()->table('ck_device_wechat_login')->insert($item); + // } + // } catch (\Exception $e) { + // \think\facade\Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString()); + // continue; // 跳过这条出错的记录,继续处理下一条 + // } + // } + // } catch (\Exception $e) { + // \think\facade\Log::error("处理批次数据时出错: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); + // // 不抛出异常,让程序继续处理下一批次数据 + // } + // }); + // } catch (\Exception $e) { + // \think\facade\Log::error("微信好友同步任务异常1: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); + // // 可以选择重新抛出异常或者返回false + // return false; + // } + + // return true; + // } public function syncWechatDeviceLoginLog() { - Db::table('s2_wechat_account') - ->alias('a') - ->join('s2_device d', 'd.imei = a.imei') - ->join('s2_company_account c', 'c.id = d.currentAccountId') - ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') - ->chunk(1000, function ($data) { - foreach ($data as $item) { - $exists = Db::table('ck_device_wechat_login') + try { + // 确保使用正确的表名,不要让框架自动添加前缀 + $cursor = Db::connect()->table('s2_wechat_account') + ->alias('a') + ->join(['s2_device' => 'd'], 'd.imei = a.imei') + ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') + ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') + ->cursor(); + + foreach ($cursor as $item) { + try { + // 检查所有必要字段是否存在,如果不存在则设置默认值 + if ( + !isset($item['deviceId']) || !isset($item['wechatId']) || + !isset($item['alive']) || !isset($item['companyId']) || + !isset($item['createTime']) + ) { + + Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item)); + + // 为缺失字段设置默认值 + $item['deviceId'] = $item['deviceId'] ?? ''; + $item['wechatId'] = $item['wechatId'] ?? ''; + $item['alive'] = $item['alive'] ?? 0; + $item['companyId'] = $item['companyId'] ?? 0; + $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s'); + + // 如果关键字段仍然为空,则跳过此条记录 + if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { + continue; + } + } + + $exists = Db::connect()->table('ck_device_wechat_login') ->where('deviceId', $item['deviceId']) ->where('wechatId', $item['wechatId']) ->where('createTime', $item['createTime']) ->find(); if (!$exists) { - Db::table('ck_device_wechat_login')->insert($item); + Db::connect()->table('ck_device_wechat_login')->insert($item); } + } catch (\Exception $e) { + Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString()); + continue; // 跳过这条出错的记录,继续处理下一条 } - }); + } + + return true; + } catch (\Exception $e) { + Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); + return false; + } } /** @@ -230,48 +333,8 @@ class Adapter implements WeChatServiceInterface * @param int $batchSize 每批处理的数据量 * @return int 影响的行数 */ - // public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000) - // { - // // 1. 获取数据总量 - // $total = Db::table('s2_wechat_friend') - // ->group('wechatId') - // ->count(); - - // // 2. 计算需要处理的批次 - // $batchCount = ceil($total / $batchSize); - // $affectedRows = 0; - - // // 3. 分批处理 - // for ($i = 0; $i < $batchCount; $i++) { - // $offset = $i * $batchSize; - - // // 分批查询SQL,使用LIMIT控制每次获取的数据量 - // $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) - // SELECT wechatId identifier, wechatId, phone - // FROM ( - // SELECT wechatId, phone - // FROM `s2_wechat_friend` - // GROUP BY wechatId - // LIMIT {$offset}, {$batchSize} - // ) AS temp"; - - // $affectedRows += Db::execute($sql); - - // // 释放内存 - // if ($i % 5 == 0) { - // Db::clear(); - // gc_collect_cycles(); - // } - // } - - // return $affectedRows; - // } - - - public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000) { - // 1. 先获取去重后的wechatId清单并建立临时索引 Db::execute("CREATE TEMPORARY TABLE IF NOT EXISTS temp_wechat_ids ( wechatId VARCHAR(64) PRIMARY KEY ) ENGINE=MEMORY"); @@ -281,26 +344,15 @@ class Adapter implements WeChatServiceInterface // 批量插入去重的wechatId Db::execute("INSERT INTO temp_wechat_ids SELECT DISTINCT wechatId FROM s2_wechat_friend"); - // 2. 获取临时表的数据总量 $total = Db::table('temp_wechat_ids')->count(); - // 3. 计算需要处理的批次 $batchCount = ceil($total / $batchSize); $affectedRows = 0; - // 4. 开始事务处理批量数据 try { for ($i = 0; $i < $batchCount; $i++) { $offset = $i * $batchSize; - // 使用临时表和JOIN来提高查询效率 - // $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) - // SELECT t.wechatId AS identifier, t.wechatId, f.phone AS mobile - // FROM ( - // SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize} - // ) AS t - // LEFT JOIN s2_wechat_friend f ON t.wechatId = f.wechatId - // GROUP BY t.wechatId"; $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) SELECT t.wechatId AS identifier, t.wechatId, (SELECT phone FROM s2_wechat_friend @@ -312,38 +364,19 @@ class Adapter implements WeChatServiceInterface $currentAffected = Db::execute($sql); $affectedRows += $currentAffected; - // 更频繁地释放内存 - // if ($i % 2 == 1) { if ($i % 5 == 0) { - // Db::clear(); gc_collect_cycles(); } - // 添加短暂休眠,但不要太长以免影响总体执行时间 - // 20-50毫秒通常是个不错的平衡点 usleep(30000); // 30毫秒 - - // 如果批处理大小很大或者当前批影响行数很多,可以适当增加休眠时间 - // if ($currentAffected > 1000) { - // usleep(20000); // 额外增加20毫秒 - // } - - // 输出进度日志 - // $progress = round(($i + 1) / $batchCount * 100, 2); - // \think\facade\Log::info("Traffic pool sync progress: {$progress}% completed. Rows affected in this batch: {$currentAffected}"); } } catch (\Exception $e) { \think\facade\Log::error("Error in traffic pool sync: " . $e->getMessage()); - // 删除临时表 - // Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); throw $e; } finally { Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); } - // 5. 清理临时表 - // Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); - return $affectedRows; } }