使用pcntl_fork支持多进程点赞

This commit is contained in:
wong
2025-05-21 11:22:25 +08:00
parent 61bcbf2790
commit 9df664ecee

View File

@@ -21,7 +21,7 @@ use app\api\controller\WechatFriendController;
class WorkbenchJob
{
/************************************
* 常量定义与核心队列处理
* 常量定义
************************************/
/**
@@ -37,6 +37,10 @@ class WorkbenchJob
*/
const MAX_RETRY_ATTEMPTS = 3;
/************************************
* 核心队列处理
************************************/
/**
* 队列任务处理
* @param Job $job 队列任务
@@ -62,59 +66,9 @@ class WorkbenchJob
return $this->handleJobError($e, $job, $queueLockKey);
}
}
/**
* 记录任务开始
* @param string $jobId
* @param string $queueLockKey
*/
protected function logJobStart($jobId, $queueLockKey)
{
Log::info('开始处理工作台任务: ' . json_encode([
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
]));
}
/**
* 处理任务成功
* @param Job $job
* @param string $queueLockKey
*/
protected function handleJobSuccess($job, $queueLockKey)
{
$job->delete();
Cache::rm($queueLockKey);
Log::info('工作台任务执行成功');
}
/**
* 处理任务错误
* @param \Exception $e
* @param Job $job
* @param string $queueLockKey
* @return bool
*/
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
Log::error('工作台任务异常:' . $e->getMessage());
if (!empty($queueLockKey)) {
Cache::rm($queueLockKey);
Log::info("由于异常释放队列锁: {$queueLockKey}");
}
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
$job->delete();
} else {
$job->release(Config::get('queue.failed_delay', 10));
}
return false;
}
/************************************
* 工作台处理核心逻辑
* 工作台基础功能
************************************/
/**
@@ -129,18 +83,6 @@ class WorkbenchJob
])->order('id DESC')->select();
}
/**
* 处理空工作台情况
* @param Job $job
* @param string $queueLockKey
*/
protected function handleEmptyWorkbenches(Job $job, $queueLockKey)
{
Log::info('没有需要处理的工作台任务');
$job->delete();
Cache::rm($queueLockKey);
}
/**
* 处理工作台列表
* @param \think\Collection $workbenches
@@ -227,18 +169,19 @@ class WorkbenchJob
*/
protected function handleAutoLike($workbench, $config)
{
if (!$this->validateAutoLikeConfig($workbench, $config)) {
return;
}
// 验证是否在点赞时间范围内
if (!$this->isWithinLikeTimeRange($config)) {
return;
}
// 处理分页获取好友列表
$this->processAllFriends($workbench, $config);
}
/**
* 处理所有好友分页
* @param Workbench $workbench
@@ -252,81 +195,111 @@ class WorkbenchJob
if (empty($friendList)) {
return;
}
foreach ($friendList as $friend) {
// 验证是否达到点赞次数上限
$likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']);
if ($likeCount >= $config['maxLikes']) {
Log::info("工作台 {$workbench->id} 点赞次数已达上限");
return;
}
// 验证是否达到好友点赞次数上限
$friendMaxLikes = Db::name('workbench_auto_like_item')
->where('workbenchId', $workbench->id)
->where('wechatFriendId', $friend['friendId'])
->count();
if ($friendMaxLikes < $config['friendMaxLikes']) {
$this->processFriendMoments($workbench, $config, $friend);
// 将好友列表分成20组
$friendGroups = array_chunk($friendList, 20);
$processes = [];
foreach ($friendGroups as $groupIndex => $friendGroup) {
// 创建子进程
$pid = pcntl_fork();
if ($pid == -1) {
// 创建进程失败
Log::error("工作台 {$workbench->id} 创建进程失败");
continue;
} else if ($pid) {
// 父进程
$processes[] = $pid;
} else {
// 子进程
try {
foreach ($friendGroup as $friend) {
// 验证是否达到点赞次数上限
$likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']);
if ($likeCount >= $config['maxLikes']) {
Log::info("工作台 {$workbench->id} 点赞次数已达上限");
continue;
}
// 验证是否达到好友点赞次数上限
$friendMaxLikes = Db::name('workbench_auto_like_item')
->where('workbenchId', $workbench->id)
->where('wechatFriendId', $friend['friendId'])
->count();
if ($friendMaxLikes < $config['friendMaxLikes']) {
$this->processFriendMoments($workbench, $config, $friend);
}
}
} catch (\Exception $e) {
Log::error("工作台 {$workbench->id} 子进程异常: " . $e->getMessage());
}
// 子进程执行完毕后退出
exit(0);
}
}
// 等待所有子进程完成
foreach ($processes as $pid) {
pcntl_waitpid($pid, $status);
}
// 如果当前页数据量等于页大小,说明可能还有更多数据,继续处理下一页
if (count($friendList) == $pageSize) {
$this->processAllFriends($workbench, $config, $page + 1, $pageSize);
}
}
/**
* 验证自动点赞配置
* @param Workbench $workbench
* @param WorkbenchAutoLike $config
* @return bool
*/
protected function validateAutoLikeConfig($workbench, $config)
{
$requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime'];
foreach ($requiredFields as $field) {
if (empty($config[$field])) {
Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空");
return false;
}
}
return true;
}
/**
* 获取今日点赞次数
* @param Workbench $workbench
* @param WorkbenchAutoLike $config
* @return int
* 获取好友列表
* @param WorkbenchAutoLike $config 配置
* @param int $page 页码
* @param int $pageSize 每页大小
* @return array
*/
protected function getTodayLikeCount($workbench, $config, $deviceId)
protected function getFriendList($config, $page = 1, $pageSize = 100)
{
return Db::name('workbench_auto_like_item')
->where('workbenchId', $workbench->id)
->where('deviceId', $deviceId)
->whereTime('createTime', 'between', [
strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'),
strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00')
$friends = json_decode($config['friends'], true);
$devices = json_decode($config['devices'], true);
$list = Db::table('s2_company_account')
->alias('ca')
->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id')
->join('workbench_auto_like_item wali', 'wali.wechatFriendId = wf.id AND wali.workbenchId = ' . $config['workbenchId'], 'left')
->where([
'ca.status' => 0,
'wf.isDeleted' => 0,
'wa.deviceAlive' => 1,
'wa.wechatAlive' => 1
])
->count();
->whereIn('wa.currentDeviceId', $devices)
->field([
'ca.id as accountId',
'ca.userName',
'wf.id as friendId',
'wf.wechatId',
'wf.wechatAccountId',
'wa.wechatId as wechatAccountWechatId',
'wa.currentDeviceId as deviceId',
'COUNT(wali.id) as like_count'
]);
if (!empty($friends) && is_array($friends) && count($friends) > 0) {
$list = $list->whereIn('wf.id', $friends);
}
$list = $list->group('wf.wechatId')
->having('like_count < ' . $config['friendMaxLikes'])
->order('wf.id DESC')
->page($page, $pageSize)
->select();
return $list;
}
/**
* 检查是否在点赞时间范围内
* @param WorkbenchAutoLike $config
* @return bool
*/
protected function isWithinLikeTimeRange($config)
{
$currentTime = date('H:i');
if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) {
Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})");
return false;
}
return true;
}
/**
* 处理好友朋友圈
* @param Workbench $workbench
@@ -385,28 +358,6 @@ class WorkbenchJob
}
}
/**
* 获取好友标签
* @param int $friendId
* @return array
*/
protected function getFriendLabels($friend)
{
// 获取好友标签
$wechatFriendController = new WechatFriendController();
$result = $wechatFriendController->getlist([ 'friendKeyword' => $friend['wechatId'],'wechatAccountKeyword' => $friend['wechatAccountWechatId']],true);
$result = json_decode($result, true);
$labels = [];
if(!empty($result['data'])){
foreach($result['data'] as $item){
$labels = array_merge($labels, $item['labels']);
}
}
return $labels;
}
/**
* 获取未点赞的朋友圈
* @param int $friendId
@@ -480,6 +431,82 @@ class WorkbenchJob
Log::info("工作台 {$workbench->id} 点赞成功: {$moment['snsId']}");
}
/**
* 获取好友标签
* @param array $friend
* @return array
*/
protected function getFriendLabels($friend)
{
$wechatFriendController = new WechatFriendController();
$result = $wechatFriendController->getlist([
'friendKeyword' => $friend['wechatId'],
'wechatAccountKeyword' => $friend['wechatAccountWechatId']
], true);
$result = json_decode($result, true);
$labels = [];
if(!empty($result['data'])){
foreach($result['data'] as $item){
$labels = array_merge($labels, $item['labels']);
}
}
return $labels;
}
/**
* 验证自动点赞配置
* @param Workbench $workbench
* @param WorkbenchAutoLike $config
* @return bool
*/
protected function validateAutoLikeConfig($workbench, $config)
{
$requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime'];
foreach ($requiredFields as $field) {
if (empty($config[$field])) {
Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空");
return false;
}
}
return true;
}
/**
* 获取今日点赞次数
* @param Workbench $workbench
* @param WorkbenchAutoLike $config
* @return int
*/
protected function getTodayLikeCount($workbench, $config, $deviceId)
{
return Db::name('workbench_auto_like_item')
->where('workbenchId', $workbench->id)
->where('deviceId', $deviceId)
->whereTime('createTime', 'between', [
strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'),
strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00')
])
->count();
}
/**
* 检查是否在点赞时间范围内
* @param WorkbenchAutoLike $config
* @return bool
*/
protected function isWithinLikeTimeRange($config)
{
$currentTime = date('H:i');
if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) {
Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})");
return false;
}
return true;
}
/************************************
* 朋友圈同步功能
************************************/
@@ -524,53 +551,70 @@ class WorkbenchJob
// TODO: 实现自动建群逻辑
Log::info("处理自动建群任务: {$workbench->id}");
}
/************************************
* 辅助方法
* 任务处理辅助方法
************************************/
/**
* 记录任务开始
* @param string $jobId
* @param string $queueLockKey
*/
protected function logJobStart($jobId, $queueLockKey)
{
Log::info('开始处理工作台任务: ' . json_encode([
'jobId' => $jobId,
'queueLockKey' => $queueLockKey
]));
}
/**
* 获取好友列表
* @param WorkbenchAutoLike $config 配置
* @param int $page 页码
* @param int $pageSize 每页大小
* @return array
* 处理任务成功
* @param Job $job
* @param string $queueLockKey
*/
protected function getFriendList($config, $page = 1, $pageSize = 100)
protected function handleJobSuccess($job, $queueLockKey)
{
$friends = json_decode($config['friends'], true);
$devices = json_decode($config['devices'], true);
$job->delete();
Cache::rm($queueLockKey);
Log::info('工作台任务执行成功');
}
$list = Db::table('s2_company_account')
->alias('ca')
->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId')
->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id')
->where([
'ca.status' => 0,
'wf.isDeleted' => 0,
'wa.deviceAlive' => 1,
'wa.wechatAlive' => 1
])
->whereIn('wa.currentDeviceId', $devices)
->field([
'ca.id as accountId',
'ca.userName',
'wf.id as friendId',
'wf.wechatId',
'wf.wechatAccountId',
'wa.wechatId as wechatAccountWechatId',
'wa.currentDeviceId as deviceId'
]);
if (!empty($friends) && is_array($friends) && count($friends) > 0) {
$list = $list->whereIn('wf.id', $friends);
/**
* 处理任务错误
* @param \Exception $e
* @param Job $job
* @param string $queueLockKey
* @return bool
*/
protected function handleJobError(\Exception $e, $job, $queueLockKey)
{
Log::error('工作台任务异常:' . $e->getMessage());
if (!empty($queueLockKey)) {
Cache::rm($queueLockKey);
Log::info("由于异常释放队列锁: {$queueLockKey}");
}
$list = $list->group('wf.wechatId')
->order('wf.id DESC')
->page($page, $pageSize)
->select();
return $list;
if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) {
$job->delete();
} else {
$job->release(Config::get('queue.failed_delay', 10));
}
return false;
}
/**
* 处理空工作台情况
* @param Job $job
* @param string $queueLockKey
*/
protected function handleEmptyWorkbenches(Job $job, $queueLockKey)
{
Log::info('没有需要处理的工作台任务');
$job->delete();
Cache::rm($queueLockKey);
}
}