Files
cunkebao_v3/Moncter/app/service/DataSource/Strategy/MongoDBConsumptionStrategy.php

226 lines
7.6 KiB
PHP
Raw Permalink Normal View History

2026-01-05 10:16:20 +08:00
<?php
namespace app\service\DataSource\Strategy;
use app\service\DataSource\DataSourceAdapterInterface;
use app\service\DataSource\PollingStrategyInterface;
use app\utils\LoggerHelper;
/**
* MongoDB 消费记录轮询策略
*
* 职责:
* - 提供 MongoDB 专用的轮询策略实现
* - 展示如何实现自定义业务逻辑
*/
class MongoDBConsumptionStrategy implements PollingStrategyInterface
{
/**
* 执行轮询查询
*
* @param DataSourceAdapterInterface $adapter 数据源适配器
* @param array<string, mixed> $config 数据源配置
* @param array<string, mixed> $lastSyncInfo 上次同步信息
* @return array<array<string, mixed>> 查询结果数组
*/
public function poll(
DataSourceAdapterInterface $adapter,
array $config,
array $lastSyncInfo = []
): array {
// 从配置中获取集合名和查询条件
$collectionName = $config['collection'] ?? 'consumption_records';
$lastSyncTime = $lastSyncInfo['last_sync_time'] ?? null;
$lastSyncId = $lastSyncInfo['last_sync_id'] ?? null;
// 构建 MongoDB 查询过滤器
$filter = [];
// 如果有上次同步时间,只查询新增或更新的记录
if ($lastSyncTime !== null) {
$lastSyncTimestamp = is_numeric($lastSyncTime) ? (int)$lastSyncTime : strtotime($lastSyncTime);
$lastSyncDate = new \MongoDB\BSON\UTCDateTime($lastSyncTimestamp * 1000);
$filter['$or'] = [
['created_at' => ['$gt' => $lastSyncDate]],
['updated_at' => ['$gt' => $lastSyncDate]],
];
}
// 如果有上次同步ID用于去重可选
if ($lastSyncId !== null) {
$filter['_id'] = ['$gt' => $lastSyncId];
}
// 查询选项
$options = [
'sort' => ['created_at' => 1, '_id' => 1], // 按创建时间和ID排序
];
// 执行查询批量查询每次最多1000条
$limit = $config['batch_size'] ?? 1000;
$offset = 0;
$allResults = [];
do {
// MongoDB 适配器的 queryBatch 方法签名queryBatch(string $sql, array $params = [], int $offset = 0, int $limit = 1000)
// 对于 MongoDB$sql 是集合名,$params 包含 'filter' 和 'options'
$results = $adapter->queryBatch($collectionName, [
'filter' => $filter,
'options' => $options,
], $offset, $limit);
if (empty($results)) {
break;
}
$allResults = array_merge($allResults, $results);
$offset += $limit;
// 防止无限循环最多查询10万条
if (count($allResults) >= 100000) {
LoggerHelper::logBusiness('polling_batch_limit_reached', [
'collection' => $collectionName,
'count' => count($allResults),
]);
break;
}
} while (count($results) === $limit);
LoggerHelper::logBusiness('polling_query_completed', [
'collection' => $collectionName,
'result_count' => count($allResults),
'last_sync_time' => $lastSyncTime,
]);
return $allResults;
}
/**
* 数据转换
*
* @param array<array<string, mixed>> $rawData 原始数据
* @param array<string, mixed> $config 数据源配置
* @return array<array<string, mixed>> 转换后的数据
*/
public function transform(array $rawData, array $config): array
{
// 字段映射配置(从外部数据库字段映射到标准字段)
$fieldMapping = $config['field_mapping'] ?? [
// 默认映射MongoDB 使用 _id需要转换为 id
'_id' => 'id',
'user_id' => 'user_id',
'amount' => 'amount',
'store_id' => 'store_id',
'product_id' => 'product_id',
'consume_time' => 'consume_time',
'created_at' => 'created_at',
];
$transformedData = [];
foreach ($rawData as $record) {
$transformed = [];
// 处理 MongoDB 的 _id 字段(转换为字符串)
if (isset($record['_id'])) {
if (is_object($record['_id']) && method_exists($record['_id'], '__toString')) {
$record['id'] = (string)$record['_id'];
} else {
$record['id'] = (string)$record['_id'];
}
}
// 处理 MongoDB 的日期字段UTCDateTime 转换为字符串)
foreach (['created_at', 'updated_at', 'consume_time'] as $dateField) {
if (isset($record[$dateField])) {
if (is_object($record[$dateField]) && method_exists($record[$dateField], 'toDateTime')) {
$record[$dateField] = $record[$dateField]->toDateTime()->format('Y-m-d H:i:s');
} elseif (is_object($record[$dateField]) && method_exists($record[$dateField], '__toString')) {
$record[$dateField] = (string)$record[$dateField];
}
}
}
// 应用字段映射
foreach ($fieldMapping as $standardField => $sourceField) {
if (isset($record[$sourceField])) {
$transformed[$standardField] = $record[$sourceField];
}
}
// 确保必要字段存在
if (!empty($transformed)) {
$transformedData[] = $transformed;
}
}
LoggerHelper::logBusiness('polling_transform_completed', [
'input_count' => count($rawData),
'output_count' => count($transformedData),
]);
return $transformedData;
}
/**
* 数据验证
*
* @param array<string, mixed> $record 单条记录
* @param array<string, mixed> $config 数据源配置
* @return bool 是否通过验证
*/
public function validate(array $record, array $config): bool
{
// 必填字段验证
$requiredFields = $config['required_fields'] ?? ['user_id', 'amount', 'consume_time'];
foreach ($requiredFields as $field) {
if (!isset($record[$field]) || $record[$field] === null || $record[$field] === '') {
LoggerHelper::logBusiness('polling_validation_failed', [
'reason' => "缺少必填字段: {$field}",
'record' => $record,
]);
return false;
}
}
// 金额验证(必须为正数)
if (isset($record['amount'])) {
$amount = (float)$record['amount'];
if ($amount <= 0) {
LoggerHelper::logBusiness('polling_validation_failed', [
'reason' => '金额必须大于0',
'amount' => $amount,
]);
return false;
}
}
// 时间格式验证(可选)
if (isset($record['consume_time'])) {
$time = strtotime($record['consume_time']);
if ($time === false) {
LoggerHelper::logBusiness('polling_validation_failed', [
'reason' => '时间格式无效',
'consume_time' => $record['consume_time'],
]);
return false;
}
}
return true;
}
/**
* 获取策略名称
*
* @return string 策略名称
*/
public function getName(): string
{
return 'mongodb_consumption';
}
}