Commit 0e2c0172 authored by uuo00_n's avatar uuo00_n

feat(security-service): 添加MongoDB支持以实现分析结果持久化和历史查询功能

添加MongoDB数据库连接配置和持久化逻辑
实现安全分析和攻击建议的历史记录存储
新增历史查询接口和文档说明
parent 77ca28f8
......@@ -100,6 +100,10 @@ services:
- JWT_SECRET=llm_filter_secure_secret_key_2025_update_must_be_32_bytes
- DIFY_API_URL=http://datacenter.dldzxx.cn:8089/v1
- DIFY_API_KEY=app-xTl0Ri6ir7cAuvngDtFe3hKP
- MONGODB_URL=mongodb://mongo:27017
- MONGODB_DB_NAME=security_service_db
depends_on:
- mongo
networks:
- llm-network
......
......@@ -173,21 +173,32 @@ Content-Type: application/json
当前版本的 Security Service:
- 不写入任何数据库(PostgreSQL / MongoDB)
- 不记录历史分析结果或报告
- 所有分析基于:
- 请求中传入的设备数据
- 实时从 Dify 获取的分析结果
- **数据持久化**:分析结果(风险分析与攻击建议)会异步存储到 **MongoDB** 数据库中。
- **历史查询**:提供接口查询历史分析记录。
后续如果需要:
## 7. 历史查询接口
- 可以将分析结果落库到 PostgreSQL 或 MongoDB
- 典型扩展:
- 安全日报历史查询
- 风险趋势分析
- 设备安全基线与偏离检测
### 7.1 安全分析历史
## 7. 快速调试示例(curl)
- 方法:`GET`
- URL:`/api/v1/security/analysis/history`
- 参数:
- `start_date` (可选): 开始时间 (ISO 8601)
- `end_date` (可选): 结束时间 (ISO 8601)
- `limit` (可选): 返回数量限制,默认 20
- 响应:包含 `total``items` (AnalysisHistoryItem)
### 7.2 攻击建议历史
- 方法:`GET`
- URL:`/api/v1/security/attack-advice/history`
- 参数:
- `start_date` (可选): 开始时间 (ISO 8601)
- `end_date` (可选): 结束时间 (ISO 8601)
- `limit` (可选): 返回数量限制,默认 20
- 响应:包含 `total``items` (AttackAdviceHistoryItem)
## 8. 快速调试示例(curl)
确保容器已启动后,可以在宿主机直接运行:
......
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Query
from app.schemas.payloads import *
from app.services.analysis import SecurityService
from app.services.rss import RSSService
from app.core.security import get_current_admin
from app.core.database import db
from datetime import datetime, timezone
router = APIRouter()
service = SecurityService()
......@@ -10,11 +12,51 @@ rss_service = RSSService()
@router.post("/analysis", response_model=SecurityAnalysisResponse)
async def analyze_risks(request: SecurityAnalysisRequest, admin: dict = Depends(get_current_admin)):
return await service.analyze_risks(request.devices)
result = await service.analyze_risks(request.devices)
# 异步存储结果到 MongoDB
if db.db is not None:
log_entry = result.model_dump()
log_entry["created_at"] = datetime.now(timezone.utc)
await db.db.security_analysis_logs.insert_one(log_entry)
return result
@router.get("/analysis/history", response_model=HistoryQueryResponse)
async def get_analysis_history(
start_date: Optional[datetime] = Query(None, description="开始时间 (ISO 8601)"),
end_date: Optional[datetime] = Query(None, description="结束时间 (ISO 8601)"),
limit: int = Query(20, ge=1, le=100, description="返回数量限制"),
admin: dict = Depends(get_current_admin)
):
"""
查询安全分析历史记录
"""
return await service.get_analysis_history(start_date, end_date, limit)
@router.post("/attack-advice", response_model=AttackAdviceResponse)
async def get_attack_advice(request: AttackAdviceRequest, admin: dict = Depends(get_current_admin)):
return await service.get_attack_advice(request.attack_type, request.target_device, request.logs)
result = await service.get_attack_advice(request.attack_type, request.target_device, request.logs)
# 异步存储结果到 MongoDB
if db.db is not None:
log_entry = result.model_dump()
log_entry["created_at"] = datetime.now(timezone.utc)
await db.db.attack_advice_logs.insert_one(log_entry)
return result
@router.get("/attack-advice/history", response_model=HistoryQueryResponse)
async def get_attack_advice_history(
start_date: Optional[datetime] = Query(None, description="开始时间 (ISO 8601)"),
end_date: Optional[datetime] = Query(None, description="结束时间 (ISO 8601)"),
limit: int = Query(20, ge=1, le=100, description="返回数量限制"),
admin: dict = Depends(get_current_admin)
):
"""
查询攻击建议历史记录
"""
return await service.get_attack_advice_history(start_date, end_date, limit)
@router.get("/report", response_model=SecurityReportResponse)
async def generate_report(admin: dict = Depends(get_current_admin)):
......
......@@ -12,6 +12,10 @@ class Settings(BaseSettings):
DIFY_API_URL: str = "http://datacenter.dldzxx.cn:8089/v1"
DIFY_API_KEY: str = "app-lkK33EQOVXXrjD9x3SKbItr7"
# MongoDB 配置
MONGODB_URL: str = "mongodb://localhost:27017"
MONGODB_DB_NAME: str = "security_service_db"
class Config:
case_sensitive = True
......
from motor.motor_asyncio import AsyncIOMotorClient
from app.core.config import settings
class Database:
client: AsyncIOMotorClient = None
db = None
def connect(self):
self.client = AsyncIOMotorClient(settings.MONGODB_URL)
self.db = self.client[settings.MONGODB_DB_NAME]
print(f"Connected to MongoDB at {settings.MONGODB_URL}")
def close(self):
if self.client:
self.client.close()
print("MongoDB connection closed")
db = Database()
async def get_database():
return db.db
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.api.v1.endpoints import router as security_router
from app.core.config import settings
from app.core.database import db
app = FastAPI(title=settings.PROJECT_NAME)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
db.connect()
yield
# Shutdown
db.close()
app = FastAPI(title=settings.PROJECT_NAME, lifespan=lifespan)
# 注册路由
app.include_router(security_router, prefix=f"{settings.API_V1_STR}/security", tags=["Security"])
......
from typing import List, Optional, Any, Union
from pydantic import BaseModel
from datetime import datetime
class DeviceInfo(BaseModel):
id: str
......@@ -51,3 +52,15 @@ class RSSItem(BaseModel):
class RSSFeedResponse(BaseModel):
items: List[RSSItem]
class AnalysisHistoryItem(SecurityAnalysisResponse):
id: str
created_at: datetime
class AttackAdviceHistoryItem(AttackAdviceResponse):
id: str
created_at: datetime
class HistoryQueryResponse(BaseModel):
total: int
items: List[Union[AnalysisHistoryItem, AttackAdviceHistoryItem]]
......@@ -5,6 +5,7 @@ from typing import List, Dict, Any, Optional
from pydantic import ValidationError
from app.schemas.payloads import *
from app.core.config import settings
from app.core.database import db
import logging
logger = logging.getLogger(__name__)
......@@ -66,17 +67,63 @@ class SecurityService:
async def monitor_risks(self) -> RiskMonitorResponse:
"""
任务:漏洞监测
[MOCK] 实时风险监控
"""
# 注意:此处应从外部漏洞库或配置获取关注列表
recent_vulns = []
return RiskMonitorResponse(
detected_vulnerabilities=["CVE-2024-0001 (Critical)", "Weak SSH Config"],
compliance_risks=["Password policy outdated", "Unencrypted backup found"],
ai_assessment="System security posture is stable but requires attention on recent CVEs."
)
async def get_analysis_history(self, start_date: datetime = None, end_date: datetime = None, limit: int = 20) -> HistoryQueryResponse:
"""
查询安全分析历史
"""
if db.db is None:
return HistoryQueryResponse(total=0, items=[])
inputs = {
"task_type": "monitor",
"context_data": json.dumps(recent_vulns, ensure_ascii=False)
}
query = {}
if start_date or end_date:
query["created_at"] = {}
if start_date:
query["created_at"]["$gte"] = start_date
if end_date:
query["created_at"]["$lte"] = end_date
total = await db.db.security_analysis_logs.count_documents(query)
cursor = db.db.security_analysis_logs.find(query).sort("created_at", -1).limit(limit)
items = []
async for doc in cursor:
doc["id"] = str(doc.pop("_id"))
items.append(AnalysisHistoryItem(**doc))
return HistoryQueryResponse(total=total, items=items)
async def get_attack_advice_history(self, start_date: datetime = None, end_date: datetime = None, limit: int = 20) -> HistoryQueryResponse:
"""
查询攻击建议历史
"""
if db.db is None:
return HistoryQueryResponse(total=0, items=[])
return await self._call_llm(inputs, RiskMonitorResponse)
query = {}
if start_date or end_date:
query["created_at"] = {}
if start_date:
query["created_at"]["$gte"] = start_date
if end_date:
query["created_at"]["$lte"] = end_date
total = await db.db.attack_advice_logs.count_documents(query)
cursor = db.db.attack_advice_logs.find(query).sort("created_at", -1).limit(limit)
items = []
async for doc in cursor:
doc["id"] = str(doc.pop("_id"))
items.append(AttackAdviceHistoryItem(**doc))
return HistoryQueryResponse(total=total, items=items)
async def _call_llm(self, inputs: Dict[str, Any], model_cls):
try:
......
......@@ -6,3 +6,5 @@ python-jose[cryptography]==3.3.0
httpx==0.27.0
python-dotenv==1.0.1
feedparser>=6.0.10
motor==3.3.2
pymongo<4.7
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment