diff --git a/Server/application/command/SyncWechatDataToCkbTask.php b/Server/application/command/SyncWechatDataToCkbTask.php index e2032119..4db6b661 100644 --- a/Server/application/command/SyncWechatDataToCkbTask.php +++ b/Server/application/command/SyncWechatDataToCkbTask.php @@ -6,12 +6,27 @@ use think\facade\Log; use think\console\Input; use think\console\Output; use think\console\Command; +use think\facade\App; use WeChatDeviceApi\Adapters\ChuKeBao\Adapter as ChuKeBaoAdapter; // */7 * * * * cd /www/wwwroot/mckb_quwanzhi_com/Server && php think sync:wechatData >> /www/wwwroot/mckb_quwanzhi_com/Server/runtime/log/sync_wechat_data.log 2>&1 class SyncWechatDataToCkbTask extends Command { - protected $lockFile = RUNTIME_PATH . 'sync_wechat_to_ckb.lock'; + protected $lockFile; + + public function __construct() + { + parent::__construct(); + $this->lockFile = App::getRuntimePath() . 'sync_wechat_to_ckb.lock'; + } + + + protected function configure() + { + $this->setName('sync:wechatData') + ->setDescription('同步微信数据到存客宝'); + } + protected function execute(Input $input, Output $output) { @@ -31,6 +46,7 @@ class SyncWechatDataToCkbTask extends Command $ChuKeBaoAdapter = new ChuKeBaoAdapter(); $this->syncWechatAccount($ChuKeBaoAdapter); $this->syncWechatFriend($ChuKeBaoAdapter); + $this->syncWechatDeviceLoginLog($ChuKeBaoAdapter); return true; } catch (\Exception $e) { Log::error('微信好友同步任务异常:' . $e->getMessage()); @@ -51,4 +67,9 @@ class SyncWechatDataToCkbTask extends Command { return $ChuKeBaoAdapter->syncWechatAccount(); } + + protected function syncWechatDeviceLoginLog(ChuKeBaoAdapter $ChuKeBaoAdapter) + { + return $ChuKeBaoAdapter->syncWechatDeviceLoginLog(); + } } \ No newline at end of file diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index 21a262a7..00d82c76 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -160,7 +160,8 @@ class Adapter implements WeChatServiceInterface $offset = 0; $limit = 2000; - $usleepTime = 100000; + // $usleepTime = 100000; + $usleepTime = 50000; do { $affected = Db::execute($sql, [$offset, $limit]); @@ -197,4 +198,152 @@ class Adapter implements WeChatServiceInterface $affected = Db::execute($sql); return $affected; } + + // syncWechatDeviceLoginLog + public function syncWechatDeviceLoginLog() + { + Db::table('s2_wechat_account') + ->alias('a') + ->join('s2_device d', 'd.imei = a.imei') + ->join('s2_company_account c', 'c.id = d.currentAccountId') + ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') + ->chunk(1000, function ($data) { + foreach ($data as $item) { + $exists = Db::table('ck_device_wechat_login') + ->where('deviceId', $item['deviceId']) + ->where('wechatId', $item['wechatId']) + ->where('createTime', $item['createTime']) + ->find(); + + if (!$exists) { + Db::table('ck_device_wechat_login')->insert($item); + } + } + }); + } + + /** + * 大数据量分批处理版本 + * 适用于数据源非常大的情况,避免一次性加载全部数据到内存 + * 独立脚本执行,30min 同步一次 和 流量来源的更新一起 + * + * @param int $batchSize 每批处理的数据量 + * @return int 影响的行数 + */ + // public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000) + // { + // // 1. 获取数据总量 + // $total = Db::table('s2_wechat_friend') + // ->group('wechatId') + // ->count(); + + // // 2. 计算需要处理的批次 + // $batchCount = ceil($total / $batchSize); + // $affectedRows = 0; + + // // 3. 分批处理 + // for ($i = 0; $i < $batchCount; $i++) { + // $offset = $i * $batchSize; + + // // 分批查询SQL,使用LIMIT控制每次获取的数据量 + // $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) + // SELECT wechatId identifier, wechatId, phone + // FROM ( + // SELECT wechatId, phone + // FROM `s2_wechat_friend` + // GROUP BY wechatId + // LIMIT {$offset}, {$batchSize} + // ) AS temp"; + + // $affectedRows += Db::execute($sql); + + // // 释放内存 + // if ($i % 5 == 0) { + // Db::clear(); + // gc_collect_cycles(); + // } + // } + + // return $affectedRows; + // } + + + + public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000) + { + // 1. 先获取去重后的wechatId清单并建立临时索引 + Db::execute("CREATE TEMPORARY TABLE IF NOT EXISTS temp_wechat_ids ( + wechatId VARCHAR(64) PRIMARY KEY + ) ENGINE=MEMORY"); + + Db::execute("TRUNCATE TABLE temp_wechat_ids"); + + // 批量插入去重的wechatId + Db::execute("INSERT INTO temp_wechat_ids SELECT DISTINCT wechatId FROM s2_wechat_friend"); + + // 2. 获取临时表的数据总量 + $total = Db::table('temp_wechat_ids')->count(); + + // 3. 计算需要处理的批次 + $batchCount = ceil($total / $batchSize); + $affectedRows = 0; + + // 4. 开始事务处理批量数据 + try { + for ($i = 0; $i < $batchCount; $i++) { + $offset = $i * $batchSize; + + // 使用临时表和JOIN来提高查询效率 + // $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) + // SELECT t.wechatId AS identifier, t.wechatId, f.phone AS mobile + // FROM ( + // SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize} + // ) AS t + // LEFT JOIN s2_wechat_friend f ON t.wechatId = f.wechatId + // GROUP BY t.wechatId"; + $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`) + SELECT t.wechatId AS identifier, t.wechatId, + (SELECT phone FROM s2_wechat_friend + WHERE wechatId = t.wechatId LIMIT 1) AS mobile + FROM ( + SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize} + ) AS t"; + + $currentAffected = Db::execute($sql); + $affectedRows += $currentAffected; + + // 更频繁地释放内存 + // if ($i % 2 == 1) { + if ($i % 5 == 0) { + // Db::clear(); + gc_collect_cycles(); + } + + // 添加短暂休眠,但不要太长以免影响总体执行时间 + // 20-50毫秒通常是个不错的平衡点 + usleep(30000); // 30毫秒 + + // 如果批处理大小很大或者当前批影响行数很多,可以适当增加休眠时间 + // if ($currentAffected > 1000) { + // usleep(20000); // 额外增加20毫秒 + // } + + // 输出进度日志 + // $progress = round(($i + 1) / $batchCount * 100, 2); + // \think\facade\Log::info("Traffic pool sync progress: {$progress}% completed. Rows affected in this batch: {$currentAffected}"); + } + } catch (\Exception $e) { + \think\facade\Log::error("Error in traffic pool sync: " . $e->getMessage()); + // 删除临时表 + // Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); + throw $e; + } finally { + Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); + } + + // 5. 清理临时表 + // Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids"); + + return $affectedRows; + } }