post(); // 验证必填字段 $requiredFields = ['name', 'data_source_id', 'database', 'target_type']; foreach ($requiredFields as $field) { if (empty($data[$field])) { return ApiResponseHelper::error("缺少必填字段: {$field}", 400); } } // 验证目标类型 if (!in_array($data['target_type'], ['consumption_record', 'generic'])) { return ApiResponseHelper::error("目标类型必须是 consumption_record 或 generic", 400); } // 如果是通用Handler,需要目标数据源配置(后端会自动处理consumption_record的配置) if ($data['target_type'] === 'generic') { $genericRequiredFields = ['target_data_source_id', 'target_database', 'target_collection']; foreach ($genericRequiredFields as $field) { if (empty($data[$field])) { return ApiResponseHelper::error("通用Handler缺少必填字段: {$field}", 400); } } } // 验证模式 if (isset($data['mode']) && !in_array($data['mode'], ['batch', 'realtime'])) { return ApiResponseHelper::error("模式必须是 batch 或 realtime", 400); } // 验证集合配置 if (empty($data['collection']) && empty($data['collections'])) { return ApiResponseHelper::error("必须指定 collection 或 collections", 400); } $service = $this->getService(); $task = $service->createTask($data); return ApiResponseHelper::success($task, '任务创建成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 更新任务 * * PUT /api/data-collection-tasks/{task_id} */ public function update(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $data = $request->post(); $service = $this->getService(); $result = $service->updateTask($taskId, $data); if ($result) { return ApiResponseHelper::success(null, '任务更新成功'); } else { return ApiResponseHelper::error('任务更新失败', 500); } } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 删除任务 * * DELETE /api/data-collection-tasks/{task_id} */ public function delete(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $result = $service->deleteTask($taskId); if ($result) { return ApiResponseHelper::success(null, '任务删除成功'); } else { return ApiResponseHelper::error('任务删除失败', 500); } } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 启动任务 * * POST /api/data-collection-tasks/{task_id}/start */ public function start(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)/start#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $result = $service->startTask($taskId); if ($result) { return ApiResponseHelper::success(null, '任务启动成功'); } else { return ApiResponseHelper::error('任务启动失败', 500); } } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 暂停任务 * * POST /api/data-collection-tasks/{task_id}/pause */ public function pause(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)/pause#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $result = $service->pauseTask($taskId); if ($result) { return ApiResponseHelper::success(null, '任务暂停成功'); } else { return ApiResponseHelper::error('任务暂停失败', 500); } } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 停止任务 * * POST /api/data-collection-tasks/{task_id}/stop */ public function stop(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)/stop#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $result = $service->stopTask($taskId); if ($result) { return ApiResponseHelper::success(null, '任务停止成功'); } else { return ApiResponseHelper::error('任务停止失败', 500); } } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取任务列表 * * GET /api/data-collection-tasks */ public function list(Request $request): Response { try { // 只收集非空的筛选条件 $filters = []; if ($request->get('status') !== null && $request->get('status') !== '') { $filters['status'] = $request->get('status'); } if ($request->get('data_source_id') !== null && $request->get('data_source_id') !== '') { $filters['data_source_id'] = $request->get('data_source_id'); } if ($request->get('name') !== null && $request->get('name') !== '') { $filters['name'] = $request->get('name'); } $page = (int)($request->get('page', 1)); $pageSize = (int)($request->get('page_size', 20)); $service = $this->getService(); $result = $service->getTaskList($filters, $page, $pageSize); return ApiResponseHelper::success($result, '查询成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取任务详情 * * GET /api/data-collection-tasks/{task_id} */ public function detail(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)$#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $task = $service->getTask($taskId); if ($task === null) { return ApiResponseHelper::error('任务不存在', 404); } return ApiResponseHelper::success($task, '查询成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取任务进度 * * GET /api/data-collection-tasks/{task_id}/progress */ public function progress(Request $request): Response { try { // 从请求路径中解析 task_id $path = $request->path(); if (preg_match('#/api/data-collection-tasks/([^/]+)/progress#', $path, $matches)) { $taskId = $matches[1]; } else { $taskId = $request->get('task_id'); if (!$taskId) { throw new \InvalidArgumentException('缺少 task_id 参数'); } } $service = $this->getService(); $task = $service->getTask($taskId); if ($task === null) { return ApiResponseHelper::error('任务不存在', 404); } $progress = $task['progress'] ?? []; return ApiResponseHelper::success($progress, '查询成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取数据源列表 * * GET /api/data-collection-tasks/data-sources */ public function getDataSources(Request $request): Response { try { // 优先使用数据库中的数据源,如果没有则使用配置文件 $service = new \app\service\DataSourceService(new \app\repository\DataSourceRepository()); $result = $service->getDataSourceList(['status' => 1]); if (!empty($result['list'])) { // 使用数据库中的数据源 $list = array_map(function ($ds) { return [ 'id' => $ds['data_source_id'], 'name' => $ds['name'] ?? $ds['data_source_id'], // 添加名称字段 'type' => $ds['type'] ?? 'unknown', 'host' => $ds['host'] ?? '', 'port' => $ds['port'] ?? 0, 'database' => $ds['database'] ?? '', ]; }, $result['list']); } // 注意:现在数据源配置统一从数据库读取,不再使用config('data_sources') // 如果数据库中没有数据源,返回空列表 if (!isset($list)) { $list = []; } return ApiResponseHelper::success($list, '查询成功'); } catch (\MongoDB\Driver\Exception\Exception $e) { // MongoDB 连接错误,返回友好提示 $errorMessage = '无法连接到 MongoDB 数据库,请检查数据库服务是否正常运行。错误详情:' . $e->getMessage(); return ApiResponseHelper::error($errorMessage, 500); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取数据源的数据库列表 * * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases */ public function getDatabases(Request $request, string $data_source_id): Response { try { // 从数据库获取数据源配置 $service = new \app\service\DataSourceService(new \app\repository\DataSourceRepository()); $dataSourceConfig = $service->getDataSourceConfig($data_source_id); if (!$dataSourceConfig) { return ApiResponseHelper::error('数据源不存在', 404); } $dataSource = $dataSourceConfig; // 如果是MongoDB,连接并获取数据库列表 if ($dataSource['type'] === 'mongodb') { $client = $this->getMongoClient($dataSource); $databases = $client->listDatabases(); $list = []; foreach ($databases as $database) { $dbName = $database->getName(); // 同时返回原始名称和base64编码的ID(URL友好) $list[] = [ 'name' => $dbName, 'id' => base64_encode($dbName), // URL友好的标识符 ]; } return ApiResponseHelper::success($list, '查询成功'); } return ApiResponseHelper::error('不支持的数据源类型', 400); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取数据库的集合列表 * * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases/{database}/collections */ public function getCollections(Request $request, string $data_source_id, string $database): Response { try { // 解码数据库名称(支持base64编码和URL编码) $database = $this->decodeName($database); // 从数据库获取数据源配置 $service = new \app\service\DataSourceService(new \app\repository\DataSourceRepository()); $dataSourceConfig = $service->getDataSourceConfig($data_source_id); if (!$dataSourceConfig) { return ApiResponseHelper::error('数据源不存在', 404); } $dataSource = $dataSourceConfig; // 如果是MongoDB,连接并获取集合列表 if ($dataSource['type'] === 'mongodb') { $client = $this->getMongoClient($dataSource); $db = $client->selectDatabase($database); $collections = $db->listCollections(); $list = []; foreach ($collections as $collection) { $collName = $collection->getName(); // 同时返回原始名称和base64编码的ID(URL友好) $list[] = [ 'name' => $collName, 'id' => base64_encode($collName), // URL友好的标识符 ]; } return ApiResponseHelper::success($list, '查询成功'); } return ApiResponseHelper::error('不支持的数据源类型', 400); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取Handler的目标字段列表 * * GET /api/data-collection-tasks/handlers/{handler_type}/target-fields */ public function getHandlerTargetFields(Request $request, string $handler_type): Response { try { $fields = []; switch ($handler_type) { case 'consumption_record': // 消费记录Handler的目标字段列表 // 包含原始输入字段(推荐)和转换后字段(可选) // Handler会自动进行转换:phone_number/id_card -> user_id, store_name -> store_id $fields = [ // 用户标识字段(原始输入,推荐使用) ['name' => 'phone_number', 'label' => '手机号', 'type' => 'string', 'required' => false, 'description' => '手机号,Handler会自动解析为user_id', 'is_original' => true], ['name' => 'id_card', 'label' => '身份证', 'type' => 'string', 'required' => false, 'description' => '身份证号,Handler会自动解析为user_id', 'is_original' => true], // 用户ID(转换后字段,由Handler自动生成,不需要映射) ['name' => 'user_id', 'label' => '用户ID', 'type' => 'string', 'required' => false, 'description' => '用户ID,由Handler通过phone_number/id_card自动解析生成,无需映射', 'is_original' => false, 'no_mapping' => true], // 门店标识字段(原始输入,推荐使用) ['name' => 'store_name', 'label' => '门店名称', 'type' => 'string', 'required' => false, 'description' => '门店名称,Handler会自动转换为store_id', 'is_original' => true], // 门店ID(转换后字段,由Handler自动生成,不需要映射) ['name' => 'store_id', 'label' => '门店ID', 'type' => 'string', 'required' => false, 'description' => '门店ID,由Handler通过store_name自动转换生成,无需映射', 'is_original' => false, 'no_mapping' => true], // 订单标识字段(用于去重) ['name' => 'source_order_id', 'label' => '原始订单ID', 'type' => 'string', 'required' => false, 'description' => '原始订单ID,配合店铺名称做去重唯一标识(建议配置)', 'is_original' => true], // 注意:order_no 由系统自动生成(自动递增),不需要映射 // 金额和时间字段(直接字段) ['name' => 'amount', 'label' => '消费金额', 'type' => 'float', 'required' => true, 'description' => '消费金额(必填)', 'is_original' => true], ['name' => 'actual_amount', 'label' => '实际金额', 'type' => 'float', 'required' => true, 'description' => '实际支付金额(必填)', 'is_original' => true], ['name' => 'consume_time', 'label' => '消费时间', 'type' => 'datetime', 'required' => true, 'description' => '消费时间,用于时间分片存储(必填)', 'is_original' => true], // 其他可选字段 ['name' => 'currency', 'label' => '币种', 'type' => 'string', 'required' => false, 'description' => '币种,默认CNY(人民币)', 'is_original' => true, 'fixed_options' => true, 'options' => [['value' => 'CNY', 'label' => '人民币(CNY)'], ['value' => 'USD', 'label' => '美元(USD)']], 'default_value' => 'CNY'], ['name' => 'status', 'label' => '记录状态', 'type' => 'int', 'required' => false, 'description' => '记录状态:0-正常,1-异常,2-已删除。默认0。需要配置源状态值到标准状态值的映射', 'is_original' => true, 'value_mapping' => true, 'target_values' => [['value' => 0, 'label' => '正常(0)'], ['value' => 1, 'label' => '异常(1)'], ['value' => 2, 'label' => '已删除(2)']], 'default_value' => 0], ]; break; case 'generic': // 通用Handler - 没有固定的字段列表,由用户自定义 $fields = []; break; default: return ApiResponseHelper::error("未知的Handler类型: {$handler_type}", 400); } return ApiResponseHelper::success($fields, '查询成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 获取集合的字段列表(采样) * * GET /api/data-collection-tasks/data-sources/{data_source_id}/databases/{database}/collections/{collection}/fields */ public function getFields(Request $request, string $data_source_id, string $database, string $collection): Response { try { // 解码数据库名称和集合名称(支持base64编码和URL编码) $database = $this->decodeName($database); $collection = $this->decodeName($collection); // 从数据库获取数据源配置 $service = new \app\service\DataSourceService(new \app\repository\DataSourceRepository()); $dataSourceConfig = $service->getDataSourceConfig($data_source_id); if (!$dataSourceConfig) { return ApiResponseHelper::error('数据源不存在', 404); } $dataSource = $dataSourceConfig; // 如果是MongoDB,采样获取字段 if ($dataSource['type'] === 'mongodb') { $client = $this->getMongoClient($dataSource); $db = $client->selectDatabase($database); $coll = $db->selectCollection($collection); // 采样一条数据 $sample = $coll->findOne([]); if ($sample) { $fields = []; $this->extractFields($sample, '', $fields); return ApiResponseHelper::success($fields, '查询成功'); } else { return ApiResponseHelper::success([], '集合为空,无法获取字段'); } } return ApiResponseHelper::error('不支持的数据源类型', 400); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 递归提取字段 */ private function extractFields($data, string $prefix, array &$fields): void { if (is_array($data) || is_object($data)) { foreach ($data as $key => $value) { $fieldName = $prefix ? "{$prefix}.{$key}" : $key; if (is_array($value) || is_object($value)) { if (empty($value)) { $fields[] = [ 'name' => $fieldName, 'type' => 'array', ]; } else { $this->extractFields($value, $fieldName, $fields); } } else { $fields[] = [ 'name' => $fieldName, 'type' => gettype($value), ]; } } } } /** * 预览查询结果(包含lookup) * * POST /api/data-collection-tasks/preview-query */ public function previewQuery(Request $request): Response { try { $data = $request->post(); $dataSourceId = $data['data_source_id'] ?? ''; $database = $data['database'] ?? ''; $collection = $data['collection'] ?? ''; $lookups = $data['lookups'] ?? []; $filterConditions = $data['filter_conditions'] ?? []; $limit = (int)($data['limit'] ?? 5); // 默认预览5条 if (empty($dataSourceId) || empty($database) || empty($collection)) { return ApiResponseHelper::error('缺少必要参数:data_source_id, database, collection', 400); } // 获取数据源配置 $service = new \app\service\DataSourceService(new \app\repository\DataSourceRepository()); $dataSourceConfig = $service->getDataSourceConfig($dataSourceId); if (!$dataSourceConfig) { return ApiResponseHelper::error('数据源不存在', 404); } if ($dataSourceConfig['type'] !== 'mongodb') { return ApiResponseHelper::error('目前只支持MongoDB数据源预览', 400); } // 连接MongoDB $client = $this->getMongoClient($dataSourceConfig); $db = $client->selectDatabase($database); $coll = $db->selectCollection($collection); // 构建聚合管道 $pipeline = []; // 1. 添加过滤条件($match)- 必须在最前面 $filter = $this->buildFilterForPreview($filterConditions); if (!empty($filter)) { $pipeline[] = ['$match' => $filter]; } // 2. 添加lookup查询 foreach ($lookups as $lookup) { if (empty($lookup['from']) || empty($lookup['local_field']) || empty($lookup['foreign_field'])) { continue; } $lookupStage = [ '$lookup' => [ 'from' => $lookup['from'], 'localField' => $lookup['local_field'], 'foreignField' => $lookup['foreign_field'], 'as' => $lookup['as'] ?? 'joined' ] ]; $pipeline[] = $lookupStage; // 如果配置了解构 if (!empty($lookup['unwrap'])) { $pipeline[] = [ '$unwind' => [ 'path' => '$' . ($lookup['as'] ?? 'joined'), 'preserveNullAndEmptyArrays' => !empty($lookup['preserve_null']) ] ]; } } // 3. 限制返回数量 $pipeline[] = ['$limit' => $limit]; // 执行聚合查询 $cursor = $coll->aggregate($pipeline); $results = []; $fields = []; foreach ($cursor as $doc) { $docArray = $this->convertMongoDocumentToArray($doc); $results[] = $docArray; // 提取字段 $this->extractFields($docArray, '', $fields); } // 去重字段 $uniqueFields = []; $fieldMap = []; foreach ($fields as $field) { if (!isset($fieldMap[$field['name']])) { $fieldMap[$field['name']] = true; $uniqueFields[] = $field; } } return ApiResponseHelper::success([ 'fields' => $uniqueFields, 'data' => $results, 'count' => count($results) ], '预览成功'); } catch (\Throwable $e) { return ApiResponseHelper::exception($e); } } /** * 将MongoDB文档转换为数组 */ private function convertMongoDocumentToArray($document): array { if (is_array($document)) { return $document; } if (is_object($document)) { $array = []; foreach ($document as $key => $value) { if ($value instanceof \MongoDB\BSON\UTCDateTime) { $array[$key] = $value->toDateTime()->format('Y-m-d H:i:s'); } elseif (is_object($value) && method_exists($value, '__toString')) { $array[$key] = (string)$value; } elseif (is_array($value) || is_object($value)) { $array[$key] = $this->convertMongoDocumentToArray($value); } else { $array[$key] = $value; } } return $array; } return []; } /** * 构建过滤条件(用于预览查询) * * @param array $filterConditions 过滤条件列表 * @return array MongoDB查询过滤器 */ private function buildFilterForPreview(array $filterConditions): array { $filter = []; foreach ($filterConditions as $condition) { $field = $condition['field'] ?? ''; $operator = $condition['operator'] ?? 'eq'; $value = $condition['value'] ?? null; if (empty($field)) { continue; } // 处理值的类型转换 if ($value !== null && $value !== '') { // 尝试转换为数字(如果是数字字符串) if (is_numeric($value)) { // 判断是整数还是浮点数 if (strpos($value, '.') !== false) { $value = (float)$value; } else { $value = (int)$value; } } } switch ($operator) { case 'eq': $filter[$field] = $value; break; case 'ne': $filter[$field] = ['$ne' => $value]; break; case 'gt': $filter[$field] = ['$gt' => $value]; break; case 'gte': $filter[$field] = ['$gte' => $value]; break; case 'lt': $filter[$field] = ['$lt' => $value]; break; case 'lte': $filter[$field] = ['$lte' => $value]; break; case 'in': // in操作符的值应该是数组 $valueArray = is_array($value) ? $value : explode(',', (string)$value); $filter[$field] = ['$in' => $valueArray]; break; case 'nin': // nin操作符的值应该是数组 $valueArray = is_array($value) ? $value : explode(',', (string)$value); $filter[$field] = ['$nin' => $valueArray]; break; } } return $filter; } /** * 解码数据库或集合名称(支持base64编码和URL编码) * * @param string $name 编码后的名称 * @return string 解码后的名称 */ private function decodeName(string $name): string { // 尝试base64解码(如果前端使用的是编码后的ID) // 检查是否可能是base64编码(只包含base64字符且长度合理) if (preg_match('/^[A-Za-z0-9+\/]*={0,2}$/', $name) && strlen($name) > 0) { $decoded = @base64_decode($name, true); if ($decoded !== false && $decoded !== '') { // 解码成功,使用解码后的值 return $decoded; } } // 不是base64格式或解码失败,使用URL解码(处理中文等特殊字符) return rawurldecode($name); } /** * 获取MongoDB客户端 */ private function getMongoClient(array $config): \MongoDB\Client { $host = $config['host'] ?? ''; $port = (int)($config['port'] ?? 27017); $username = $config['username'] ?? ''; $password = $config['password'] ?? ''; $authSource = $config['auth_source'] ?? 'admin'; if (!empty($username) && !empty($password)) { $dsn = "mongodb://{$username}:{$password}@{$host}:{$port}/{$authSource}"; } else { $dsn = "mongodb://{$host}:{$port}"; } return new \MongoDB\Client($dsn, $config['options'] ?? []); } }