fix sync err

This commit is contained in:
xavier
2025-05-13 14:38:51 +08:00
parent 0e947401dc
commit 957d197245
2 changed files with 172 additions and 2 deletions

View File

@@ -6,12 +6,27 @@ use think\facade\Log;
use think\console\Input;
use think\console\Output;
use think\console\Command;
use think\facade\App;
use WeChatDeviceApi\Adapters\ChuKeBao\Adapter as ChuKeBaoAdapter;
// */7 * * * * cd /www/wwwroot/mckb_quwanzhi_com/Server && php think sync:wechatData >> /www/wwwroot/mckb_quwanzhi_com/Server/runtime/log/sync_wechat_data.log 2>&1
class SyncWechatDataToCkbTask extends Command
{
protected $lockFile = RUNTIME_PATH . 'sync_wechat_to_ckb.lock';
protected $lockFile;
public function __construct()
{
parent::__construct();
$this->lockFile = App::getRuntimePath() . 'sync_wechat_to_ckb.lock';
}
protected function configure()
{
$this->setName('sync:wechatData')
->setDescription('同步微信数据到存客宝');
}
protected function execute(Input $input, Output $output)
{
@@ -31,6 +46,7 @@ class SyncWechatDataToCkbTask extends Command
$ChuKeBaoAdapter = new ChuKeBaoAdapter();
$this->syncWechatAccount($ChuKeBaoAdapter);
$this->syncWechatFriend($ChuKeBaoAdapter);
$this->syncWechatDeviceLoginLog($ChuKeBaoAdapter);
return true;
} catch (\Exception $e) {
Log::error('微信好友同步任务异常:' . $e->getMessage());
@@ -51,4 +67,9 @@ class SyncWechatDataToCkbTask extends Command
{
return $ChuKeBaoAdapter->syncWechatAccount();
}
protected function syncWechatDeviceLoginLog(ChuKeBaoAdapter $ChuKeBaoAdapter)
{
return $ChuKeBaoAdapter->syncWechatDeviceLoginLog();
}
}

View File

@@ -160,7 +160,8 @@ class Adapter implements WeChatServiceInterface
$offset = 0;
$limit = 2000;
$usleepTime = 100000;
// $usleepTime = 100000;
$usleepTime = 50000;
do {
$affected = Db::execute($sql, [$offset, $limit]);
@@ -197,4 +198,152 @@ class Adapter implements WeChatServiceInterface
$affected = Db::execute($sql);
return $affected;
}
// syncWechatDeviceLoginLog
public function syncWechatDeviceLoginLog()
{
Db::table('s2_wechat_account')
->alias('a')
->join('s2_device d', 'd.imei = a.imei')
->join('s2_company_account c', 'c.id = d.currentAccountId')
->field('d.id as deviceId, a.wechatId, a.wechatAlive as alive, c.departmentId as companyId, a.updateTime as createTime')
->chunk(1000, function ($data) {
foreach ($data as $item) {
$exists = Db::table('ck_device_wechat_login')
->where('deviceId', $item['deviceId'])
->where('wechatId', $item['wechatId'])
->where('createTime', $item['createTime'])
->find();
if (!$exists) {
Db::table('ck_device_wechat_login')->insert($item);
}
}
});
}
/**
* 大数据量分批处理版本
* 适用于数据源非常大的情况,避免一次性加载全部数据到内存
* 独立脚本执行30min 同步一次 和 流量来源的更新一起
*
* @param int $batchSize 每批处理的数据量
* @return int 影响的行数
*/
// public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000)
// {
// // 1. 获取数据总量
// $total = Db::table('s2_wechat_friend')
// ->group('wechatId')
// ->count();
// // 2. 计算需要处理的批次
// $batchCount = ceil($total / $batchSize);
// $affectedRows = 0;
// // 3. 分批处理
// for ($i = 0; $i < $batchCount; $i++) {
// $offset = $i * $batchSize;
// // 分批查询SQL使用LIMIT控制每次获取的数据量
// $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
// SELECT wechatId identifier, wechatId, phone
// FROM (
// SELECT wechatId, phone
// FROM `s2_wechat_friend`
// GROUP BY wechatId
// LIMIT {$offset}, {$batchSize}
// ) AS temp";
// $affectedRows += Db::execute($sql);
// // 释放内存
// if ($i % 5 == 0) {
// Db::clear();
// gc_collect_cycles();
// }
// }
// return $affectedRows;
// }
public function syncWechatFriendToTrafficPoolBatch($batchSize = 5000)
{
// 1. 先获取去重后的wechatId清单并建立临时索引
Db::execute("CREATE TEMPORARY TABLE IF NOT EXISTS temp_wechat_ids (
wechatId VARCHAR(64) PRIMARY KEY
) ENGINE=MEMORY");
Db::execute("TRUNCATE TABLE temp_wechat_ids");
// 批量插入去重的wechatId
Db::execute("INSERT INTO temp_wechat_ids SELECT DISTINCT wechatId FROM s2_wechat_friend");
// 2. 获取临时表的数据总量
$total = Db::table('temp_wechat_ids')->count();
// 3. 计算需要处理的批次
$batchCount = ceil($total / $batchSize);
$affectedRows = 0;
// 4. 开始事务处理批量数据
try {
for ($i = 0; $i < $batchCount; $i++) {
$offset = $i * $batchSize;
// 使用临时表和JOIN来提高查询效率
// $sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
// SELECT t.wechatId AS identifier, t.wechatId, f.phone AS mobile
// FROM (
// SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize}
// ) AS t
// LEFT JOIN s2_wechat_friend f ON t.wechatId = f.wechatId
// GROUP BY t.wechatId";
$sql = "INSERT IGNORE INTO ck_traffic_pool(`identifier`, `wechatId`, `mobile`)
SELECT t.wechatId AS identifier, t.wechatId,
(SELECT phone FROM s2_wechat_friend
WHERE wechatId = t.wechatId LIMIT 1) AS mobile
FROM (
SELECT wechatId FROM temp_wechat_ids LIMIT {$offset}, {$batchSize}
) AS t";
$currentAffected = Db::execute($sql);
$affectedRows += $currentAffected;
// 更频繁地释放内存
// if ($i % 2 == 1) {
if ($i % 5 == 0) {
// Db::clear();
gc_collect_cycles();
}
// 添加短暂休眠,但不要太长以免影响总体执行时间
// 20-50毫秒通常是个不错的平衡点
usleep(30000); // 30毫秒
// 如果批处理大小很大或者当前批影响行数很多,可以适当增加休眠时间
// if ($currentAffected > 1000) {
// usleep(20000); // 额外增加20毫秒
// }
// 输出进度日志
// $progress = round(($i + 1) / $batchCount * 100, 2);
// \think\facade\Log::info("Traffic pool sync progress: {$progress}% completed. Rows affected in this batch: {$currentAffected}");
}
} catch (\Exception $e) {
\think\facade\Log::error("Error in traffic pool sync: " . $e->getMessage());
// 删除临时表
// Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
throw $e;
} finally {
Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
}
// 5. 清理临时表
// Db::execute("DROP TEMPORARY TABLE IF EXISTS temp_wechat_ids");
return $affectedRows;
}
}