$data * @return DataSourceRepository * @throws \Exception */ public function createDataSource(array $data): DataSourceRepository { // 生成ID if (empty($data['data_source_id'])) { $data['data_source_id'] = UuidGenerator::uuid4()->toString(); } // 验证必填字段 $requiredFields = ['name', 'type', 'host', 'port', 'database']; foreach ($requiredFields as $field) { if (empty($data[$field])) { throw new \InvalidArgumentException("缺少必填字段: {$field}"); } } // 验证类型 $allowedTypes = ['mongodb', 'mysql', 'postgresql']; if (!in_array(strtolower($data['type']), $allowedTypes)) { throw new \InvalidArgumentException("不支持的数据源类型: {$data['type']}"); } // 验证ID唯一性 $existing = $this->repository->newQuery() ->where('data_source_id', $data['data_source_id']) ->first(); if ($existing) { throw new \InvalidArgumentException("数据源ID已存在: {$data['data_source_id']}"); } // 验证名称唯一性 $existingByName = $this->repository->newQuery() ->where('name', $data['name']) ->first(); if ($existingByName) { throw new \InvalidArgumentException("数据源名称已存在: {$data['name']}"); } // 设置默认值 $data['status'] = $data['status'] ?? 1; // 1:启用, 0:禁用 $data['options'] = $data['options'] ?? []; $data['is_tag_engine'] = $data['is_tag_engine'] ?? false; // 默认不是标签引擎数据库 // 创建数据源 $dataSource = new DataSourceRepository($data); $dataSource->save(); // 如果设置为标签引擎数据库,自动将其他数据源设置为 false(确保只有一个) if (!empty($data['is_tag_engine'])) { // 将所有其他数据源的 is_tag_engine 设置为 false $this->repository->newQuery() ->where('data_source_id', '!=', $dataSource->data_source_id) ->update(['is_tag_engine' => false]); LoggerHelper::logBusiness('tag_engine_set', [ 'data_source_id' => $dataSource->data_source_id, 'action' => 'create', ]); } LoggerHelper::logBusiness('data_source_created', [ 'data_source_id' => $dataSource->data_source_id, 'name' => $dataSource->name, 'type' => $dataSource->type, ]); return $dataSource; } /** * 更新数据源 * * @param string $dataSourceId * @param array $data * @return bool */ public function updateDataSource(string $dataSourceId, array $data): bool { $dataSource = $this->repository->find($dataSourceId); if (!$dataSource) { throw new \InvalidArgumentException("数据源不存在: {$dataSourceId}"); } // 如果更新名称,验证唯一性 if (isset($data['name']) && $data['name'] !== $dataSource->name) { $existing = $this->repository->newQuery() ->where('name', $data['name']) ->where('data_source_id', '!=', $dataSourceId) ->first(); if ($existing) { throw new \InvalidArgumentException("数据源名称已存在: {$data['name']}"); } } // 如果设置为标签引擎数据库,自动将其他数据源设置为 false(确保只有一个) if (isset($data['is_tag_engine']) && !empty($data['is_tag_engine'])) { // 将所有其他数据源的 is_tag_engine 设置为 false $this->repository->newQuery() ->where('data_source_id', '!=', $dataSourceId) ->update(['is_tag_engine' => false]); LoggerHelper::logBusiness('tag_engine_set', [ 'data_source_id' => $dataSourceId, 'action' => 'update', ]); } // 更新数据 $dataSource->fill($data); $result = $dataSource->save(); if ($result) { LoggerHelper::logBusiness('data_source_updated', [ 'data_source_id' => $dataSourceId, ]); } return $result; } /** * 删除数据源 * * @param string $dataSourceId * @return bool */ public function deleteDataSource(string $dataSourceId): bool { $dataSource = $this->repository->find($dataSourceId); if (!$dataSource) { throw new \InvalidArgumentException("数据源不存在: {$dataSourceId}"); } // TODO: 检查是否有任务在使用此数据源 // 可以查询 DataCollectionTask 中是否有引用此数据源 $result = $dataSource->delete(); if ($result) { LoggerHelper::logBusiness('data_source_deleted', [ 'data_source_id' => $dataSourceId, ]); } return $result; } /** * 获取数据源列表 * * @param array $filters * @return array{list: array, total: int} */ public function getDataSourceList(array $filters = []): array { try { $query = $this->repository->newQuery(); // 筛选条件 if (isset($filters['type'])) { $query->where('type', $filters['type']); } if (isset($filters['status'])) { $query->where('status', $filters['status']); } if (isset($filters['name'])) { $query->where('name', 'like', '%' . $filters['name'] . '%'); } // 排序 $query->orderBy('created_at', 'desc'); // 分页 $page = (int)($filters['page'] ?? 1); $pageSize = (int)($filters['page_size'] ?? 20); $total = $query->count(); $list = $query->skip(($page - 1) * $pageSize) ->take($pageSize) ->get() ->map(function ($item) { // 不返回密码 $data = $item->toArray(); unset($data['password']); return $data; }) ->toArray(); return [ 'list' => $list, 'total' => $total, ]; } catch (\MongoDB\Driver\Exception\Exception $e) { // MongoDB 连接错误 LoggerHelper::logError($e, [ 'component' => 'DataSourceService', 'action' => 'getDataSourceList', ]); throw new \RuntimeException('无法连接到 MongoDB 数据库,请检查数据库服务是否正常运行', 500, $e); } catch (\Exception $e) { LoggerHelper::logError($e, [ 'component' => 'DataSourceService', 'action' => 'getDataSourceList', ]); throw $e; } } /** * 获取数据源详情(不包含密码) * * @param string $dataSourceId * @return array|null */ public function getDataSourceDetail(string $dataSourceId): ?array { $dataSource = $this->repository->find($dataSourceId); if (!$dataSource) { return null; } $data = $dataSource->toArray(); unset($data['password']); return $data; } /** * 获取数据源详情(包含密码,用于连接) * * @param string $dataSourceId * @return array|null */ public function getDataSourceConfig(string $dataSourceId): ?array { $dataSource = $this->repository->find($dataSourceId); if (!$dataSource) { return null; } if ($dataSource->status != 1) { throw new \RuntimeException("数据源已禁用: {$dataSourceId}"); } return $dataSource->toConfigArray(); } /** * 测试数据源连接 * * @param array $config * @return bool */ public function testConnection(array $config): bool { try { $type = strtolower($config['type'] ?? ''); // MongoDB特殊处理 if ($type === 'mongodb') { // 使用 MongoDBHelper 创建客户端(统一DSN构建逻辑) $client = \app\utils\MongoDBHelper::createClient($config, [ 'connectTimeoutMS' => 3000, 'socketTimeoutMS' => 5000, ]); // 尝试列出数据库来测试连接 $client->listDatabases(); return true; } // 其他类型使用适配器 $adapter = DataSourceAdapterFactory::create($type, $config); $connected = $adapter->isConnected(); $adapter->disconnect(); return $connected; } catch (\Throwable $e) { LoggerHelper::logError($e, [ 'component' => 'DataSourceService', 'action' => 'testConnection', ]); return false; } } /** * 获取所有启用的数据源(用于替代config('data_sources'),从数据库读取) * * @return array 以data_source_id为key的配置数组 */ public function getAllEnabledDataSources(): array { $dataSources = $this->repository->newQuery() ->where('status', 1) ->get(); $result = []; foreach ($dataSources as $ds) { $result[$ds->data_source_id] = $ds->toConfigArray(); } return $result; } /** * 根据数据源ID获取配置(从数据库读取) * * 支持两种查询方式: * 1. 通过 data_source_id (UUID) 查询 * 2. 通过 name 字段查询(兼容配置文件中的 key,如 sync_mongodb, tag_mongodb) * * @param string $dataSourceId 数据源ID或名称 * @return array|null 数据源配置,不存在或禁用时返回null */ public function getDataSourceConfigById(string $dataSourceId): ?array { // \Workerman\Worker::safeEcho("[DataSourceService] 查询数据源配置: data_source_id={$dataSourceId}\n"); // 先尝试通过 data_source_id 查询(UUID 格式) $dataSource = $this->repository->newQuery() ->where('data_source_id', $dataSourceId) ->where('status', 1) ->first(); if ($dataSource) { // \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 data_source_id 查询成功: name={$dataSource->name}\n"); return $dataSource->toConfigArray(); } // 如果通过 data_source_id 查不到,尝试通过 name 字段查询(兼容配置文件中的 key) // \Workerman\Worker::safeEcho("[DataSourceService] 通过 data_source_id 未找到,尝试通过 name 查询\n"); // 处理配置文件中的常见 key 映射 // 注意:这些映射需要根据实际数据库中的 name 字段值来调整 $nameMapping = [ 'sync_mongodb' => '本地大数据库', // 根据实际数据库中的名称调整 'tag_mongodb' => '主数据库', // 标签引擎数据库(is_tag_engine=true) 'kr_mongodb' => '卡若的主机', // 卡若数据库 ]; $searchName = $nameMapping[$dataSourceId] ?? null; if ($searchName) { // \Workerman\Worker::safeEcho("[DataSourceService] 使用映射名称查询: {$dataSourceId} -> {$searchName}\n"); // 使用映射的名称查询 $dataSource = $this->repository->newQuery() ->where('name', $searchName) ->where('status', 1) ->first(); if ($dataSource) { // \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过映射名称查询成功: name={$dataSource->name}, data_source_id={$dataSource->data_source_id}\n"); return $dataSource->toConfigArray(); } } // 如果还是查不到,尝试直接使用 dataSourceId 作为 name 查询 // \Workerman\Worker::safeEcho("[DataSourceService] 尝试直接使用 dataSourceId 作为 name 查询: {$dataSourceId}\n"); $dataSource = $this->repository->newQuery() ->where('name', $dataSourceId) ->where('status', 1) ->first(); if ($dataSource) { // \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 name 直接查询成功: name={$dataSource->name}\n"); return $dataSource->toConfigArray(); } // 如果还是查不到,对于 tag_mongodb,尝试查询 is_tag_engine=true 的数据源 if ($dataSourceId === 'tag_mongodb') { // \Workerman\Worker::safeEcho("[DataSourceService] 对于 tag_mongodb,尝试查询 is_tag_engine=true 的数据源\n"); $dataSource = $this->repository->newQuery() ->where('is_tag_engine', true) ->where('status', 1) ->first(); if ($dataSource) { // \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 is_tag_engine 查询成功: name={$dataSource->name}, data_source_id={$dataSource->data_source_id}\n"); return $dataSource->toConfigArray(); } } // \Workerman\Worker::safeEcho("[DataSourceService] ✗ 未找到数据源配置: data_source_id={$dataSourceId}\n"); return null; } /** * 获取标签引擎数据库配置(is_tag_engine = true的数据源) * * @return array|null 标签引擎数据库配置,未找到时返回null */ public function getTagEngineDataSourceConfig(): ?array { $dataSource = $this->repository->newQuery() ->where('is_tag_engine', true) ->where('status', 1) ->first(); if (!$dataSource) { return null; } return $dataSource->toConfigArray(); } /** * 获取标签引擎数据库的data_source_id * * @return string|null 标签引擎数据库的data_source_id,未找到时返回null */ public function getTagEngineDataSourceId(): ?string { $dataSource = $this->repository->newQuery() ->where('is_tag_engine', true) ->where('status', 1) ->first(); return $dataSource ? $dataSource->data_source_id : null; } /** * 验证标签引擎数据库配置是否存在 * * @return bool 是否存在标签引擎数据库 */ public function hasTagEngineDataSource(): bool { $count = $this->repository->newQuery() ->where('is_tag_engine', true) ->where('status', 1) ->count(); return $count > 0; } /** * 获取所有标签引擎数据库(理论上应该只有一个,但允许有多个) * * @return array 标签引擎数据库列表 */ public function getAllTagEngineDataSources(): array { $dataSources = $this->repository->newQuery() ->where('is_tag_engine', true) ->where('status', 1) ->get(); $result = []; foreach ($dataSources as $ds) { $data = $ds->toArray(); unset($data['password']); // 不返回密码 $result[] = $data; } return $result; } }