diff --git a/Server/application/command/SyncWechatDataToCkbTask.php b/Server/application/command/SyncWechatDataToCkbTask.php index 4db6b661..dea790fd 100644 --- a/Server/application/command/SyncWechatDataToCkbTask.php +++ b/Server/application/command/SyncWechatDataToCkbTask.php @@ -43,10 +43,14 @@ class SyncWechatDataToCkbTask extends Command file_put_contents($this->lockFile, time()); try { + + $output->writeln("同步任务 sync_wechat_to_ckb 开始"); $ChuKeBaoAdapter = new ChuKeBaoAdapter(); $this->syncWechatAccount($ChuKeBaoAdapter); $this->syncWechatFriend($ChuKeBaoAdapter); $this->syncWechatDeviceLoginLog($ChuKeBaoAdapter); + + $output->writeln("同步任务 sync_wechat_to_ckb 已结束"); return true; } catch (\Exception $e) { Log::error('微信好友同步任务异常:' . $e->getMessage()); diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index bc4a2a9e..e1ab20af 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -141,8 +141,10 @@ class Adapter implements WeChatServiceInterface return true; } + /* todo 以上方法待实现,基于/参考 application/api/controller/WebSocketController.php 去实现 */ + // NOTE: run in background; 5min 同步一次 // todo: 后续经过`s2_`表,直接对接三方的api去sync public function syncFriendship() @@ -179,11 +181,41 @@ class Adapter implements WeChatServiceInterface public function syncWechatAccount() { - $sql = "INSERT INTO ck_wechat_account(wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime) + $pk = 'wechatId'; + $limit = 1000; + // $lastId = ''; + $lastId = null; // Or some other sentinel indicating "first run" + + + $totalAffected = 0; + $iterations = 0; + $maxIterations = 10000; + + do { + // Fetch a batch of distinct wechatIds + // Important: Order by wechatId for consistent pagination + $sourceDb = Db::connect()->table('s2_wechat_friend'); + // if ($lastId !== '') { // For subsequent iterations + if (!is_null($lastId)) { // Check if it's not the first iteration + $sourceDb->where($pk, '>', $lastId); + } + $distinctWechatIds = $sourceDb->order($pk, 'ASC') + ->distinct(true) + ->limit($limit) + ->column($pk); // Get an array of wechatIds + + if (empty($distinctWechatIds)) { + break; // No more wechatIds to process + } + + // Prepare the main IODKU query for this batch of wechatIds + $sql = "INSERT INTO ck_wechat_account(wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime) SELECT wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime FROM - s2_wechat_friend GROUP BY wechatId + s2_wechat_friend + WHERE wechatId IN (" . implode(',', array_fill(0, count($distinctWechatIds), '?')) . ") + GROUP BY wechatId -- Grouping within the selected wechatIds ON DUPLICATE KEY UPDATE alias=VALUES(alias), nickname=VALUES(nickname), @@ -197,127 +229,84 @@ class Adapter implements WeChatServiceInterface country=VALUES(country), privince=VALUES(privince), city=VALUES(city), - updateTime=VALUES(updateTime);"; + updateTime=VALUES(updateTime)"; - $affected = Db::execute($sql); - return $affected; + // The parameters for the IN clause are the distinctWechatIds themselves + $bindings = $distinctWechatIds; + + try { + $affected = Db::execute($sql, $bindings); + $totalAffected += $affected; + // Log::info("syncWechatAccount: Processed batch of " . count($distinctWechatIds) . " distinct wechatIds. Affected rows: " . $affected); + + // Update lastId for the next iteration + $lastId = end($distinctWechatIds); + + if ($affected > 0) { + usleep(50000); + } + } catch (\Exception $e) { + Log::error("syncWechatAccount batch error: " . $e->getMessage() . " with wechatIds starting around " . $distinctWechatIds[0] . ". SQL: " . $sql . " Bindings: " . json_encode($bindings)); + // Decide if you want to break or continue with the next batch + break; // Example: break on error + } + $iterations++; + } while (count($distinctWechatIds) === $limit && $iterations < $maxIterations); // Continue if we fetched a full batch + + // Log::info("syncWechatAccount finished. Total affected rows: " . $totalAffected); + return $totalAffected; } - // syncWechatDeviceLoginLog - // public function syncWechatDeviceLoginLog() - // { - // try { - // // 确保使用正确的表名,不要让框架自动添加前缀 - // Db::connect()->table('s2_wechat_account') - // ->alias('a') - // ->join(['s2_device' => 'd'], 'd.imei = a.imei') - // ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') - // ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') - // ->chunk(1000, function ($data) { - // try { - // foreach ($data as $item) { - // Log::info("syncWechatDeviceLoginLog: " . json_encode($item)); - // try { - // // 检查所有必要字段是否存在,如果不存在则设置默认值 - // // if (!isset($item['deviceId']) || !isset($item['wechatId']) || - // // !isset($item['alive']) || !isset($item['companyId']) || - // // !isset($item['createTime'])) { - // // \think\facade\Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item)); - - // // // 为缺失字段设置默认值 - // // $item['deviceId'] = $item['deviceId'] ?? ''; - // // $item['wechatId'] = $item['wechatId'] ?? ''; - // // $item['alive'] = $item['alive'] ?? 0; - // // $item['companyId'] = $item['companyId'] ?? 0; - // // $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s'); - - // // // 如果关键字段仍然为空,则跳过此条记录 - // // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { - // // continue; - // // } - // // } - // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { - // continue; - // } - - // $exists = Db::connect()->table('ck_device_wechat_login') - // ->where('deviceId', $item['deviceId']) - // ->where('wechatId', $item['wechatId']) - // ->where('createTime', $item['createTime']) - // ->find(); - - // if (!$exists) { - // Db::connect()->table('ck_device_wechat_login')->insert($item); - // } - // } catch (\Exception $e) { - // \think\facade\Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString()); - // continue; // 跳过这条出错的记录,继续处理下一条 - // } - // } - // } catch (\Exception $e) { - // \think\facade\Log::error("处理批次数据时出错: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); - // // 不抛出异常,让程序继续处理下一批次数据 - // } - // }); - // } catch (\Exception $e) { - // \think\facade\Log::error("微信好友同步任务异常1: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); - // // 可以选择重新抛出异常或者返回false - // return false; - // } - - // return true; - // } public function syncWechatDeviceLoginLog() { try { - // 确保使用正确的表名,不要让框架自动添加前缀 - $cursor = Db::connect()->table('s2_wechat_account') + $cursor = Db::table('s2_wechat_account') ->alias('a') ->join(['s2_device' => 'd'], 'd.imei = a.imei') ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') ->cursor(); + // $insertData = []; + // $batchSize = 500; // Insert in batches for better performance + foreach ($cursor as $item) { - try { - // 检查所有必要字段是否存在,如果不存在则设置默认值 - if ( - !isset($item['deviceId']) || !isset($item['wechatId']) || - !isset($item['alive']) || !isset($item['companyId']) || - !isset($item['createTime']) - ) { - Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item)); + if (empty($item['deviceId']) || empty($item['wechatId'])) { + continue; + } - // 为缺失字段设置默认值 - $item['deviceId'] = $item['deviceId'] ?? ''; - $item['wechatId'] = $item['wechatId'] ?? ''; - $item['alive'] = $item['alive'] ?? 0; - $item['companyId'] = $item['companyId'] ?? 0; - $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s'); + // $exists = Db::connect()->table('ck_device_wechat_login') + $exists = Db::table('ck_device_wechat_login') + ->where('deviceId', $item['deviceId']) + ->where('wechatId', $item['wechatId']) + // ->where('createTime', $item['createTime']) + ->find(); - // 如果关键字段仍然为空,则跳过此条记录 - if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { - continue; - } - } - - $exists = Db::connect()->table('ck_device_wechat_login') + if ($exists) { + Db::table('ck_device_wechat_login') ->where('deviceId', $item['deviceId']) ->where('wechatId', $item['wechatId']) - ->where('createTime', $item['createTime']) - ->find(); - - if (!$exists) { - Db::connect()->table('ck_device_wechat_login')->insert($item); - } - } catch (\Exception $e) { - Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString()); - continue; // 跳过这条出错的记录,继续处理下一条 + ->update(['alive' => $item['alive']]); + } else { + Db::table('ck_device_wechat_login')->insert($item); } + + // $insertData[] = $item; + + // if (count($insertData) >= $batchSize) { + // Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE + // $insertData = []; // Reset for next batch + // } + } + // Insert any remaining data + // if (!empty($insertData)) { + // Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE + // } + return true; } catch (\Exception $e) { Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());