fix sync wechat login

This commit is contained in:
xavier
2025-05-14 11:19:19 +08:00
parent bd5d2570df
commit 2d60f23806
2 changed files with 94 additions and 101 deletions

View File

@@ -43,10 +43,14 @@ class SyncWechatDataToCkbTask extends Command
file_put_contents($this->lockFile, time()); file_put_contents($this->lockFile, time());
try { try {
$output->writeln("同步任务 sync_wechat_to_ckb 开始");
$ChuKeBaoAdapter = new ChuKeBaoAdapter(); $ChuKeBaoAdapter = new ChuKeBaoAdapter();
$this->syncWechatAccount($ChuKeBaoAdapter); $this->syncWechatAccount($ChuKeBaoAdapter);
$this->syncWechatFriend($ChuKeBaoAdapter); $this->syncWechatFriend($ChuKeBaoAdapter);
$this->syncWechatDeviceLoginLog($ChuKeBaoAdapter); $this->syncWechatDeviceLoginLog($ChuKeBaoAdapter);
$output->writeln("同步任务 sync_wechat_to_ckb 已结束");
return true; return true;
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error('微信好友同步任务异常:' . $e->getMessage()); Log::error('微信好友同步任务异常:' . $e->getMessage());

View File

@@ -141,8 +141,10 @@ class Adapter implements WeChatServiceInterface
return true; return true;
} }
/* todo 以上方法待实现,基于/参考 application/api/controller/WebSocketController.php 去实现 */ /* todo 以上方法待实现,基于/参考 application/api/controller/WebSocketController.php 去实现 */
// NOTE: run in background; 5min 同步一次 // NOTE: run in background; 5min 同步一次
// todo: 后续经过`s2_`表直接对接三方的api去sync // todo: 后续经过`s2_`表直接对接三方的api去sync
public function syncFriendship() public function syncFriendship()
@@ -179,11 +181,41 @@ class Adapter implements WeChatServiceInterface
public function syncWechatAccount() public function syncWechatAccount()
{ {
$sql = "INSERT INTO ck_wechat_account(wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime) $pk = 'wechatId';
$limit = 1000;
// $lastId = '';
$lastId = null; // Or some other sentinel indicating "first run"
$totalAffected = 0;
$iterations = 0;
$maxIterations = 10000;
do {
// Fetch a batch of distinct wechatIds
// Important: Order by wechatId for consistent pagination
$sourceDb = Db::connect()->table('s2_wechat_friend');
// if ($lastId !== '') { // For subsequent iterations
if (!is_null($lastId)) { // Check if it's not the first iteration
$sourceDb->where($pk, '>', $lastId);
}
$distinctWechatIds = $sourceDb->order($pk, 'ASC')
->distinct(true)
->limit($limit)
->column($pk); // Get an array of wechatIds
if (empty($distinctWechatIds)) {
break; // No more wechatIds to process
}
// Prepare the main IODKU query for this batch of wechatIds
$sql = "INSERT INTO ck_wechat_account(wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime)
SELECT SELECT
wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime wechatId,alias,nickname,pyInitial,quanPin,avatar,gender,region,signature,phone,country,privince,city,createTime,updateTime
FROM FROM
s2_wechat_friend GROUP BY wechatId s2_wechat_friend
WHERE wechatId IN (" . implode(',', array_fill(0, count($distinctWechatIds), '?')) . ")
GROUP BY wechatId -- Grouping within the selected wechatIds
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
alias=VALUES(alias), alias=VALUES(alias),
nickname=VALUES(nickname), nickname=VALUES(nickname),
@@ -197,127 +229,84 @@ class Adapter implements WeChatServiceInterface
country=VALUES(country), country=VALUES(country),
privince=VALUES(privince), privince=VALUES(privince),
city=VALUES(city), city=VALUES(city),
updateTime=VALUES(updateTime);"; updateTime=VALUES(updateTime)";
$affected = Db::execute($sql); // The parameters for the IN clause are the distinctWechatIds themselves
return $affected; $bindings = $distinctWechatIds;
try {
$affected = Db::execute($sql, $bindings);
$totalAffected += $affected;
// Log::info("syncWechatAccount: Processed batch of " . count($distinctWechatIds) . " distinct wechatIds. Affected rows: " . $affected);
// Update lastId for the next iteration
$lastId = end($distinctWechatIds);
if ($affected > 0) {
usleep(50000);
}
} catch (\Exception $e) {
Log::error("syncWechatAccount batch error: " . $e->getMessage() . " with wechatIds starting around " . $distinctWechatIds[0] . ". SQL: " . $sql . " Bindings: " . json_encode($bindings));
// Decide if you want to break or continue with the next batch
break; // Example: break on error
}
$iterations++;
} while (count($distinctWechatIds) === $limit && $iterations < $maxIterations); // Continue if we fetched a full batch
// Log::info("syncWechatAccount finished. Total affected rows: " . $totalAffected);
return $totalAffected;
} }
// syncWechatDeviceLoginLog
// public function syncWechatDeviceLoginLog()
// {
// try {
// // 确保使用正确的表名,不要让框架自动添加前缀
// Db::connect()->table('s2_wechat_account')
// ->alias('a')
// ->join(['s2_device' => 'd'], 'd.imei = a.imei')
// ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId')
// ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
// ->chunk(1000, function ($data) {
// try {
// foreach ($data as $item) {
// Log::info("syncWechatDeviceLoginLog: " . json_encode($item));
// try {
// // 检查所有必要字段是否存在,如果不存在则设置默认值
// // if (!isset($item['deviceId']) || !isset($item['wechatId']) ||
// // !isset($item['alive']) || !isset($item['companyId']) ||
// // !isset($item['createTime'])) {
// // \think\facade\Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item));
// // // 为缺失字段设置默认值
// // $item['deviceId'] = $item['deviceId'] ?? '';
// // $item['wechatId'] = $item['wechatId'] ?? '';
// // $item['alive'] = $item['alive'] ?? 0;
// // $item['companyId'] = $item['companyId'] ?? 0;
// // $item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s');
// // // 如果关键字段仍然为空,则跳过此条记录
// // if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) {
// // continue;
// // }
// // }
// if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) {
// continue;
// }
// $exists = Db::connect()->table('ck_device_wechat_login')
// ->where('deviceId', $item['deviceId'])
// ->where('wechatId', $item['wechatId'])
// ->where('createTime', $item['createTime'])
// ->find();
// if (!$exists) {
// Db::connect()->table('ck_device_wechat_login')->insert($item);
// }
// } catch (\Exception $e) {
// \think\facade\Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString());
// continue; // 跳过这条出错的记录,继续处理下一条
// }
// }
// } catch (\Exception $e) {
// \think\facade\Log::error("处理批次数据时出错: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
// // 不抛出异常,让程序继续处理下一批次数据
// }
// });
// } catch (\Exception $e) {
// \think\facade\Log::error("微信好友同步任务异常1: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());
// // 可以选择重新抛出异常或者返回false
// return false;
// }
// return true;
// }
public function syncWechatDeviceLoginLog() public function syncWechatDeviceLoginLog()
{ {
try { try {
// 确保使用正确的表名,不要让框架自动添加前缀 $cursor = Db::table('s2_wechat_account')
$cursor = Db::connect()->table('s2_wechat_account')
->alias('a') ->alias('a')
->join(['s2_device' => 'd'], 'd.imei = a.imei') ->join(['s2_device' => 'd'], 'd.imei = a.imei')
->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId') ->join(['s2_company_account' => 'c'], 'c.id = d.currentAccountId')
->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime') ->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
->cursor(); ->cursor();
// $insertData = [];
// $batchSize = 500; // Insert in batches for better performance
foreach ($cursor as $item) { foreach ($cursor as $item) {
try {
// 检查所有必要字段是否存在,如果不存在则设置默认值
if (
!isset($item['deviceId']) || !isset($item['wechatId']) ||
!isset($item['alive']) || !isset($item['companyId']) ||
!isset($item['createTime'])
) {
Log::warning("Missing required field in syncWechatDeviceLoginLog: " . json_encode($item)); if (empty($item['deviceId']) || empty($item['wechatId'])) {
continue;
}
// 为缺失字段设置默认值 // $exists = Db::connect()->table('ck_device_wechat_login')
$item['deviceId'] = $item['deviceId'] ?? ''; $exists = Db::table('ck_device_wechat_login')
$item['wechatId'] = $item['wechatId'] ?? ''; ->where('deviceId', $item['deviceId'])
$item['alive'] = $item['alive'] ?? 0; ->where('wechatId', $item['wechatId'])
$item['companyId'] = $item['companyId'] ?? 0; // ->where('createTime', $item['createTime'])
$item['createTime'] = $item['createTime'] ?? date('Y-m-d H:i:s'); ->find();
// 如果关键字段仍然为空,则跳过此条记录 if ($exists) {
if (empty($item['deviceId']) || empty($item['wechatId']) || empty($item['createTime'])) { Db::table('ck_device_wechat_login')
continue;
}
}
$exists = Db::connect()->table('ck_device_wechat_login')
->where('deviceId', $item['deviceId']) ->where('deviceId', $item['deviceId'])
->where('wechatId', $item['wechatId']) ->where('wechatId', $item['wechatId'])
->where('createTime', $item['createTime']) ->update(['alive' => $item['alive']]);
->find(); } else {
Db::table('ck_device_wechat_login')->insert($item);
if (!$exists) {
Db::connect()->table('ck_device_wechat_login')->insert($item);
}
} catch (\Exception $e) {
Log::error("处理单条数据时出错: " . $e->getMessage() . ", 数据: " . json_encode($item) . ", 堆栈: " . $e->getTraceAsString());
continue; // 跳过这条出错的记录,继续处理下一条
} }
// $insertData[] = $item;
// if (count($insertData) >= $batchSize) {
// Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE
// $insertData = []; // Reset for next batch
// }
} }
// Insert any remaining data
// if (!empty($insertData)) {
// Db::connect()->table('ck_device_wechat_login')->insertAll($insertData, true); // true for INSERT IGNORE
// }
return true; return true;
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString()); Log::error("微信好友同步任务异常: " . $e->getMessage() . ", 堆栈: " . $e->getTraceAsString());