Files
cunkebao_v3/Moncter/app/service/DataSourceService.php

499 lines
16 KiB
PHP
Raw Permalink Normal View History

2026-01-05 10:16:20 +08:00
<?php
namespace app\service;
use app\repository\DataSourceRepository;
use app\service\DataSource\DataSourceAdapterFactory;
use app\utils\LoggerHelper;
use MongoDB\Client;
use Ramsey\Uuid\Uuid as UuidGenerator;
/**
* 数据源服务
*
* 职责:
* - 管理数据源的CRUD操作
* - 验证数据源连接
* - 提供数据源配置
*/
class DataSourceService
{
public function __construct(
protected DataSourceRepository $repository
) {
}
/**
* 创建数据源
*
* @param array<string, mixed> $data
* @return DataSourceRepository
* @throws \Exception
*/
public function createDataSource(array $data): DataSourceRepository
{
// 生成ID
if (empty($data['data_source_id'])) {
$data['data_source_id'] = UuidGenerator::uuid4()->toString();
}
// 验证必填字段
$requiredFields = ['name', 'type', 'host', 'port', 'database'];
foreach ($requiredFields as $field) {
if (empty($data[$field])) {
throw new \InvalidArgumentException("缺少必填字段: {$field}");
}
}
// 验证类型
$allowedTypes = ['mongodb', 'mysql', 'postgresql'];
if (!in_array(strtolower($data['type']), $allowedTypes)) {
throw new \InvalidArgumentException("不支持的数据源类型: {$data['type']}");
}
// 验证ID唯一性
$existing = $this->repository->newQuery()
->where('data_source_id', $data['data_source_id'])
->first();
if ($existing) {
throw new \InvalidArgumentException("数据源ID已存在: {$data['data_source_id']}");
}
// 验证名称唯一性
$existingByName = $this->repository->newQuery()
->where('name', $data['name'])
->first();
if ($existingByName) {
throw new \InvalidArgumentException("数据源名称已存在: {$data['name']}");
}
// 设置默认值
$data['status'] = $data['status'] ?? 1; // 1:启用, 0:禁用
$data['options'] = $data['options'] ?? [];
$data['is_tag_engine'] = $data['is_tag_engine'] ?? false; // 默认不是标签引擎数据库
// 创建数据源
$dataSource = new DataSourceRepository($data);
$dataSource->save();
// 如果设置为标签引擎数据库,自动将其他数据源设置为 false确保只有一个
if (!empty($data['is_tag_engine'])) {
// 将所有其他数据源的 is_tag_engine 设置为 false
$this->repository->newQuery()
->where('data_source_id', '!=', $dataSource->data_source_id)
->update(['is_tag_engine' => false]);
LoggerHelper::logBusiness('tag_engine_set', [
'data_source_id' => $dataSource->data_source_id,
'action' => 'create',
]);
}
LoggerHelper::logBusiness('data_source_created', [
'data_source_id' => $dataSource->data_source_id,
'name' => $dataSource->name,
'type' => $dataSource->type,
]);
return $dataSource;
}
/**
* 更新数据源
*
* @param string $dataSourceId
* @param array<string, mixed> $data
* @return bool
*/
public function updateDataSource(string $dataSourceId, array $data): bool
{
$dataSource = $this->repository->find($dataSourceId);
if (!$dataSource) {
throw new \InvalidArgumentException("数据源不存在: {$dataSourceId}");
}
// 如果更新名称,验证唯一性
if (isset($data['name']) && $data['name'] !== $dataSource->name) {
$existing = $this->repository->newQuery()
->where('name', $data['name'])
->where('data_source_id', '!=', $dataSourceId)
->first();
if ($existing) {
throw new \InvalidArgumentException("数据源名称已存在: {$data['name']}");
}
}
// 如果设置为标签引擎数据库,自动将其他数据源设置为 false确保只有一个
if (isset($data['is_tag_engine']) && !empty($data['is_tag_engine'])) {
// 将所有其他数据源的 is_tag_engine 设置为 false
$this->repository->newQuery()
->where('data_source_id', '!=', $dataSourceId)
->update(['is_tag_engine' => false]);
LoggerHelper::logBusiness('tag_engine_set', [
'data_source_id' => $dataSourceId,
'action' => 'update',
]);
}
// 更新数据
$dataSource->fill($data);
$result = $dataSource->save();
if ($result) {
LoggerHelper::logBusiness('data_source_updated', [
'data_source_id' => $dataSourceId,
]);
}
return $result;
}
/**
* 删除数据源
*
* @param string $dataSourceId
* @return bool
*/
public function deleteDataSource(string $dataSourceId): bool
{
$dataSource = $this->repository->find($dataSourceId);
if (!$dataSource) {
throw new \InvalidArgumentException("数据源不存在: {$dataSourceId}");
}
// TODO: 检查是否有任务在使用此数据源
// 可以查询 DataCollectionTask 中是否有引用此数据源
$result = $dataSource->delete();
if ($result) {
LoggerHelper::logBusiness('data_source_deleted', [
'data_source_id' => $dataSourceId,
]);
}
return $result;
}
/**
* 获取数据源列表
*
* @param array<string, mixed> $filters
* @return array{list: array, total: int}
*/
public function getDataSourceList(array $filters = []): array
{
try {
$query = $this->repository->newQuery();
// 筛选条件
if (isset($filters['type'])) {
$query->where('type', $filters['type']);
}
if (isset($filters['status'])) {
$query->where('status', $filters['status']);
}
if (isset($filters['name'])) {
$query->where('name', 'like', '%' . $filters['name'] . '%');
}
// 排序
$query->orderBy('created_at', 'desc');
// 分页
$page = (int)($filters['page'] ?? 1);
$pageSize = (int)($filters['page_size'] ?? 20);
$total = $query->count();
$list = $query->skip(($page - 1) * $pageSize)
->take($pageSize)
->get()
->map(function ($item) {
// 不返回密码
$data = $item->toArray();
unset($data['password']);
return $data;
})
->toArray();
return [
'list' => $list,
'total' => $total,
];
} catch (\MongoDB\Driver\Exception\Exception $e) {
// MongoDB 连接错误
LoggerHelper::logError($e, [
'component' => 'DataSourceService',
'action' => 'getDataSourceList',
]);
throw new \RuntimeException('无法连接到 MongoDB 数据库,请检查数据库服务是否正常运行', 500, $e);
} catch (\Exception $e) {
LoggerHelper::logError($e, [
'component' => 'DataSourceService',
'action' => 'getDataSourceList',
]);
throw $e;
}
}
/**
* 获取数据源详情(不包含密码)
*
* @param string $dataSourceId
* @return array<string, mixed>|null
*/
public function getDataSourceDetail(string $dataSourceId): ?array
{
$dataSource = $this->repository->find($dataSourceId);
if (!$dataSource) {
return null;
}
$data = $dataSource->toArray();
unset($data['password']);
return $data;
}
/**
* 获取数据源详情(包含密码,用于连接)
*
* @param string $dataSourceId
* @return array<string, mixed>|null
*/
public function getDataSourceConfig(string $dataSourceId): ?array
{
$dataSource = $this->repository->find($dataSourceId);
if (!$dataSource) {
return null;
}
if ($dataSource->status != 1) {
throw new \RuntimeException("数据源已禁用: {$dataSourceId}");
}
return $dataSource->toConfigArray();
}
/**
* 测试数据源连接
*
* @param array<string, mixed> $config
* @return bool
*/
public function testConnection(array $config): bool
{
try {
$type = strtolower($config['type'] ?? '');
// MongoDB特殊处理
if ($type === 'mongodb') {
// 使用 MongoDBHelper 创建客户端统一DSN构建逻辑
$client = \app\utils\MongoDBHelper::createClient($config, [
'connectTimeoutMS' => 3000,
'socketTimeoutMS' => 5000,
]);
// 尝试列出数据库来测试连接
$client->listDatabases();
return true;
}
// 其他类型使用适配器
$adapter = DataSourceAdapterFactory::create($type, $config);
$connected = $adapter->isConnected();
$adapter->disconnect();
return $connected;
} catch (\Throwable $e) {
LoggerHelper::logError($e, [
'component' => 'DataSourceService',
'action' => 'testConnection',
]);
return false;
}
}
/**
* 获取所有启用的数据源用于替代config('data_sources'),从数据库读取)
*
* @return array<string, array> 以data_source_id为key的配置数组
*/
public function getAllEnabledDataSources(): array
{
$dataSources = $this->repository->newQuery()
->where('status', 1)
->get();
$result = [];
foreach ($dataSources as $ds) {
$result[$ds->data_source_id] = $ds->toConfigArray();
}
return $result;
}
/**
* 根据数据源ID获取配置从数据库读取
*
* 支持两种查询方式:
* 1. 通过 data_source_id (UUID) 查询
* 2. 通过 name 字段查询(兼容配置文件中的 key sync_mongodb, tag_mongodb
*
* @param string $dataSourceId 数据源ID或名称
* @return array<string, mixed>|null 数据源配置不存在或禁用时返回null
*/
public function getDataSourceConfigById(string $dataSourceId): ?array
{
// \Workerman\Worker::safeEcho("[DataSourceService] 查询数据源配置: data_source_id={$dataSourceId}\n");
// 先尝试通过 data_source_id 查询UUID 格式)
$dataSource = $this->repository->newQuery()
->where('data_source_id', $dataSourceId)
->where('status', 1)
->first();
if ($dataSource) {
// \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 data_source_id 查询成功: name={$dataSource->name}\n");
return $dataSource->toConfigArray();
}
// 如果通过 data_source_id 查不到,尝试通过 name 字段查询(兼容配置文件中的 key
// \Workerman\Worker::safeEcho("[DataSourceService] 通过 data_source_id 未找到,尝试通过 name 查询\n");
// 处理配置文件中的常见 key 映射
// 注意:这些映射需要根据实际数据库中的 name 字段值来调整
$nameMapping = [
'sync_mongodb' => '本地大数据库', // 根据实际数据库中的名称调整
'tag_mongodb' => '主数据库', // 标签引擎数据库is_tag_engine=true
'kr_mongodb' => '卡若的主机', // 卡若数据库
];
$searchName = $nameMapping[$dataSourceId] ?? null;
if ($searchName) {
// \Workerman\Worker::safeEcho("[DataSourceService] 使用映射名称查询: {$dataSourceId} -> {$searchName}\n");
// 使用映射的名称查询
$dataSource = $this->repository->newQuery()
->where('name', $searchName)
->where('status', 1)
->first();
if ($dataSource) {
// \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过映射名称查询成功: name={$dataSource->name}, data_source_id={$dataSource->data_source_id}\n");
return $dataSource->toConfigArray();
}
}
// 如果还是查不到,尝试直接使用 dataSourceId 作为 name 查询
// \Workerman\Worker::safeEcho("[DataSourceService] 尝试直接使用 dataSourceId 作为 name 查询: {$dataSourceId}\n");
$dataSource = $this->repository->newQuery()
->where('name', $dataSourceId)
->where('status', 1)
->first();
if ($dataSource) {
// \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 name 直接查询成功: name={$dataSource->name}\n");
return $dataSource->toConfigArray();
}
// 如果还是查不到,对于 tag_mongodb尝试查询 is_tag_engine=true 的数据源
if ($dataSourceId === 'tag_mongodb') {
// \Workerman\Worker::safeEcho("[DataSourceService] 对于 tag_mongodb尝试查询 is_tag_engine=true 的数据源\n");
$dataSource = $this->repository->newQuery()
->where('is_tag_engine', true)
->where('status', 1)
->first();
if ($dataSource) {
// \Workerman\Worker::safeEcho("[DataSourceService] ✓ 通过 is_tag_engine 查询成功: name={$dataSource->name}, data_source_id={$dataSource->data_source_id}\n");
return $dataSource->toConfigArray();
}
}
// \Workerman\Worker::safeEcho("[DataSourceService] ✗ 未找到数据源配置: data_source_id={$dataSourceId}\n");
return null;
}
/**
* 获取标签引擎数据库配置is_tag_engine = true的数据源
*
* @return array<string, mixed>|null 标签引擎数据库配置未找到时返回null
*/
public function getTagEngineDataSourceConfig(): ?array
{
$dataSource = $this->repository->newQuery()
->where('is_tag_engine', true)
->where('status', 1)
->first();
if (!$dataSource) {
return null;
}
return $dataSource->toConfigArray();
}
/**
* 获取标签引擎数据库的data_source_id
*
* @return string|null 标签引擎数据库的data_source_id未找到时返回null
*/
public function getTagEngineDataSourceId(): ?string
{
$dataSource = $this->repository->newQuery()
->where('is_tag_engine', true)
->where('status', 1)
->first();
return $dataSource ? $dataSource->data_source_id : null;
}
/**
* 验证标签引擎数据库配置是否存在
*
* @return bool 是否存在标签引擎数据库
*/
public function hasTagEngineDataSource(): bool
{
$count = $this->repository->newQuery()
->where('is_tag_engine', true)
->where('status', 1)
->count();
return $count > 0;
}
/**
* 获取所有标签引擎数据库(理论上应该只有一个,但允许有多个)
*
* @return array 标签引擎数据库列表
*/
public function getAllTagEngineDataSources(): array
{
$dataSources = $this->repository->newQuery()
->where('is_tag_engine', true)
->where('status', 1)
->get();
$result = [];
foreach ($dataSources as $ds) {
$data = $ds->toArray();
unset($data['password']); // 不返回密码
$result[] = $data;
}
return $result;
}
}