Files
cunkebao_v3/Server/application/job/WechatMomentsJob.php

148 lines
5.9 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\job;
use think\queue\Job;
use think\facade\Log;
use think\facade\Cache;
use think\Db;
use app\command\WechatMomentsCommand;
use app\api\controller\WebSocketController;
use think\facade\Env;
use app\api\controller\AutomaticAssign;
/**
 * Queue job that collects WeChat Moments for one page of friends.
 *
 * For every friend on the page the job temporarily assigns the friend to the
 * account configured through the `api.username` environment value, requests
 * the friend's Moments, and then assigns the friend back to its original
 * account. When the page is full, the next page is pushed onto the queue.
 */
class WechatMomentsJob
{
    /** Highest page index this job will process before it stops paging. */
    protected $maxPages = 10;

    /**
     * Queue entry point.
     *
     * Keys read from $data:
     *  - jobId             identifier used in log output and forwarded to the next page
     *  - queueLockKey      cache key removed when the run finishes or fails
     *  - pageIndex         page to process (defaults to 1 when absent)
     *  - pageSize          rows per page (defaults to 100 when absent)
     *  - pageIndexCacheKey cache key under which the next page index is stored
     *
     * @param Job   $job  the queue job being executed
     * @param array $data job payload, see the key list above
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $username = Env::get('api.username', '');
        $password = Env::get('api.password', '');

        // Both credentials are handed to WebSocketController below, so a
        // half-configured account cannot work.
        // NOTE(review): as in the original code, the job is neither deleted nor
        // is the queue lock released on these early returns — confirm this is intended.
        if (empty($username) || empty($password)) {
            Log::error("没有账号配置");
            return;
        }

        $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId');
        if (empty($toAccountId)) {
            // Without a target account the friends would be assigned to nothing.
            Log::error("未找到账号 {$username} 对应的 s2_accountId");
            return;
        }

        $jobId = $data['jobId'] ?? '';
        $queueLockKey = $data['queueLockKey'] ?? '';

        try {
            $pageIndex = $data['pageIndex'] ?? 1;
            $pageSize = $data['pageSize'] ?? 100;
            $pageIndexCacheKey = $data['pageIndexCacheKey'] ?? '';

            Log::info("开始处理朋友圈采集任务任务ID{$jobId}");

            // Load the current page of friends.
            $friends = $this->getFriends($pageIndex, $pageSize);
            if (empty($friends)) {
                Log::info("没有更多好友数据,任务完成");
                Cache::rm($queueLockKey);
                $job->delete();
                return;
            }

            // A failure for one friend is logged inside the helper and does not
            // stop the remaining friends on the page.
            foreach ($friends as $friend) {
                $this->collectFriendMoments($friend, $toAccountId, $username, $password);
            }

            // A short page means there is nothing left to fetch.
            if (count($friends) < $pageSize) {
                Log::info("朋友圈采集任务完成,没有更多数据");
                Cache::rm($queueLockKey);
                $job->delete();
                return;
            }

            $pageIndex++;
            if ($pageIndex > $this->maxPages) {
                Log::info("已达到最大页数限制 {$this->maxPages},任务完成");
                if ($pageIndexCacheKey !== '') {
                    Cache::rm($pageIndexCacheKey);
                }
                Cache::rm($queueLockKey);
                $job->delete();
                return;
            }

            // Remember the next page index, then enqueue the job for that page.
            if ($pageIndexCacheKey !== '') {
                Cache::set($pageIndexCacheKey, $pageIndex);
            }
            $command = new WechatMomentsCommand();
            $command->addToQueue($pageIndex, $pageSize, $jobId, $queueLockKey);

            // This page is done; an undeleted job would be executed again by the
            // queue worker. The lock stays in place for the follow-up page.
            $job->delete();
        } catch (\Throwable $e) {
            // Catch \Throwable so that PHP Errors also release the lock and
            // remove the job instead of leaving the queue blocked.
            Log::error("朋友圈采集任务异常:" . $e->getMessage());
            Cache::rm($queueLockKey);
            $job->delete();
        }
    }

    /**
     * Collects the Moments of a single friend.
     *
     * Assigns the friend to $toAccountId, records the switch in the cache,
     * requests the Moments, and assigns the friend back to its original
     * account. Any failure is logged and followed by a best-effort attempt to
     * assign the friend back; nothing is rethrown.
     *
     * @param array  $friend      row produced by getFriends()
     * @param mixed  $toAccountId account the friend is temporarily assigned to
     * @param string $username    value passed to WebSocketController as userName
     * @param string $password    value passed to WebSocketController as password
     * @return void
     */
    private function collectFriendMoments($friend, $toAccountId, $username, $password)
    {
        $friendId = $friend['friendId'];
        $originalAccountId = $friend['accountId'];
        $automaticAssign = null;

        try {
            // Switch the friend to the collecting account.
            $automaticAssign = new AutomaticAssign();
            $automaticAssign->allotWechatFriend(['wechatFriendId' => $friendId, 'toAccountId' => $toAccountId], true);

            // Record the switch.
            $this->rememberSwitchedFriend($friend);

            // Request the friend's Moments.
            $webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
            $webSocket->getMoments(['wechatFriendId' => $friendId, 'wechatAccountId' => $friend['wechatAccountId']]);

            // Done: switch the friend back to its original account.
            $automaticAssign->allotWechatFriend(['wechatFriendId' => $friendId, 'toAccountId' => $originalAccountId], true);
        } catch (\Throwable $e) {
            // Log first so the root cause is kept even if the switch-back fails.
            Log::error("采集好友 {$friendId} 的朋友圈失败:" . $e->getMessage());

            // $automaticAssign is still null when its constructor threw; in that
            // case no switch was attempted and there is nothing to undo.
            if ($automaticAssign !== null) {
                try {
                    $automaticAssign->allotWechatFriend(['wechatFriendId' => $friendId, 'toAccountId' => $originalAccountId], true);
                } catch (\Throwable $restoreError) {
                    Log::error("好友 {$friendId} 切换回原账号失败:" . $restoreError->getMessage());
                }
            }
        }
    }

    /**
     * Stores the friend in the shared 'allotWechatFriend' cache list.
     *
     * Any existing entry with the same friendId is replaced. The write is
     * attempted up to five times while Cache::set() reports failure.
     *
     * @param array $friend row produced by getFriends()
     * @return void
     */
    private function rememberSwitchedFriend($friend)
    {
        $cacheKey = 'allotWechatFriend';
        $entry = $friend;
        // Timestamp 120 seconds in the future; presumably an expiry evaluated
        // by whatever reads this cache list — confirm against the consumer.
        $entry['time'] = time() + 120;

        $maxRetry = 5;
        $retry = 0;
        do {
            $entries = Cache::get($cacheKey, []);
            if (!is_array($entries)) {
                $entries = [];
            }
            // Drop the previous entry for this friend and keep the list densely indexed.
            $entries = array_values(array_filter($entries, function ($item) use ($entry) {
                return ($item['friendId'] ?? null) !== $entry['friendId'];
            }));
            $entries[] = $entry;
            $success = Cache::set($cacheKey, $entries);
            $retry++;
        } while (!$success && $retry < $maxRetry);
    }

    /**
     * Loads one page of friends joined with their company account and WeChat account.
     *
     * Only rows with ca.status = 0, wf.isDeleted = 0, wa.deviceAlive = 1 and
     * wa.wechatAlive = 1 are returned, grouped by wf.wechatId and ordered by
     * wf.id descending.
     *
     * @param int $page     page number passed to the query builder's page()
     * @param int $pageSize number of rows per page
     * @return array rows with the keys accountId, userName, friendId, wechatId,
     *               wechatAccountId, wechatAccountWechatId and deviceId
     */
    private function getFriends($page = 1, $pageSize = 100)
    {
        $list = Db::table('s2_company_account')
            ->alias('ca')
            ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
            ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id')
            ->where([
                'ca.status' => 0,
                'wf.isDeleted' => 0,
                'wa.deviceAlive' => 1,
                'wa.wechatAlive' => 1
            ])
            ->field([
                'ca.id as accountId',
                'ca.userName',
                'wf.id as friendId',
                'wf.wechatId',
                'wf.wechatAccountId',
                'wa.wechatId as wechatAccountWechatId',
                'wa.currentDeviceId as deviceId'
            ])->group('wf.wechatId')
            ->order('wf.id DESC')
            ->page($page, $pageSize)
            ->select();

        return $list;
    }
}