Commit f04c1aad authored by uuo00_n's avatar uuo00_n

feat(gateway): 添加security-service路由配置

fix(auth-service): 将JWT中的userID转为字符串格式

refactor(security-service): 优化安全分析请求的LLM提示词和响应处理
- 使用streaming模式调用Dify API提高可靠性
- 增强提示词约束确保返回合法JSON
- 改进错误处理和日志记录

docs(security-service): 添加服务使用说明文档
parent c4492b4e
......@@ -91,6 +91,12 @@ http {
proxy_set_header Host $host;
}
# Security Service 路由
location /api/v1/security {
proxy_pass http://security_service/api/v1/security;
proxy_set_header Host $host;
}
# Edu Service Swagger UI (Legacy)
# location /swagger-ui/ ... (moved up)
......
package utils
import (
"fmt"
"os"
"time"
......@@ -35,7 +36,7 @@ func CheckPasswordHash(password, hash string) bool {
// GenerateToken 生成 JWT Token
func GenerateToken(userID uint, username string, role string, roleLevel int, edition string, personID string, personType string) (string, error) {
claims := jwt.MapClaims{
"sub": userID,
"sub": fmt.Sprintf("%d", userID),
"name": username,
"role": role,
"role_level": roleLevel,
......
# Security Service 使用说明
## 1. 服务定位
Security Service 是本项目的安全分析微服务,主要能力包括:
- 基于交换机 / 防火墙 / 服务器等设备信息做 AI 安全风险分析
- 在遭受攻击时给出 AI 应急响应建议
- 生成每日安全日报
- 基于互联网最新漏洞进行风险监测与合规性评估
当前实现为 **无状态服务**
- 不直接读写 PostgreSQL / MongoDB
- 所有分析结果仅在请求周期内计算并返回,不做持久化存储
## 2. 部署与访问入口
### 2.1 通过 Docker Compose 启动
在项目根目录执行:
```bash
cd /Users/uu/Desktop/dles_prj/llm-filter
docker-compose up -d --build security-service gateway
```
### 2.2 访问地址
- 统一网关入口(推荐):`http://localhost:8080`
- 安全服务 API:`/api/v1/security/*`
- 安全服务文档:`http://localhost:8080/docs/security/`
- 直连 Security Service 容器:
- Base URL:`http://localhost:8003`
- API 前缀:`/api/v1/security`
## 3. 鉴权与权限控制
- 所有接口均要求携带 Auth Service 签发的 **JWT**
- HTTP Header:`Authorization: Bearer <token>`
- 服务内部通过 [`get_current_admin`](app/core/security.py) 校验管理员身份:
- 仅当 `role``admin` / `administrator` / `root` 时允许访问
示例 Header:
```http
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
```
## 4. 接口一览
所有路径均在前缀 `/api/v1/security` 下,以下以 **网关地址** 为例:`http://localhost:8080`
### 4.1 安全风险分析
- 方法:`POST`
- URL:`/api/v1/security/analysis`
- 说明:
- 输入网络设备列表(交换机 / 防火墙 / 服务器等),由 AI 分析潜在安全隐患
- 如果不传 `devices`,会使用内置 Mock 设备数据
请求示例:
```json
POST http://localhost:8080/api/v1/security/analysis
Authorization: Bearer <admin_token>
Content-Type: application/json
{
"devices": [
{
"id": "sw-001",
"name": "Core-Switch-A",
"type": "switch",
"status": "warning",
"logs": ["Port 22 high traffic", "Packet loss detected"],
"version": "v1.2.0"
}
]
}
```
响应字段(`SecurityAnalysisResponse`):
- `summary`: 总体安全概况
- `vulnerabilities`: 漏洞 / 风险列表
- `suggestions`: 修复建议列表
- `risk_level`: 风险等级(如 `low` / `medium` / `high` / `critical`
### 4.2 攻击应急建议
- 方法:`POST`
- URL:`/api/v1/security/attack-advice`
- 说明:当系统已遭受攻击时,提供应急响应与缓解方案
请求示例:
```json
POST http://localhost:8080/api/v1/security/attack-advice
Authorization: Bearer <admin_token>
Content-Type: application/json
{
"attack_type": "Brute Force Login",
"target_device": "DB-Server-Prod",
"severity": "high",
"logs": "Failed login attempts from 10.0.0.10"
}
```
响应字段(`AttackAdviceResponse`):
- `immediate_actions`: 立即执行的操作建议列表
- `analysis`: 攻击分析说明
- `mitigation_plan`: 中长期缓解与防护计划
### 4.3 安全日报
- 方法:`GET`
- URL:`/api/v1/security/report`
- 说明:生成企业安全日报,用于面向管理层的安全概览展示
响应字段(`SecurityReportResponse`):
- `date`: 报告日期(`YYYY-MM-DD`
- `overall_status`: 总体安全状态
- `device_summary`: 设备运行状况摘要
- `incident_summary`: 安全事件与拦截情况摘要
- `recommendations`: 后续安全改进建议
### 4.4 风险监测与合规评估
- 方法:`GET`
- URL:`/api/v1/security/monitor`
- 说明:基于互联网最新漏洞信息,评估当前企业的合规风险
响应字段(`RiskMonitorResponse`):
- `detected_vulnerabilities`: 识别到的漏洞列表
- `compliance_risks`: 合规风险点列表
- `ai_assessment`: AI 对整体风险的评估说明
## 5. Dify 集成与降级策略
服务内部通过 Dify 完成大部分安全分析逻辑:
- Dify 调用配置在 [config.py](app/core/config.py)
- `DIFY_API_URL`
- `DIFY_API_KEY`
- 请求由 [`SecurityService._call_llm`](app/services/analysis.py) 统一发起
当 Dify 不可用(网络异常 / 超时 / 抛错)时:
- 会自动降级为内置 Mock 数据
- Mock 数据定义在 [`_get_mock_data`](app/services/analysis.py)
- 保证接口始终能返回结构化 JSON,便于前端联调与联机 demo
## 6. 数据存储与状态
当前版本的 Security Service:
- 不写入任何数据库(PostgreSQL / MongoDB)
- 不记录历史分析结果或报告
- 所有分析基于:
- 请求中传入的设备数据
- 内置 Mock 设备数据 `MOCK_DEVICES`
- 实时从 Dify 获取的分析结果
后续如果需要:
- 可以将分析结果落库到 PostgreSQL 或 MongoDB
- 典型扩展:
- 安全日报历史查询
- 风险趋势分析
- 设备安全基线与偏离检测
## 7. 快速调试示例(curl)
确保容器已启动后,可以在宿主机直接运行:
```bash
# 1. 使用管理员 Token 调用安全分析(通过网关)
curl -X POST "http://localhost:8080/api/v1/security/analysis" \
-H "Authorization: Bearer <admin_token>" \
-H "Content-Type: application/json" \
-d '{"devices": []}'
# 2. 直接访问文档(网关统一入口)
open "http://localhost:8080/docs/security/"
```
from typing import List, Optional
from typing import List, Optional, Any, Union
from pydantic import BaseModel
class DeviceInfo(BaseModel):
......@@ -14,9 +14,9 @@ class SecurityAnalysisRequest(BaseModel):
class SecurityAnalysisResponse(BaseModel):
summary: str
vulnerabilities: List[str]
suggestions: List[str]
risk_level: str
vulnerabilities: List[Union[str, dict, Any]]
suggestions: List[Union[str, dict, Any]]
risk_level: Optional[str] = "unknown"
class AttackAdviceRequest(BaseModel):
attack_type: str
......
......@@ -18,7 +18,22 @@ class SecurityService:
devices = MOCK_DEVICES
device_str = "\n".join([f"{d.name} ({d.type}): Status={d.status}, Logs={d.logs}" for d in devices])
prompt = f"请分析以下网络设备的运行状态和日志,找出潜在的安全隐患,并给出风险等级(low, medium, high, critical)和修复建议。请以 JSON 格式返回,包含以下字段:summary, vulnerabilities (list), suggestions (list), risk_level。\n\n设备信息:\n{device_str}"
prompt = f"""你是一个高级网络安全分析专家智能体。请分析以下网络设备的运行状态和日志,找出潜在的安全隐患。
设备信息:
{device_str}
请严格遵守以下要求:
1. 必须强制使用中文回复。
2. 必须以合法的 JSON 格式返回结果,不要包含 Markdown 代码块标记。
3. 仅返回 JSON 数据本身。
返回的 JSON 结构必须包含以下字段:
- summary (string): 简要的安全分析总结(中文)。
- vulnerabilities (list): 发现的漏洞列表,如果是复杂对象请包含 device, issue, risk_level 字段。
- suggestions (list): 针对性的修复建议列表(中文)。
- risk_level (string): 整体风险等级(low, medium, high, critical)。
"""
return await self._call_llm(prompt, "analysis", SecurityAnalysisResponse)
......@@ -41,19 +56,41 @@ class SecurityService:
async def _call_llm(self, prompt: str, mock_type: str, model_cls):
try:
# 尝试调用 Dify
# 尝试调用 Dify (使用 streaming 模式,因为 Agent App 不支持 blocking)
headers = {"Authorization": f"Bearer {settings.DIFY_API_KEY}", "Content-Type": "application/json"}
payload = {"inputs": {}, "query": prompt, "response_mode": "blocking", "conversation_id": "", "user": "security-system"}
payload = {"inputs": {}, "query": prompt, "response_mode": "streaming", "conversation_id": "", "user": "security-system"}
url = f"{settings.DIFY_API_URL.rstrip('/')}/chat-messages"
full_answer = ""
async with httpx.AsyncClient() as client:
resp = await client.post(url, json=payload, headers=headers, timeout=60.0)
if resp.status_code == 200:
data = resp.json()
answer = data.get("answer", "")
json_data = self._extract_json(answer)
if json_data:
return model_cls(**json_data)
async with client.stream("POST", url, json=payload, headers=headers, timeout=120.0) as resp:
if resp.status_code == 200:
async for line in resp.aiter_lines():
if line.startswith("data: "):
try:
# 处理 SSE 格式数据
json_str = line[6:].strip()
if not json_str:
continue
data = json.loads(json_str)
event = data.get("event")
# message 是 Chat App, agent_message 是 Agent App
if event in ["message", "agent_message"]:
full_answer += data.get("answer", "")
except Exception as e:
# 忽略解析错误的行
continue
else:
# 读取错误响应体
error_body = await resp.read()
print(f"LLM Call Failed: Status={resp.status_code}, Response={error_body.decode()}")
if full_answer:
# print(f"DEBUG: Full LLM Answer: {full_answer}")
json_data = self._extract_json(full_answer)
if json_data:
return model_cls(**json_data)
except Exception as e:
print(f"LLM Call Error: {e}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment