Files
cunkebao_v3/Moncter/app/controller/DataCollectionTaskController.php

851 lines
33 KiB
PHP
Raw Permalink Normal View History

2026-01-05 10:16:20 +08:00
<?php
namespace app\controller;
use app\service\DataCollectionTaskService;
use app\utils\ApiResponseHelper;
use support\Request;
use support\Response;
/**
 * Data collection task management controller.
 *
 * Provides endpoints for creating and managing collection tasks, querying task
 * progress, and browsing source data (data sources, databases, collections,
 * sampled fields and aggregation previews).
 */
class DataCollectionTaskController
{
    /** Path prefix under which the task id appears in task-specific routes. */
    private const TASK_PATH_PREFIX = '/api/data-collection-tasks/';

    /** Upper bound for `page_size` in list(), so a client cannot request an unbounded page. */
    private const MAX_PAGE_SIZE = 100;

    /** Number of documents previewQuery() returns when no valid `limit` is supplied. */
    private const DEFAULT_PREVIEW_LIMIT = 5;

    /** Upper bound for `limit` in previewQuery(). */
    private const MAX_PREVIEW_LIMIT = 100;

    /** Filter operator names accepted by the preview API, mapped to MongoDB query operators. */
    private const FILTER_OPERATORS = [
        'eq' => '$eq',
        'ne' => '$ne',
        'gt' => '$gt',
        'gte' => '$gte',
        'lt' => '$lt',
        'lte' => '$lte',
        'in' => '$in',
        'nin' => '$nin',
    ];

    /**
     * Build the task service instance.
     */
    private function getService(): DataCollectionTaskService
    {
        return new DataCollectionTaskService(
            new \app\repository\DataCollectionTaskRepository()
        );
    }

    /**
     * Build the data source service instance.
     */
    private function getDataSourceService(): \app\service\DataSourceService
    {
        return new \app\service\DataSourceService(new \app\repository\DataSourceRepository());
    }

    /**
     * Create a collection task.
     *
     * POST /api/data-collection-tasks
     */
    public function create(Request $request): Response
    {
        try {
            $data = $request->post();

            foreach (['name', 'data_source_id', 'database', 'target_type'] as $field) {
                if (empty($data[$field])) {
                    return ApiResponseHelper::error("缺少必填字段: {$field}", 400);
                }
            }

            // Strict comparison: a loose in_array() would accept values such as boolean true.
            if (!in_array($data['target_type'], ['consumption_record', 'generic'], true)) {
                return ApiResponseHelper::error("目标类型必须是 consumption_record 或 generic", 400);
            }

            // The generic handler needs an explicit target data source; for consumption_record
            // the backend fills in the target configuration itself.
            if ($data['target_type'] === 'generic') {
                foreach (['target_data_source_id', 'target_database', 'target_collection'] as $field) {
                    if (empty($data[$field])) {
                        return ApiResponseHelper::error("通用Handler缺少必填字段: {$field}", 400);
                    }
                }
            }

            if (isset($data['mode']) && !in_array($data['mode'], ['batch', 'realtime'], true)) {
                return ApiResponseHelper::error("模式必须是 batch 或 realtime", 400);
            }

            if (empty($data['collection']) && empty($data['collections'])) {
                return ApiResponseHelper::error("必须指定 collection 或 collections", 400);
            }

            $task = $this->getService()->createTask($data);
            return ApiResponseHelper::success($task, '任务创建成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Update a task.
     *
     * PUT /api/data-collection-tasks/{task_id}
     */
    public function update(Request $request): Response
    {
        try {
            $taskId = $this->resolveTaskId($request);
            $data = $request->post();
            if ($this->getService()->updateTask($taskId, $data)) {
                return ApiResponseHelper::success(null, '任务更新成功');
            }
            return ApiResponseHelper::error('任务更新失败', 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Delete a task.
     *
     * DELETE /api/data-collection-tasks/{task_id}
     */
    public function delete(Request $request): Response
    {
        try {
            $taskId = $this->resolveTaskId($request);
            if ($this->getService()->deleteTask($taskId)) {
                return ApiResponseHelper::success(null, '任务删除成功');
            }
            return ApiResponseHelper::error('任务删除失败', 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Start a task.
     *
     * POST /api/data-collection-tasks/{task_id}/start
     */
    public function start(Request $request): Response
    {
        try {
            $taskId = $this->resolveTaskId($request, 'start');
            if ($this->getService()->startTask($taskId)) {
                return ApiResponseHelper::success(null, '任务启动成功');
            }
            return ApiResponseHelper::error('任务启动失败', 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Pause a task.
     *
     * POST /api/data-collection-tasks/{task_id}/pause
     */
    public function pause(Request $request): Response
    {
        try {
            $taskId = $this->resolveTaskId($request, 'pause');
            if ($this->getService()->pauseTask($taskId)) {
                return ApiResponseHelper::success(null, '任务暂停成功');
            }
            return ApiResponseHelper::error('任务暂停失败', 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Stop a task.
     *
     * POST /api/data-collection-tasks/{task_id}/stop
     */
    public function stop(Request $request): Response
    {
        try {
            $taskId = $this->resolveTaskId($request, 'stop');
            if ($this->getService()->stopTask($taskId)) {
                return ApiResponseHelper::success(null, '任务停止成功');
            }
            return ApiResponseHelper::error('任务停止失败', 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * List tasks with optional filters and pagination.
     *
     * GET /api/data-collection-tasks
     *
     * Query parameters: status, data_source_id, name (each applied only when non-empty),
     * page (>= 1, default 1) and page_size (1..MAX_PAGE_SIZE, default 20).
     */
    public function list(Request $request): Response
    {
        try {
            $filters = [];
            foreach (['status', 'data_source_id', 'name'] as $key) {
                $value = $request->get($key);
                if ($value !== null && $value !== '') {
                    $filters[$key] = $value;
                }
            }

            // Clamp pagination so zero/negative pages and oversized pages never reach the repository.
            $page = max(1, (int)$request->get('page', 1));
            $pageSize = min(self::MAX_PAGE_SIZE, max(1, (int)$request->get('page_size', 20)));

            $result = $this->getService()->getTaskList($filters, $page, $pageSize);
            return ApiResponseHelper::success($result, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Get task details.
     *
     * GET /api/data-collection-tasks/{task_id}
     */
    public function detail(Request $request): Response
    {
        try {
            $task = $this->getService()->getTask($this->resolveTaskId($request));
            if ($task === null) {
                return ApiResponseHelper::error('任务不存在', 404);
            }
            return ApiResponseHelper::success($task, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Get task progress (the task's `progress` entry, or an empty array when absent).
     *
     * GET /api/data-collection-tasks/{task_id}/progress
     */
    public function progress(Request $request): Response
    {
        try {
            $task = $this->getService()->getTask($this->resolveTaskId($request, 'progress'));
            if ($task === null) {
                return ApiResponseHelper::error('任务不存在', 404);
            }
            return ApiResponseHelper::success($task['progress'] ?? [], '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * List data sources (those stored in the database with status = 1).
     *
     * GET /api/data-collection-tasks/data-sources
     *
     * Data sources are read from the database only; config('data_sources') is no longer
     * consulted. An empty list is returned when none exist.
     */
    public function getDataSources(Request $request): Response
    {
        try {
            $result = $this->getDataSourceService()->getDataSourceList(['status' => 1]);
            $rows = $result['list'] ?? [];

            $list = empty($rows) ? [] : array_map(function ($ds) {
                return [
                    'id' => $ds['data_source_id'],
                    'name' => $ds['name'] ?? $ds['data_source_id'],
                    'type' => $ds['type'] ?? 'unknown',
                    'host' => $ds['host'] ?? '',
                    'port' => $ds['port'] ?? 0,
                    'database' => $ds['database'] ?? '',
                ];
            }, $rows);

            return ApiResponseHelper::success($list, '查询成功');
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            // MongoDB driver errors (e.g. connection failures) get a friendlier message.
            $errorMessage = '无法连接到 MongoDB 数据库,请检查数据库服务是否正常运行。错误详情:' . $e->getMessage();
            return ApiResponseHelper::error($errorMessage, 500);
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * List the databases of a data source (MongoDB only).
     *
     * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases
     *
     * Each entry carries the raw `name` and a base64 `id` that is safe to put in a URL.
     */
    public function getDatabases(Request $request, string $data_source_id): Response
    {
        try {
            $dataSource = $this->getDataSourceService()->getDataSourceConfig($data_source_id);
            if (!$dataSource) {
                return ApiResponseHelper::error('数据源不存在', 404);
            }
            if (($dataSource['type'] ?? '') !== 'mongodb') {
                return ApiResponseHelper::error('不支持的数据源类型', 400);
            }

            $list = [];
            foreach ($this->getMongoClient($dataSource)->listDatabases() as $databaseInfo) {
                $list[] = $this->describeName($databaseInfo->getName());
            }
            return ApiResponseHelper::success($list, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * List the collections of a database (MongoDB only).
     *
     * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases/{database}/collections
     *
     * @param string $database Database name, base64- or URL-encoded (see decodeName()).
     */
    public function getCollections(Request $request, string $data_source_id, string $database): Response
    {
        try {
            $database = $this->decodeName($database);

            $dataSource = $this->getDataSourceService()->getDataSourceConfig($data_source_id);
            if (!$dataSource) {
                return ApiResponseHelper::error('数据源不存在', 404);
            }
            if (($dataSource['type'] ?? '') !== 'mongodb') {
                return ApiResponseHelper::error('不支持的数据源类型', 400);
            }

            $list = [];
            $db = $this->getMongoClient($dataSource)->selectDatabase($database);
            foreach ($db->listCollections() as $collectionInfo) {
                $list[] = $this->describeName($collectionInfo->getName());
            }
            return ApiResponseHelper::success($list, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Get the target field list for a handler type.
     *
     * GET /api/data-collection-tasks/handlers/{handler_type}/target-fields
     */
    public function getHandlerTargetFields(Request $request, string $handler_type): Response
    {
        try {
            switch ($handler_type) {
                case 'consumption_record':
                    $fields = $this->consumptionRecordTargetFields();
                    break;
                case 'generic':
                    // The generic handler has no fixed field list; fields are user-defined.
                    $fields = [];
                    break;
                default:
                    return ApiResponseHelper::error("未知的Handler类型: {$handler_type}", 400);
            }
            return ApiResponseHelper::success($fields, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Target fields of the consumption_record handler.
     *
     * Contains raw input fields (recommended) and derived fields (not mapped). The handler
     * converts phone_number/id_card -> user_id and store_name -> store_id.
     */
    private function consumptionRecordTargetFields(): array
    {
        return [
            // User identifiers (raw input, recommended)
            ['name' => 'phone_number', 'label' => '手机号', 'type' => 'string', 'required' => false, 'description' => '手机号Handler会自动解析为user_id', 'is_original' => true],
            ['name' => 'id_card', 'label' => '身份证', 'type' => 'string', 'required' => false, 'description' => '身份证号Handler会自动解析为user_id', 'is_original' => true],
            // user_id is derived by the handler; no mapping needed
            ['name' => 'user_id', 'label' => '用户ID', 'type' => 'string', 'required' => false, 'description' => '用户ID由Handler通过phone_number/id_card自动解析生成无需映射', 'is_original' => false, 'no_mapping' => true],
            // Store identifier (raw input, recommended)
            ['name' => 'store_name', 'label' => '门店名称', 'type' => 'string', 'required' => false, 'description' => '门店名称Handler会自动转换为store_id', 'is_original' => true],
            // store_id is derived by the handler; no mapping needed
            ['name' => 'store_id', 'label' => '门店ID', 'type' => 'string', 'required' => false, 'description' => '门店ID由Handler通过store_name自动转换生成无需映射', 'is_original' => false, 'no_mapping' => true],
            // Order identifier (used for de-duplication)
            ['name' => 'source_order_id', 'label' => '原始订单ID', 'type' => 'string', 'required' => false, 'description' => '原始订单ID配合店铺名称做去重唯一标识建议配置', 'is_original' => true],
            // Note: order_no is generated by the system (auto-increment) and is not mapped.
            // Amount and time fields
            ['name' => 'amount', 'label' => '消费金额', 'type' => 'float', 'required' => true, 'description' => '消费金额(必填)', 'is_original' => true],
            ['name' => 'actual_amount', 'label' => '实际金额', 'type' => 'float', 'required' => true, 'description' => '实际支付金额(必填)', 'is_original' => true],
            ['name' => 'consume_time', 'label' => '消费时间', 'type' => 'datetime', 'required' => true, 'description' => '消费时间,用于时间分片存储(必填)', 'is_original' => true],
            // Other optional fields
            ['name' => 'currency', 'label' => '币种', 'type' => 'string', 'required' => false, 'description' => '币种默认CNY人民币', 'is_original' => true, 'fixed_options' => true, 'options' => [['value' => 'CNY', 'label' => '人民币(CNY)'], ['value' => 'USD', 'label' => '美元(USD)']], 'default_value' => 'CNY'],
            ['name' => 'status', 'label' => '记录状态', 'type' => 'int', 'required' => false, 'description' => '记录状态0-正常1-异常2-已删除。默认0。需要配置源状态值到标准状态值的映射', 'is_original' => true, 'value_mapping' => true, 'target_values' => [['value' => 0, 'label' => '正常(0)'], ['value' => 1, 'label' => '异常(1)'], ['value' => 2, 'label' => '已删除(2)']], 'default_value' => 0],
        ];
    }

    /**
     * List a collection's fields by sampling one document (MongoDB only).
     *
     * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases/{database}/collections/{collection}/fields
     *
     * @param string $database   Database name, base64- or URL-encoded (see decodeName()).
     * @param string $collection Collection name, base64- or URL-encoded (see decodeName()).
     */
    public function getFields(Request $request, string $data_source_id, string $database, string $collection): Response
    {
        try {
            $database = $this->decodeName($database);
            $collection = $this->decodeName($collection);

            $dataSource = $this->getDataSourceService()->getDataSourceConfig($data_source_id);
            if (!$dataSource) {
                return ApiResponseHelper::error('数据源不存在', 404);
            }
            if (($dataSource['type'] ?? '') !== 'mongodb') {
                return ApiResponseHelper::error('不支持的数据源类型', 400);
            }

            $coll = $this->getMongoClient($dataSource)
                ->selectDatabase($database)
                ->selectCollection($collection);

            $sample = $coll->findOne([]);
            if (!$sample) {
                return ApiResponseHelper::success([], '集合为空,无法获取字段');
            }

            // Normalise BSON values first (as previewQuery() does); otherwise scalar BSON objects
            // such as ObjectId/UTCDateTime are recursed into as if they were sub-documents,
            // producing wrong or missing field entries.
            $fields = [];
            $this->extractFields($this->convertMongoDocumentToArray($sample), '', $fields);
            return ApiResponseHelper::success($fields, '查询成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Recursively flatten a document into dotted field names with their PHP types.
     *
     * Non-empty arrays/objects are descended into; empty ones are reported with type 'array';
     * scalars are reported with gettype(). List elements contribute their index to the name.
     *
     * @param mixed  $data   Document (array or iterable object) to walk; other values are ignored.
     * @param string $prefix Dotted path of $data ('' at the root).
     * @param array  $fields Output list of ['name' => string, 'type' => string], appended to.
     */
    private function extractFields($data, string $prefix, array &$fields): void
    {
        if (!is_array($data) && !is_object($data)) {
            return;
        }
        foreach ($data as $key => $value) {
            // Compare against '' rather than truthiness so a "0" prefix is not dropped.
            $fieldName = $prefix !== '' ? "{$prefix}.{$key}" : (string)$key;
            $isContainer = is_array($value) || is_object($value);
            if ($isContainer && !empty($value)) {
                $this->extractFields($value, $fieldName, $fields);
                continue;
            }
            $fields[] = [
                'name' => $fieldName,
                'type' => $isContainer ? 'array' : gettype($value),
            ];
        }
    }

    /**
     * Preview an aggregation (optional filters and $lookup joins) over a MongoDB collection.
     *
     * POST /api/data-collection-tasks/preview-query
     *
     * Body: data_source_id, database, collection (required); lookups, filter_conditions,
     * limit (optional, clamped to 1..MAX_PREVIEW_LIMIT, default DEFAULT_PREVIEW_LIMIT).
     */
    public function previewQuery(Request $request): Response
    {
        try {
            $data = $request->post();
            $dataSourceId = $data['data_source_id'] ?? '';
            $database = $data['database'] ?? '';
            $collection = $data['collection'] ?? '';
            $lookups = is_array($data['lookups'] ?? null) ? $data['lookups'] : [];
            $filterConditions = is_array($data['filter_conditions'] ?? null) ? $data['filter_conditions'] : [];
            $limit = $this->normalizePreviewLimit($data['limit'] ?? null);

            if (empty($dataSourceId) || empty($database) || empty($collection)) {
                return ApiResponseHelper::error('缺少必要参数data_source_id, database, collection', 400);
            }

            $dataSourceConfig = $this->getDataSourceService()->getDataSourceConfig($dataSourceId);
            if (!$dataSourceConfig) {
                return ApiResponseHelper::error('数据源不存在', 404);
            }
            if (($dataSourceConfig['type'] ?? '') !== 'mongodb') {
                return ApiResponseHelper::error('目前只支持MongoDB数据源预览', 400);
            }

            $coll = $this->getMongoClient($dataSourceConfig)
                ->selectDatabase($database)
                ->selectCollection($collection);

            $results = [];
            $fields = [];
            $pipeline = $this->buildPreviewPipeline($filterConditions, $lookups, $limit);
            foreach ($coll->aggregate($pipeline) as $doc) {
                $docArray = $this->convertMongoDocumentToArray($doc);
                $results[] = $docArray;
                $this->extractFields($docArray, '', $fields);
            }

            return ApiResponseHelper::success([
                'fields' => $this->uniqueFieldsByName($fields),
                'data' => $results,
                'count' => count($results),
            ], '预览成功');
        } catch (\Throwable $e) {
            return ApiResponseHelper::exception($e);
        }
    }

    /**
     * Clamp the requested preview size to 1..MAX_PREVIEW_LIMIT.
     *
     * Missing, zero or negative values fall back to DEFAULT_PREVIEW_LIMIT, since MongoDB
     * rejects a non-positive $limit stage.
     *
     * @param mixed $limit Raw `limit` value from the request body.
     */
    private function normalizePreviewLimit($limit): int
    {
        $value = (int)($limit ?? self::DEFAULT_PREVIEW_LIMIT);
        if ($value < 1) {
            return self::DEFAULT_PREVIEW_LIMIT;
        }
        return min($value, self::MAX_PREVIEW_LIMIT);
    }

    /**
     * Build the aggregation pipeline used by previewQuery().
     *
     * Order: $match (filters first, so they run on the source collection before any join),
     * then one $lookup (plus optional $unwind) per complete lookup definition, then $limit.
     * Lookups missing `from`, `local_field` or `foreign_field` are skipped.
     *
     * @param array $filterConditions See buildFilterForPreview().
     * @param array $lookups          List of ['from', 'local_field', 'foreign_field', 'as'?, 'unwrap'?, 'preserve_null'?].
     * @param int   $limit            Positive number of documents to return.
     */
    private function buildPreviewPipeline(array $filterConditions, array $lookups, int $limit): array
    {
        $pipeline = [];

        $filter = $this->buildFilterForPreview($filterConditions);
        if (!empty($filter)) {
            $pipeline[] = ['$match' => $filter];
        }

        foreach ($lookups as $lookup) {
            if (!is_array($lookup)
                || empty($lookup['from'])
                || empty($lookup['local_field'])
                || empty($lookup['foreign_field'])
            ) {
                continue;
            }

            // An empty/non-string `as` would yield the invalid unwind path '$'.
            $as = (is_string($lookup['as'] ?? null) && $lookup['as'] !== '') ? $lookup['as'] : 'joined';

            $pipeline[] = [
                '$lookup' => [
                    'from' => $lookup['from'],
                    'localField' => $lookup['local_field'],
                    'foreignField' => $lookup['foreign_field'],
                    'as' => $as,
                ],
            ];

            if (!empty($lookup['unwrap'])) {
                $pipeline[] = [
                    '$unwind' => [
                        'path' => '$' . $as,
                        'preserveNullAndEmptyArrays' => !empty($lookup['preserve_null']),
                    ],
                ];
            }
        }

        $pipeline[] = ['$limit' => $limit];
        return $pipeline;
    }

    /**
     * Remove duplicate field entries by name, keeping the first occurrence.
     *
     * @param array $fields List of ['name' => ..., 'type' => ...].
     */
    private function uniqueFieldsByName(array $fields): array
    {
        $seen = [];
        $unique = [];
        foreach ($fields as $field) {
            if (!isset($seen[$field['name']])) {
                $seen[$field['name']] = true;
                $unique[] = $field;
            }
        }
        return $unique;
    }

    /**
     * Recursively convert a MongoDB document (or plain array) into a plain PHP array.
     *
     * UTCDateTime values become 'Y-m-d H:i:s' strings, other stringable objects
     * (e.g. ObjectId) are cast to string, and nested arrays/objects are converted too.
     * Non-iterable input yields an empty array.
     *
     * @param mixed $document
     */
    private function convertMongoDocumentToArray($document): array
    {
        if (!is_array($document) && !is_object($document)) {
            return [];
        }
        $array = [];
        foreach ($document as $key => $value) {
            $array[$key] = $this->convertMongoValue($value);
        }
        return $array;
    }

    /**
     * Convert a single BSON/PHP value for convertMongoDocumentToArray().
     *
     * @param mixed $value
     * @return mixed
     */
    private function convertMongoValue($value)
    {
        if ($value instanceof \MongoDB\BSON\UTCDateTime) {
            return $value->toDateTime()->format('Y-m-d H:i:s');
        }
        if (is_object($value) && method_exists($value, '__toString')) {
            return (string)$value;
        }
        if (is_array($value) || is_object($value)) {
            // Arrays are descended into as well, so BSON values nested in them get converted.
            return $this->convertMongoDocumentToArray($value);
        }
        return $value;
    }

    /**
     * Build a MongoDB filter from the preview API's condition list.
     *
     * Each condition is ['field' => string, 'operator' => eq|ne|gt|gte|lt|lte|in|nin (default eq),
     * 'value' => mixed]. Conditions with an empty field or unknown operator are ignored.
     * Several conditions on the same field are combined into one operator document
     * (e.g. gte + lte form a range); a lone equality keeps the plain {field: value} form.
     * Numeric strings become int/float, except for in/nin, whose comma-separated string
     * values are split verbatim.
     *
     * NOTE(review): numeric conversion also applies to numeric-looking string identifiers
     * (e.g. phone numbers stored as strings), which then will not match; confirm intended.
     *
     * @param array $filterConditions Filter condition list.
     * @return array MongoDB query filter.
     */
    private function buildFilterForPreview(array $filterConditions): array
    {
        $clausesByField = [];
        foreach ($filterConditions as $condition) {
            if (!is_array($condition)) {
                continue;
            }
            $field = $condition['field'] ?? '';
            $operator = $condition['operator'] ?? 'eq';
            if (!is_string($field) || empty($field)
                || !is_string($operator) || !isset(self::FILTER_OPERATORS[$operator])
            ) {
                continue;
            }

            $value = $condition['value'] ?? null;
            if ($operator === 'in' || $operator === 'nin') {
                // Split raw strings as-is; a numeric round-trip would turn "007" into "7".
                $value = is_array($value) ? $value : explode(',', (string)$value);
            } else {
                $value = $this->normalizeFilterValue($value);
            }

            $clausesByField[$field][self::FILTER_OPERATORS[$operator]] = $value;
        }

        $filter = [];
        foreach ($clausesByField as $field => $clauses) {
            $filter[$field] = (count($clauses) === 1 && array_key_exists('$eq', $clauses))
                ? $clauses['$eq']
                : $clauses;
        }
        return $filter;
    }

    /**
     * Convert a numeric string to int or float; other values are returned unchanged.
     *
     * `+ 0` chooses the numeric type from the string itself, so "10" -> 10, "1.5" -> 1.5 and
     * "1e-3" -> 0.001 (an (int) cast would truncate the latter to 0).
     *
     * @param mixed $value
     * @return mixed
     */
    private function normalizeFilterValue($value)
    {
        if (is_string($value) && $value !== '' && is_numeric($value)) {
            return $value + 0;
        }
        return $value;
    }

    /**
     * Decode a database/collection name passed in a URL segment.
     *
     * Accepts the base64 `id` produced by describeName(); anything else is URL-decoded.
     * Plain names consisting only of base64 characters (e.g. "test", "orders") also
     * base64-decode "successfully" into binary junk, so a decoded value is only used when it
     * is non-empty, valid UTF-8 and free of control characters.
     * NOTE(review): a raw name that happens to be valid base64 of printable text
     * (e.g. "TWFu" -> "Man") is still decoded; the frontend should always send the id.
     *
     * @param string $name Encoded name.
     * @return string Decoded name.
     */
    private function decodeName(string $name): string
    {
        if ($name !== '' && preg_match('/^[A-Za-z0-9+\/]*={0,2}$/', $name) === 1) {
            $decoded = base64_decode($name, true);
            if ($decoded !== false && $decoded !== '' && $this->isPrintableUtf8($decoded)) {
                return $decoded;
            }
        }
        return rawurldecode($name);
    }

    /**
     * Whether $value is valid UTF-8 without ASCII control characters.
     */
    private function isPrintableUtf8(string $value): bool
    {
        return preg_match('//u', $value) === 1
            && preg_match('/[\x00-\x1F\x7F]/', $value) === 0;
    }

    /**
     * Listing entry for a database/collection name: raw name plus URL-friendly base64 id.
     */
    private function describeName(string $name): array
    {
        return [
            'name' => $name,
            'id' => base64_encode($name),
        ];
    }

    /**
     * Resolve the task id from the request path, falling back to the `task_id` query parameter.
     *
     * @param string $action Trailing route action after the id ('start', 'progress', ...), or ''.
     * @throws \InvalidArgumentException When no task id can be found.
     */
    private function resolveTaskId(Request $request, string $action = ''): string
    {
        $taskId = $this->extractTaskIdFromPath((string)$request->path(), $action);
        if ($taskId === null) {
            $taskId = $request->get('task_id');
        }
        if (!is_scalar($taskId) || (string)$taskId === '') {
            throw new \InvalidArgumentException('缺少 task_id 参数');
        }
        return (string)$taskId;
    }

    /**
     * Extract the (URL-decoded) task id segment from a path such as
     * "/api/data-collection-tasks/{id}" or "/api/data-collection-tasks/{id}/{action}".
     *
     * @return string|null The id, or null when the path does not match.
     */
    private function extractTaskIdFromPath(string $path, string $action = ''): ?string
    {
        $suffix = $action !== '' ? '/' . preg_quote($action, '#') : '';
        $pattern = '#' . preg_quote(self::TASK_PATH_PREFIX, '#') . '([^/]+)' . $suffix . '(?:/|$)#';
        if (preg_match($pattern, $path, $matches) !== 1) {
            return null;
        }
        return rawurldecode($matches[1]);
    }

    /**
     * Create a MongoDB client for a data source configuration.
     *
     * @param array $config Data source config: host, port, username, password, auth_source, options.
     */
    private function getMongoClient(array $config): \MongoDB\Client
    {
        return new \MongoDB\Client($this->buildMongoDsn($config), $config['options'] ?? []);
    }

    /**
     * Build the mongodb:// connection string for a data source configuration.
     *
     * Credentials are included only when both username and password are non-empty strings.
     * They (and the auth database) are percent-encoded as the connection string format
     * requires; otherwise characters such as '@', ':' or '/' in a password corrupt the URI.
     * A missing or non-positive port falls back to 27017.
     */
    private function buildMongoDsn(array $config): string
    {
        $host = (string)($config['host'] ?? '');
        $port = (int)($config['port'] ?? 27017);
        if ($port <= 0) {
            $port = 27017;
        }
        $username = (string)($config['username'] ?? '');
        $password = (string)($config['password'] ?? '');
        $authSource = (string)($config['auth_source'] ?? 'admin');

        if ($username !== '' && $password !== '') {
            return sprintf(
                'mongodb://%s:%s@%s:%d/%s',
                rawurlencode($username),
                rawurlencode($password),
                $host,
                $port,
                rawurlencode($authSource)
            );
        }
        return sprintf('mongodb://%s:%d', $host, $port);
    }
}