【操盘手】 自动同步朋友圈队列
This commit is contained in:
@@ -27,6 +27,7 @@ return [
|
||||
'allotrule:autocreate' => 'app\command\AutoCreateAllotRulesCommand', // 自动创建分配规则 √
|
||||
'content:collect' => 'app\command\ContentCollectCommand', // 内容采集任务 √
|
||||
'moments:collect' => 'app\command\WechatMomentsCommand', // 朋友圈采集任务
|
||||
'workbench:run' => 'app\command\WorkbenchCommand', // 工作台任务
|
||||
'workbench:autoLike' => 'app\command\WorkbenchAutoLikeCommand', // 工作台自动点赞任务
|
||||
'workbench:moments' => 'app\command\WorkbenchMomentsCommand', // 工作台朋友圈同步任务
|
||||
'sync:wechatData' => 'app\command\SyncWechatDataToCkbTask', // 同步微信数据到存客宝
|
||||
];
|
||||
|
||||
@@ -536,11 +536,6 @@ class ContentLibraryController extends Controller
|
||||
$item->urls = json_encode($param['urls'] ?? [],256);
|
||||
$item->senderNickname = '系统创建';
|
||||
$item->coverImage = $param['coverImage'] ?? '';
|
||||
|
||||
|
||||
print_r($item);
|
||||
exit;
|
||||
|
||||
$item->save();
|
||||
|
||||
return json(['code' => 200, 'msg' => '添加成功', 'data' => ['id' => $item->id]]);
|
||||
|
||||
500
Server/application/job/WorkbenchAutoLikeJob.php
Normal file
500
Server/application/job/WorkbenchAutoLikeJob.php
Normal file
@@ -0,0 +1,500 @@
|
||||
<?php
|
||||
|
||||
namespace app\job;
|
||||
|
||||
use think\queue\Job;
|
||||
use think\facade\Log;
|
||||
use think\Queue;
|
||||
use think\facade\Config;
|
||||
use think\facade\Cache;
|
||||
use think\facade\Env;
|
||||
use app\cunkebao\model\Workbench;
|
||||
use app\cunkebao\model\WorkbenchAutoLike;
|
||||
use think\Db;
|
||||
use app\api\controller\WebSocketController;
|
||||
use app\api\controller\AutomaticAssign;
|
||||
use app\api\controller\WechatFriendController;
|
||||
|
||||
class WorkbenchAutoLikeJob
|
||||
{
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
const MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* 队列任务处理
|
||||
* @param Job $job 队列任务
|
||||
* @param array $data 任务数据
|
||||
* @return bool
|
||||
*/
|
||||
public function fire(Job $job, $data)
|
||||
{
|
||||
$jobId = $data['jobId'] ?? '';
|
||||
$queueLockKey = $data['queueLockKey'] ?? '';
|
||||
try {
|
||||
$this->logJobStart($jobId, $queueLockKey);
|
||||
$workbenches = $this->getActiveWorkbenches();
|
||||
if (empty($workbenches)) {
|
||||
$this->handleEmptyWorkbenches($job, $queueLockKey);
|
||||
return true;
|
||||
}
|
||||
$this->processWorkbenches($workbenches);
|
||||
$this->handleJobSuccess($job, $queueLockKey);
|
||||
return true;
|
||||
|
||||
} catch (\Exception $e) {
|
||||
return $this->handleJobError($e, $job, $queueLockKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取活跃的工作台
|
||||
* @return \think\Collection
|
||||
*/
|
||||
protected function getActiveWorkbenches()
|
||||
{
|
||||
return Workbench::where([
|
||||
['status', '=', 1],
|
||||
['isDel', '=', 0],
|
||||
['type', '=', 1] // 只获取自动点赞类型的工作台
|
||||
])->order('id DESC')->select();
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理工作台列表
|
||||
* @param \think\Collection $workbenches
|
||||
*/
|
||||
protected function processWorkbenches($workbenches)
|
||||
{
|
||||
foreach ($workbenches as $workbench) {
|
||||
try {
|
||||
$this->processSingleWorkbench($workbench);
|
||||
} catch (\Exception $e) {
|
||||
Log::error("处理工作台 {$workbench->id} 失败: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理单个工作台
|
||||
* @param Workbench $workbench
|
||||
*/
|
||||
protected function processSingleWorkbench($workbench)
|
||||
{
|
||||
$config = WorkbenchAutoLike::where('workbenchId', $workbench->id)->find();
|
||||
if (!$config) {
|
||||
Log::error("工作台 {$workbench->id} 配置获取失败");
|
||||
return;
|
||||
}
|
||||
|
||||
$this->handleAutoLike($workbench, $config);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理自动点赞任务
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
*/
|
||||
protected function handleAutoLike($workbench, $config)
|
||||
{
|
||||
if (!$this->validateAutoLikeConfig($workbench, $config)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 验证是否在点赞时间范围内
|
||||
if (!$this->isWithinLikeTimeRange($config)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 处理分页获取好友列表
|
||||
$this->processAllFriends($workbench, $config);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理所有好友分页
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @param int $page 当前页码
|
||||
* @param int $pageSize 每页大小
|
||||
*/
|
||||
protected function processAllFriends($workbench, $config, $page = 1, $pageSize = 100)
|
||||
{
|
||||
$friendList = $this->getFriendList($config, $page, $pageSize);
|
||||
if (empty($friendList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 将好友列表分成20组
|
||||
$friendGroups = array_chunk($friendList, 20);
|
||||
$processes = [];
|
||||
|
||||
foreach ($friendGroups as $groupIndex => $friendGroup) {
|
||||
// 创建子进程
|
||||
$pid = pcntl_fork();
|
||||
|
||||
if ($pid == -1) {
|
||||
// 创建进程失败
|
||||
Log::error("工作台 {$workbench->id} 创建进程失败");
|
||||
continue;
|
||||
} else if ($pid) {
|
||||
// 父进程
|
||||
$processes[] = $pid;
|
||||
} else {
|
||||
// 子进程
|
||||
try {
|
||||
foreach ($friendGroup as $friend) {
|
||||
// 验证是否达到点赞次数上限
|
||||
$likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']);
|
||||
if ($likeCount >= $config['maxLikes']) {
|
||||
Log::info("工作台 {$workbench->id} 点赞次数已达上限");
|
||||
continue;
|
||||
}
|
||||
|
||||
// 验证是否达到好友点赞次数上限
|
||||
$friendMaxLikes = Db::name('workbench_auto_like_item')
|
||||
->where('workbenchId', $workbench->id)
|
||||
->where('wechatFriendId', $friend['friendId'])
|
||||
->count();
|
||||
|
||||
if ($friendMaxLikes < $config['friendMaxLikes']) {
|
||||
$this->processFriendMoments($workbench, $config, $friend);
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error("工作台 {$workbench->id} 子进程异常: " . $e->getMessage());
|
||||
}
|
||||
|
||||
// 子进程执行完毕后退出
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
// 等待所有子进程完成
|
||||
foreach ($processes as $pid) {
|
||||
pcntl_waitpid($pid, $status);
|
||||
}
|
||||
|
||||
// 如果当前页数据量等于页大小,说明可能还有更多数据,继续处理下一页
|
||||
if (count($friendList) == $pageSize) {
|
||||
$this->processAllFriends($workbench, $config, $page + 1, $pageSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取好友列表
|
||||
* @param WorkbenchAutoLike $config 配置
|
||||
* @param int $page 页码
|
||||
* @param int $pageSize 每页大小
|
||||
* @return array
|
||||
*/
|
||||
protected function getFriendList($config, $page = 1, $pageSize = 100)
|
||||
{
|
||||
$friends = json_decode($config['friends'], true);
|
||||
$devices = json_decode($config['devices'], true);
|
||||
|
||||
$list = Db::table('s2_company_account')
|
||||
->alias('ca')
|
||||
->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
|
||||
->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id')
|
||||
->join('workbench_auto_like_item wali', 'wali.wechatFriendId = wf.id AND wali.workbenchId = ' . $config['workbenchId'], 'left')
|
||||
->where([
|
||||
'ca.status' => 0,
|
||||
'wf.isDeleted' => 0,
|
||||
'wa.deviceAlive' => 1,
|
||||
'wa.wechatAlive' => 1
|
||||
])
|
||||
->whereIn('wa.currentDeviceId', $devices)
|
||||
->field([
|
||||
'ca.id as accountId',
|
||||
'ca.userName',
|
||||
'wf.id as friendId',
|
||||
'wf.wechatId',
|
||||
'wf.wechatAccountId',
|
||||
'wa.wechatId as wechatAccountWechatId',
|
||||
'wa.currentDeviceId as deviceId',
|
||||
'COUNT(wali.id) as like_count'
|
||||
]);
|
||||
|
||||
if (!empty($friends) && is_array($friends) && count($friends) > 0) {
|
||||
$list = $list->whereIn('wf.id', $friends);
|
||||
}
|
||||
|
||||
$list = $list->group('wf.wechatId')
|
||||
->having('like_count < ' . $config['friendMaxLikes'])
|
||||
->order('wf.id DESC')
|
||||
->page($page, $pageSize)
|
||||
->select();
|
||||
|
||||
return $list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理好友朋友圈
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @param array $friend
|
||||
*/
|
||||
protected function processFriendMoments($workbench, $config, $friend)
|
||||
{
|
||||
$toAccountId = '';
|
||||
$username = Env::get('api.username', '');
|
||||
$password = Env::get('api.password', '');
|
||||
if (!empty($username) || !empty($password)) {
|
||||
$toAccountId = Db::name('users')->where('account',$username)->value('s2_accountId');
|
||||
}
|
||||
|
||||
try {
|
||||
// 执行切换好友命令
|
||||
$automaticAssign = new AutomaticAssign();
|
||||
$automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $toAccountId], true);
|
||||
|
||||
// 执行采集朋友圈命令
|
||||
$webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]);
|
||||
$webSocket->getMoments(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId']]);
|
||||
|
||||
// 查询未点赞的朋友圈
|
||||
$moments = $this->getUnlikedMoments($friend['friendId']);
|
||||
if (empty($moments)) {
|
||||
// 处理完毕切换回原账号
|
||||
$automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true);
|
||||
Log::info("好友 {$friend['friendId']} 没有需要点赞的朋友圈");
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($moments as $moment) {
|
||||
// 点赞朋友圈
|
||||
$this->likeMoment($workbench, $config, $friend, $moment, $webSocket);
|
||||
|
||||
if(!empty($config['enableFriendTags']) && !empty($config['friendTags'])){
|
||||
// 修改好友标签
|
||||
$labels = $this->getFriendLabels($friend);
|
||||
$labels[] = $config['friendTags'];
|
||||
$webSocket->modifyFriendLabel(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId'], 'labels' => $labels]);
|
||||
}
|
||||
|
||||
// 每个好友只点赞一条朋友圈,然后退出
|
||||
break;
|
||||
}
|
||||
|
||||
// 处理完毕切换回原账号
|
||||
$automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true);
|
||||
} catch (\Exception $e) {
|
||||
// 异常情况下也要确保切换回原账号
|
||||
$automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true);
|
||||
|
||||
Log::error("处理好友 {$friend['friendId']} 朋友圈失败: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取未点赞的朋友圈
|
||||
* @param int $friendId
|
||||
* @return \think\Collection
|
||||
*/
|
||||
protected function getUnlikedMoments($friendId)
|
||||
{
|
||||
return Db::table('s2_wechat_moments')
|
||||
->alias('wm')
|
||||
->join('workbench_auto_like_item wali', 'wali.momentsId = wm.id', 'left')
|
||||
->where([
|
||||
['wm.wechatFriendId', '=', $friendId],
|
||||
['wali.id', 'null', null]
|
||||
])
|
||||
->field('wm.id, wm.snsId')
|
||||
->group('wali.wechatFriendId')
|
||||
->order('wm.createTime DESC')
|
||||
->select();
|
||||
}
|
||||
|
||||
/**
|
||||
* 点赞朋友圈
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @param array $friend
|
||||
* @param array $moment
|
||||
* @param WebSocketController $webSocket
|
||||
*/
|
||||
protected function likeMoment($workbench, $config, $friend, $moment, $webSocket)
|
||||
{
|
||||
try {
|
||||
$result = $webSocket->momentInteract([
|
||||
'snsId' => $moment['snsId'],
|
||||
'wechatAccountId' => $friend['wechatAccountId'],
|
||||
]);
|
||||
|
||||
$result = json_decode($result, true);
|
||||
|
||||
if ($result['code'] == 200) {
|
||||
$this->recordLike($workbench, $moment, $friend);
|
||||
|
||||
// 添加间隔时间
|
||||
if (!empty($config['interval'])) {
|
||||
sleep($config['interval']);
|
||||
}
|
||||
} else {
|
||||
Log::error("工作台 {$workbench->id} 点赞失败: " . ($result['msg'] ?? '未知错误'));
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error("工作台 {$workbench->id} 点赞异常: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录点赞
|
||||
* @param Workbench $workbench
|
||||
* @param array $moment
|
||||
* @param array $friend
|
||||
*/
|
||||
protected function recordLike($workbench, $moment, $friend)
|
||||
{
|
||||
Db::name('workbench_auto_like_item')->insert([
|
||||
'workbenchId' => $workbench->id,
|
||||
'deviceId' => $friend['deviceId'],
|
||||
'momentsId' => $moment['id'],
|
||||
'snsId' => $moment['snsId'],
|
||||
'wechatAccountId' => $friend['wechatAccountId'],
|
||||
'wechatFriendId' => $friend['friendId'],
|
||||
'createTime' => time()
|
||||
]);
|
||||
Log::info("工作台 {$workbench->id} 点赞成功: {$moment['snsId']}");
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取好友标签
|
||||
* @param array $friend
|
||||
* @return array
|
||||
*/
|
||||
protected function getFriendLabels($friend)
|
||||
{
|
||||
$wechatFriendController = new WechatFriendController();
|
||||
$result = $wechatFriendController->getlist([
|
||||
'friendKeyword' => $friend['wechatId'],
|
||||
'wechatAccountKeyword' => $friend['wechatAccountWechatId']
|
||||
], true);
|
||||
|
||||
$result = json_decode($result, true);
|
||||
$labels = [];
|
||||
|
||||
if(!empty($result['data'])){
|
||||
foreach($result['data'] as $item){
|
||||
$labels = array_merge($labels, $item['labels']);
|
||||
}
|
||||
}
|
||||
|
||||
return $labels;
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证自动点赞配置
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @return bool
|
||||
*/
|
||||
protected function validateAutoLikeConfig($workbench, $config)
|
||||
{
|
||||
$requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime'];
|
||||
foreach ($requiredFields as $field) {
|
||||
if (empty($config[$field])) {
|
||||
Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取今日点赞次数
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @return int
|
||||
*/
|
||||
protected function getTodayLikeCount($workbench, $config, $deviceId)
|
||||
{
|
||||
return Db::name('workbench_auto_like_item')
|
||||
->where('workbenchId', $workbench->id)
|
||||
->where('deviceId', $deviceId)
|
||||
->whereTime('createTime', 'between', [
|
||||
strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'),
|
||||
strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00')
|
||||
])
|
||||
->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否在点赞时间范围内
|
||||
* @param WorkbenchAutoLike $config
|
||||
* @return bool
|
||||
*/
|
||||
protected function isWithinLikeTimeRange($config)
|
||||
{
|
||||
$currentTime = date('H:i');
|
||||
if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) {
|
||||
Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录任务开始
|
||||
* @param string $jobId
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function logJobStart($jobId, $queueLockKey)
|
||||
{
|
||||
Log::info('开始处理工作台自动点赞任务: ' . json_encode([
|
||||
'jobId' => $jobId,
|
||||
'queueLockKey' => $queueLockKey
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务成功
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function handleJobSuccess($job, $queueLockKey)
|
||||
{
|
||||
$job->delete();
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info('工作台自动点赞任务执行成功');
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务错误
|
||||
* @param \Exception $e
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
* @return bool
|
||||
*/
|
||||
protected function handleJobError(\Exception $e, $job, $queueLockKey)
|
||||
{
|
||||
Log::error('工作台自动点赞任务异常:' . $e->getMessage());
|
||||
|
||||
if (!empty($queueLockKey)) {
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info("由于异常释放队列锁: {$queueLockKey}");
|
||||
}
|
||||
|
||||
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
|
||||
$job->delete();
|
||||
} else {
|
||||
$job->release(Config::get('queue.failed_delay', 10));
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理空工作台情况
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function handleEmptyWorkbenches(Job $job, $queueLockKey)
|
||||
{
|
||||
Log::info('没有需要处理的工作台自动点赞任务');
|
||||
$job->delete();
|
||||
Cache::rm($queueLockKey);
|
||||
}
|
||||
}
|
||||
344
Server/application/job/WorkbenchMomentsJob.php
Normal file
344
Server/application/job/WorkbenchMomentsJob.php
Normal file
@@ -0,0 +1,344 @@
|
||||
<?php
|
||||
namespace app\job;
|
||||
|
||||
use app\api\controller\WebSocketController;
|
||||
use app\cunkebao\model\Workbench;
|
||||
use app\cunkebao\model\WorkbenchMomentsSync as WorkbenchMoments;
|
||||
use app\api\model\WechatFriendModel as WechatFriend;
|
||||
use app\api\model\WechatMomentsModel as WechatMoments;
|
||||
use think\facade\Log;
|
||||
use think\facade\Env;
|
||||
use think\Db;
|
||||
use think\queue\Job;
|
||||
use think\facade\Cache;
|
||||
use think\facade\Config;
|
||||
use app\api\controller\MomentsController as Moments;
|
||||
|
||||
/**
|
||||
* 工作台朋友圈同步任务
|
||||
* Class WorkbenchMomentsJob
|
||||
* @package app\job
|
||||
*/
|
||||
class WorkbenchMomentsJob
|
||||
{
|
||||
/**
|
||||
* 内容类型映射
|
||||
* 0:未知 1:图片 2:链接 3:视频 4:文本 5:小程序 6:图文
|
||||
*/
|
||||
const CONTENT_TYPE_MAP = [
|
||||
0 => 1, // 未知 -> 文本
|
||||
1 => 2, // 图片 -> 图文
|
||||
2 => 4, // 链接 -> 链接
|
||||
3 => 3, // 视频 -> 视频
|
||||
4 => 1, // 文本 -> 文本
|
||||
5 => 1, // 小程序 -> 文本
|
||||
6 => 2, // 图文 -> 图文
|
||||
];
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
const MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* 队列任务处理
|
||||
* @param Job $job 队列任务
|
||||
* @param array $data 任务数据
|
||||
* @return bool
|
||||
*/
|
||||
public function fire(Job $job, $data)
|
||||
{
|
||||
$jobId = $data['jobId'] ?? '';
|
||||
$queueLockKey = $data['queueLockKey'] ?? '';
|
||||
try {
|
||||
$this->logJobStart($jobId, $queueLockKey);
|
||||
$this->execute();
|
||||
$this->handleJobSuccess($job, $queueLockKey);
|
||||
return true;
|
||||
} catch (\Exception $e) {
|
||||
return $this->handleJobError($e, $job, $queueLockKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function execute()
|
||||
{
|
||||
try {
|
||||
// 获取所有工作台
|
||||
$workbenches = Workbench::where(['status' => 1, 'type' => 2, 'isDel' => 0])->select();
|
||||
foreach ($workbenches as $workbench) {
|
||||
// 获取工作台配置
|
||||
$config = WorkbenchMoments::where('workbenchId', $workbench->id)->find();
|
||||
if (!$config) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取设备
|
||||
$devices = $this->getDevice($workbench, $config);
|
||||
if (empty($devices)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取内容库
|
||||
$contentLibrary = $this->getContentLibrary($workbench, $config);
|
||||
if (empty($contentLibrary)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 处理内容发送
|
||||
$this->handleContentSend($workbench, $config, $devices, $contentLibrary);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error("朋友圈同步任务异常: " . $e->getMessage());
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理内容发送
|
||||
* @param Workbench $workbench
|
||||
* @param WorkbenchMoments $config
|
||||
* @param array $devices
|
||||
* @param array $contentLibrary
|
||||
*/
|
||||
protected function handleContentSend($workbench, $config, $devices, $contentLibrary)
|
||||
{
|
||||
// 准备评论数据
|
||||
$comment = [];
|
||||
if (!empty($contentLibrary['comment'])) {
|
||||
$comment[] = $contentLibrary['comment'];
|
||||
}
|
||||
|
||||
// 准备发送数据
|
||||
$jobPublishWechatMomentsItems = [];
|
||||
foreach ($devices as $device) {
|
||||
$jobPublishWechatMomentsItems[] = [
|
||||
'comments' => $comment,
|
||||
'labels' => [],
|
||||
'wechatAccountId' => $device['wechatAccountId']
|
||||
];
|
||||
}
|
||||
|
||||
// 转换内容类型
|
||||
$momentContentType = self::CONTENT_TYPE_MAP[$contentLibrary['contentType']] ?? 1;
|
||||
$sendTime = !empty($contentLibrary['sendTime']) ? $contentLibrary['sendTime'] : time();
|
||||
// 准备发送参数
|
||||
$data = [
|
||||
'altList' => '',
|
||||
'immediately' => false,
|
||||
'isUseLocation' => false,
|
||||
'jobPublishWechatMomentsItems' => $jobPublishWechatMomentsItems,
|
||||
'lat' => 0,
|
||||
'lng' => 0,
|
||||
'link' => ['image' => ''],
|
||||
'momentContentType' => $momentContentType,
|
||||
'picUrlList' => json_decode($contentLibrary['resUrls'], true),
|
||||
'poiAddress' => '',
|
||||
'poiName' => '',
|
||||
'publicMode' => '',
|
||||
'text' => $contentLibrary['content'],
|
||||
'timingTime' => date('Y-m-d H:i:s', $sendTime),
|
||||
'beginTime' => date('Y-m-d H:i:s', $sendTime),
|
||||
'endTime' => date('Y-m-d H:i:s', $sendTime + 60),
|
||||
'videoUrl' => '',
|
||||
];
|
||||
|
||||
print_r($data);
|
||||
exit;
|
||||
|
||||
// 发送朋友圈
|
||||
$moments = new Moments();
|
||||
$moments->addJob($data);
|
||||
|
||||
// 记录发送记录
|
||||
$this->recordSendHistory($workbench, $devices, $contentLibrary);
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录发送历史
|
||||
* @param Workbench $workbench
|
||||
* @param array $devices
|
||||
* @param array $contentLibrary
|
||||
*/
|
||||
protected function recordSendHistory($workbench, $devices, $contentLibrary)
|
||||
{
|
||||
$now = time();
|
||||
$data = [];
|
||||
foreach ($devices as $device) {
|
||||
$data[] = [
|
||||
'workbenchId' => $workbench->id,
|
||||
'deviceId' => $device['deviceId'],
|
||||
'contentId' => $contentLibrary['id'],
|
||||
'libraryId' => $contentLibrary['libraryId'],
|
||||
'createTime' => $now,
|
||||
'updateTime' => $now
|
||||
];
|
||||
}
|
||||
Db::name('workbench_moments_sync_item')->insertAll($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备列表
|
||||
* @param Workbench $workbench 工作台
|
||||
* @param WorkbenchMoments $config 配置
|
||||
* @return array|bool
|
||||
*/
|
||||
protected function getDevice($workbench, $config)
|
||||
{
|
||||
$devices = json_decode($config['devices'], true);
|
||||
if (empty($devices)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$list = Db::name('device')->alias('d')
|
||||
->join('device_wechat_login dw', 'dw.alive = 1 and dw.deviceId = d.id and dw.companyId = d.companyId')
|
||||
->join(['s2_wechat_account' => 'wa'], 'wa.wechatId = dw.wechatId')
|
||||
->where(['d.companyId' => $workbench->companyId, 'd.alive' => 1])
|
||||
->whereIn('d.id', $devices)
|
||||
->field('d.id as deviceId, d.memo as deviceName, d.companyId, dw.wechatId, wa.id as wechatAccountId')
|
||||
->select();
|
||||
|
||||
$newList = [];
|
||||
foreach ($list as $val) {
|
||||
// 检查今日发送次数
|
||||
$count = Db::name('workbench_moments_sync_item')
|
||||
->where('workbenchId', $workbench->id)
|
||||
->where('deviceId', $val['deviceId'])
|
||||
->whereTime('createTime', 'between', [
|
||||
strtotime(date('Y-m-d') . '00:00:00'),
|
||||
strtotime(date('Y-m-d') . '23:59:59')
|
||||
])->count();
|
||||
|
||||
if ($count >= $config['syncCount']) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 检查发送间隔
|
||||
$prevSend = Db::name('workbench_moments_sync_item')
|
||||
->where('workbenchId', $workbench->id)
|
||||
->where('deviceId', $val['deviceId'])
|
||||
->order('createTime DESC')
|
||||
->find();
|
||||
|
||||
if (!empty($prevSend) && ($prevSend['createTime'] + $config['syncInterval'] * 60) > time()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$newList[] = $val;
|
||||
}
|
||||
|
||||
return $newList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取内容库
|
||||
* @param Workbench $workbench 工作台
|
||||
* @param WorkbenchMoments $config 配置
|
||||
* @return array|bool
|
||||
*/
|
||||
protected function getContentLibrary($workbench, $config)
|
||||
{
|
||||
$contentids = json_decode($config['contentLibraries'], true);
|
||||
if (empty($contentids)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 基础查询
|
||||
$query = Db::name('content_library')->alias('cl')
|
||||
->join('content_item ci', 'ci.libraryId = cl.id')
|
||||
->join('workbench_moments_sync_item wmsi', 'wmsi.contentId = ci.id and wmsi.workbenchId = ' . $workbench->id, 'left')
|
||||
->where(['cl.isDel' => 0, 'ci.isDel' => 0])
|
||||
->where('ci.sendTime <= ' . (time() + 60))
|
||||
->whereIn('cl.id', $contentids)
|
||||
->field([
|
||||
'ci.id',
|
||||
'ci.libraryId',
|
||||
'ci.contentType',
|
||||
'ci.title',
|
||||
'ci.content',
|
||||
'ci.resUrls',
|
||||
'ci.urls',
|
||||
'ci.comment',
|
||||
'ci.sendTime'
|
||||
]);
|
||||
|
||||
// 根据accountType处理不同的发送逻辑
|
||||
if ($config['accountType'] == 1) {
|
||||
// 可以循环发送
|
||||
// 1. 优先获取未发送的内容
|
||||
$unsentContent = $query->where('wmsi.id', 'null')
|
||||
->order('ci.sendTime desc, ci.id desc')
|
||||
->find();
|
||||
|
||||
if (!empty($unsentContent)) {
|
||||
return $unsentContent;
|
||||
}
|
||||
|
||||
// 2. 如果没有未发送的内容,则获取已发送的内容中最早发送的
|
||||
$sentContent = $query->where('wmsi.id', 'not null')
|
||||
->order('wmsi.createTime asc, ci.sendTime desc, ci.id desc')
|
||||
->find();
|
||||
|
||||
return $sentContent;
|
||||
} else {
|
||||
// 不能循环发送,只获取未发送的内容
|
||||
$list = $query->where('wmsi.id', 'null')
|
||||
->order('ci.sendTime desc, ci.id desc')
|
||||
->find();
|
||||
return $list;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录任务开始
|
||||
* @param string $jobId
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function logJobStart($jobId, $queueLockKey)
|
||||
{
|
||||
Log::info('开始处理工作台朋友圈同步任务: ' . json_encode([
|
||||
'jobId' => $jobId,
|
||||
'queueLockKey' => $queueLockKey
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务成功
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
*/
|
||||
protected function handleJobSuccess($job, $queueLockKey)
|
||||
{
|
||||
$job->delete();
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info('工作台朋友圈同步任务执行成功');
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务错误
|
||||
* @param \Exception $e
|
||||
* @param Job $job
|
||||
* @param string $queueLockKey
|
||||
* @return bool
|
||||
*/
|
||||
protected function handleJobError(\Exception $e, $job, $queueLockKey)
|
||||
{
|
||||
Log::error('工作台朋友圈同步任务异常:' . $e->getMessage());
|
||||
|
||||
if (!empty($queueLockKey)) {
|
||||
Cache::rm($queueLockKey);
|
||||
Log::info("由于异常释放队列锁: {$queueLockKey}");
|
||||
}
|
||||
|
||||
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
|
||||
$job->delete();
|
||||
} else {
|
||||
$job->release(Config::get('queue.failed_delay', 10));
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user