Commit c4492b4e authored by uuo00_n's avatar uuo00_n

feat(security-service): 新增安全分析微服务基础架构

- 初始化 FastAPI 应用结构及核心模块
- 实现安全分析、攻击建议、风险监测等核心 API
- 添加 JWT 鉴权及管理员权限控制
- 集成 Dify LLM 进行安全分析
- 更新 docker-compose 和网关配置
- 补充 README 文档和安全服务说明
parent 4ff8d595
......@@ -15,6 +15,7 @@
| **Gateway** | Nginx | 8080 | 统一 API 网关,负责请求路由转发与跨域处理 |
| **Auth Service** | Go (Gin) | 8081 | 用户注册、登录、JWT 签发、绑定管理 |
| **Edu Service** | Java (Spring Boot) | 8082 | 教务/业务核心服务(班级、人员、教师、课表、看板等) |
| **Security Service** | Python (FastAPI) | 8003 | 安全分析、风险监测、攻击应急、安全日报 |
| **LLM Service** | Python (FastAPI) | 8000 | 智能对话、敏感词过滤、审计日志、敏感词管理 |
### 基础设施
......@@ -38,9 +39,15 @@
### Gateway(Nginx)
- **统一入口**:对外只暴露一个入口,由网关按路径将请求转发到各后端服务。
- **接口文档**:统一管理所有微服务的 Swagger 文档入口
- **LLM Service**: `http://localhost:8080/docs/llm`
- **Security Service**: `http://localhost:8080/docs/security`
- **Edu Service**: `http://localhost:8080/docs/edu`
- **Auth Service**: `http://localhost:8080/docs/auth`
- **路由转发**
- 认证服务:`/api/v1/auth/*``/api/v1/bindings/*`
- 教务服务:`/api/v1/classes/*``/api/v1/persons/*``/api/v1/teachers/*``/api/v1/schedules/*``/api/v1/dashboard/*``/api/v1/edu/*`
- 安全服务:`/api/v1/security/*`
- LLM 服务:其余路径默认转发(包含 `/docs``/api/v1/*` 等)
- **跨域与压缩**:统一设置 CORS 响应头与 gzip,减少前端对接成本。
......@@ -74,6 +81,16 @@
- 校级管理:`/api/v1/dashboard/campus/overview`
- **JWT 本地解析**:通过 `JwtAuthenticationFilter` 解析 `Authorization: Bearer <token>`,将用户信息写入 `UserContext`,业务层可直接读取当前用户身份。
### Security Service(Python / FastAPI)
- **AI 安全分析**
- `POST /api/v1/security/analysis`:基于设备信息(交换机/防火墙/服务器)进行 AI 安全隐患分析
- `POST /api/v1/security/attack/advice`:针对正在遭受的攻击提供 AI 应急建议
- **风险监测与日报**
- `GET /api/v1/security/monitor/risk`:AI 联网检索最新漏洞与合规风险
- `GET /api/v1/security/reports/daily`:生成企业安全状态日报
- **权限控制**:仅限管理员(`role_level >= 9`)访问
### LLM Service(Python / FastAPI)
- **对话管理**
......
......@@ -62,6 +62,7 @@ services:
context: ./microservices/llm-service
dockerfile: Dockerfile
container_name: llm-filter-llm
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload --proxy-headers --forwarded-allow-ips '*'
ports:
- "8000:8000"
volumes:
......@@ -84,6 +85,24 @@ services:
networks:
- llm-network
# 安全分析服务 (Python)
security-service:
build:
context: ./microservices/security-service
dockerfile: Dockerfile
container_name: llm-filter-security
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload --proxy-headers --forwarded-allow-ips '*'
ports:
- "8003:8000"
volumes:
- ./microservices/security-service/app:/app/app
environment:
- JWT_SECRET=llm_filter_secure_secret_key_2025_update_must_be_32_bytes
- DIFY_API_URL=http://datacenter.dldzxx.cn:8089/v1
- DIFY_API_KEY=app-xTl0Ri6ir7cAuvngDtFe3hKP
networks:
- llm-network
# MongoDB 服务 (LLM 服务依赖)
mongo:
image: mongo:latest
......@@ -107,6 +126,7 @@ services:
- auth-service
- edu-service
- llm-service
- security-service
networks:
- llm-network
......
......@@ -22,6 +22,10 @@ http {
server llm-service:8000;
}
upstream security_service {
server security-service:8000;
}
server {
listen 80;
server_name localhost;
......@@ -87,19 +91,79 @@ http {
proxy_set_header Host $host;
}
# Edu Service Swagger UI
location /swagger-ui/ {
proxy_pass http://edu_service/swagger-ui/;
# Edu Service Swagger UI (Legacy)
# location /swagger-ui/ ... (moved up)
# 3. Security Service 文档
# 处理缺少斜杠的情况
location = /docs/security {
return 301 /docs/security/;
}
# 主文档页面
location /docs/security/ {
proxy_pass http://security_service/docs;
proxy_set_header Host $host;
# 告诉 FastAPI 它被挂载在 /docs/security 下,这样它生成的 openapi.json URL 就会带上这个前缀
proxy_set_header X-Forwarded-Prefix /docs/security;
}
# 针对 Security Service 的 schema 代理 (精确匹配)
location = /docs/security/openapi.json {
proxy_pass http://security_service/openapi.json;
proxy_set_header Host $host;
}
# 4. LLM Service 文档
location = /docs/llm {
return 301 /docs/llm/;
}
location /docs/llm/ {
proxy_pass http://llm_service/docs;
proxy_set_header Host $host;
}
# 5. Auth Service 文档
location = /docs/auth { return 301 /docs/auth/; }
# 根文档页面 -> /swagger/index.html
location = /docs/auth/ {
proxy_pass http://auth_service/swagger/index.html;
proxy_set_header Host $host;
}
# 静态资源与子路径 -> /swagger/*
location /docs/auth/ {
proxy_pass http://auth_service/swagger/;
proxy_set_header Host $host;
}
# 6. Edu Service 文档
location = /docs/edu { return 301 /docs/edu/; }
# 根文档页面 -> /swagger-ui/index.html
location = /docs/edu/ {
proxy_pass http://edu_service/swagger-ui/index.html;
proxy_set_header Host $host;
}
location /swagger-ui.html {
proxy_pass http://edu_service/swagger-ui.html;
# 静态资源与子路径 -> /swagger-ui/*
location /docs/edu/ {
proxy_pass http://edu_service/swagger-ui/;
proxy_set_header Host $host;
}
# Edu Service 需要的 API Docs 路径 (保持原样,供 swagger-ui 调用)
location /v3/api-docs {
proxy_pass http://edu_service/v3/api-docs;
proxy_set_header Host $host;
}
# Edu Service 原 swagger-ui 路径 (可选保留)
location /swagger-ui/ {
proxy_pass http://edu_service/swagger-ui/;
proxy_set_header Host $host;
}
location = /openapi.json {
proxy_pass http://security_service/openapi.json;
proxy_set_header Host $host;
}
# LLM Service 路由 (默认/兜底)
# 包括 /docs, /openapi.json, /api/v1/chat, /api/v1/admin 等
......
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
from fastapi import APIRouter, Depends
from app.schemas.payloads import *
from app.services.analysis import SecurityService
from app.core.security import get_current_admin
router = APIRouter()
service = SecurityService()
@router.post("/analysis", response_model=SecurityAnalysisResponse)
async def analyze_risks(request: SecurityAnalysisRequest, admin: dict = Depends(get_current_admin)):
return await service.analyze_risks(request.devices)
@router.post("/attack-advice", response_model=AttackAdviceResponse)
async def get_attack_advice(request: AttackAdviceRequest, admin: dict = Depends(get_current_admin)):
return await service.get_attack_advice(request.attack_type, request.target_device, request.logs)
@router.get("/report", response_model=SecurityReportResponse)
async def generate_report(admin: dict = Depends(get_current_admin)):
return await service.generate_report()
@router.get("/monitor", response_model=RiskMonitorResponse)
async def monitor_risks(admin: dict = Depends(get_current_admin)):
return await service.monitor_risks()
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Security Service"
# 鉴权配置 (与 Auth Service 保持一致)
JWT_SECRET: str = "llm_filter_secure_secret_key_2025_update_must_be_32_bytes"
ALGORITHM: str = "HS256"
# Dify 配置
DIFY_API_URL: str = "http://datacenter.dldzxx.cn:8089/v1"
DIFY_API_KEY: str = "app-lkK33EQOVXXrjD9x3SKbItr7"
class Config:
case_sensitive = True
settings = Settings()
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from app.core.config import settings
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_admin(payload: dict = Depends(get_current_user)):
"""仅允许管理员访问"""
role = payload.get("role")
# 兼容常见的管理员角色标识
if role not in ["admin", "administrator", "root"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限 (Administrator access required)",
)
return payload
from fastapi import FastAPI
from app.api.v1.endpoints import router as security_router
from app.core.config import settings
app = FastAPI(title=settings.PROJECT_NAME)
# 注册路由
app.include_router(security_router, prefix=f"{settings.API_V1_STR}/security", tags=["Security"])
@app.get("/")
def health_check():
return {"status": "ok", "service": "security-service"}
from typing import List, Optional
from pydantic import BaseModel
class DeviceInfo(BaseModel):
id: str
name: str
type: str # switch, firewall, server
status: str
logs: Optional[List[str]] = None
version: Optional[str] = None
class SecurityAnalysisRequest(BaseModel):
devices: Optional[List[DeviceInfo]] = None
class SecurityAnalysisResponse(BaseModel):
summary: str
vulnerabilities: List[str]
suggestions: List[str]
risk_level: str
class AttackAdviceRequest(BaseModel):
attack_type: str
target_device: str
severity: str
logs: Optional[str] = None
class AttackAdviceResponse(BaseModel):
immediate_actions: List[str]
analysis: str
mitigation_plan: str
class SecurityReportResponse(BaseModel):
date: str
overall_status: str
device_summary: str
incident_summary: str
recommendations: str
class RiskMonitorResponse(BaseModel):
detected_vulnerabilities: List[str]
compliance_risks: List[str]
ai_assessment: str
import json
import httpx
from datetime import datetime
from typing import List, Dict, Any
from app.schemas.payloads import *
from app.core.config import settings
# Mock Data
MOCK_DEVICES = [
DeviceInfo(id="sw-001", name="Core-Switch-A", type="switch", status="warning", version="v1.2.0", logs=["Port 22 high traffic", "Packet loss detected"]),
DeviceInfo(id="fw-001", name="Edge-Firewall", type="firewall", status="active", version="v2.1.patch3", logs=["Denied 1000+ requests from IP 192.168.1.50"]),
DeviceInfo(id="srv-001", name="DB-Server-Prod", type="server", status="active", version="Ubuntu 20.04", logs=["Failed login attempts: 5"]),
]
class SecurityService:
async def analyze_risks(self, devices: List[DeviceInfo] = None) -> SecurityAnalysisResponse:
if not devices:
devices = MOCK_DEVICES
device_str = "\n".join([f"{d.name} ({d.type}): Status={d.status}, Logs={d.logs}" for d in devices])
prompt = f"请分析以下网络设备的运行状态和日志,找出潜在的安全隐患,并给出风险等级(low, medium, high, critical)和修复建议。请以 JSON 格式返回,包含以下字段:summary, vulnerabilities (list), suggestions (list), risk_level。\n\n设备信息:\n{device_str}"
return await self._call_llm(prompt, "analysis", SecurityAnalysisResponse)
async def get_attack_advice(self, attack_type: str, target: str, logs: str) -> AttackAdviceResponse:
prompt = f"当前系统正在遭受攻击!\n攻击类型:{attack_type}\n目标设备:{target}\n相关日志:{logs}\n\n请立即给出应急响应建议。以 JSON 格式返回:immediate_actions (list), analysis, mitigation_plan。"
return await self._call_llm(prompt, "advice", AttackAdviceResponse)
async def generate_report(self) -> SecurityReportResponse:
prompt = f"请根据以下概况生成一份企业安全日报。\n日期:{datetime.now().strftime('%Y-%m-%d')}\n设备状态:3台设备运行中,1台有告警。\n拦截攻击:1500次。\n\n请以 JSON 格式返回:date, overall_status, device_summary, incident_summary, recommendations。"
return await self._call_llm(prompt, "report", SecurityReportResponse)
async def monitor_risks(self) -> RiskMonitorResponse:
recent_vulns = ["CVE-2023-44487 (HTTP/2 Rapid Reset)", "Log4j 变种漏洞", "Nginx 权限提升漏洞"]
prompt = f"我检索到了以下互联网最新的安全漏洞信息:\n{', '.join(recent_vulns)}\n\n请分析这些漏洞对一般企业(使用 Nginx, Java, Python)的合规风险。请以 JSON 格式返回:detected_vulnerabilities (list), compliance_risks (list), ai_assessment。"
response = await self._call_llm(prompt, "monitor", RiskMonitorResponse)
if not response.detected_vulnerabilities:
response.detected_vulnerabilities = recent_vulns
return response
async def _call_llm(self, prompt: str, mock_type: str, model_cls):
try:
# 尝试调用 Dify
headers = {"Authorization": f"Bearer {settings.DIFY_API_KEY}", "Content-Type": "application/json"}
payload = {"inputs": {}, "query": prompt, "response_mode": "blocking", "conversation_id": "", "user": "security-system"}
url = f"{settings.DIFY_API_URL.rstrip('/')}/chat-messages"
async with httpx.AsyncClient() as client:
resp = await client.post(url, json=payload, headers=headers, timeout=60.0)
if resp.status_code == 200:
data = resp.json()
answer = data.get("answer", "")
json_data = self._extract_json(answer)
if json_data:
return model_cls(**json_data)
except Exception as e:
print(f"LLM Call Error: {e}")
# 降级使用 Mock 数据
return model_cls(**self._get_mock_data(mock_type))
def _extract_json(self, text: str) -> Dict[str, Any]:
try:
start = text.find('{')
end = text.rfind('}') + 1
if start != -1 and end != -1:
return json.loads(text[start:end])
return json.loads(text)
except:
return {}
def _get_mock_data(self, type: str) -> Dict[str, Any]:
if type == "analysis":
return {"summary": "核心交换机存在异常流量。", "vulnerabilities": ["DDoS 攻击迹象"], "suggestions": ["检查端口配置"], "risk_level": "high"}
elif type == "advice":
return {"immediate_actions": ["封锁 IP"], "analysis": "暴力破解攻击。", "mitigation_plan": "启用 MFA。"}
elif type == "report":
return {"date": datetime.now().strftime('%Y-%m-%d'), "overall_status": "良好", "device_summary": "设备正常", "incident_summary": "拦截 1500 次", "recommendations": "定期更新"}
elif type == "monitor":
return {"detected_vulnerabilities": ["CVE-2023-44487"], "compliance_risks": ["服务中断风险"], "ai_assessment": "需立即更新补丁。"}
return {}
fastapi==0.109.2
uvicorn==0.27.1
pydantic==2.6.1
pydantic-settings==2.1.0
python-jose[cryptography]==3.3.0
httpx==0.27.0
python-dotenv==1.0.1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment