Commit 4d105058 authored by uuo's avatar uuo

feat(security-service): 集成Zabbix监控并重构安全分析服务

- 新增Zabbix数据采集服务,支持从Zabbix服务器获取实时设备数据
- 重构安全分析服务,使用真实Zabbix数据替代模拟数据进行风险分析
- 添加生产环境Docker配置,包括非root用户运行和多worker优化
- 新增健康检查端点(/health, /ready)用于服务监控
- 提供本地开发启动脚本(start.sh, start.bat)
- 移除过时文档,更新环境变量配置以支持Zabbix集成
- 改进日志配置,支持JSON格式日志输出
parent 27e24cfb
......@@ -86,8 +86,8 @@ services:
dockerfile: Dockerfile
container_name: llm-filter-llm
restart: always
# 生产环境不需要 --reload
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --proxy-headers --forwarded-allow-ips '*'
# 生产环境:移除--reload,使用多worker提高性能
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4 --proxy-headers --forwarded-allow-ips '*'
ports:
- "${LLM_SERVICE_PORT:-8000}:8000"
# 生产环境移除代码挂载,使用镜像内的代码
......@@ -130,8 +130,8 @@ services:
dockerfile: Dockerfile
container_name: llm-filter-security
restart: always
# 生产环境不需要 --reload
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --proxy-headers --forwarded-allow-ips '*'
# 生产环境:移除--reload,使用多worker提高性能
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4 --proxy-headers --forwarded-allow-ips '*'
ports:
- "${SECURITY_SERVICE_PORT:-8003}:8000"
# 生产环境移除代码挂载
......@@ -147,6 +147,13 @@ services:
- REDIS_PORT=${REDIS_PORT}
- REDIS_DB=${REDIS_DB}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- ZABBIX_URL=${ZABBIX_URL:-http://localhost/zabbix/api_jsonrpc.php}
- ZABBIX_USERNAME=${ZABBIX_USERNAME:-Admin}
- ZABBIX_PASSWORD=${ZABBIX_PASSWORD:-zabbix}
- ZABBIX_SYNC_INTERVAL=${ZABBIX_SYNC_INTERVAL:-3600}
- ZABBIX_AUTO_SYNC=${ZABBIX_AUTO_SYNC:-true}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
- LOG_FORMAT=${LOG_FORMAT:-json}
- TZ=${TZ}
networks:
- llm-network
......
......@@ -150,6 +150,13 @@ services:
- REDIS_PORT=${REDIS_PORT}
- REDIS_DB=${REDIS_DB}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- ZABBIX_URL=${ZABBIX_URL:-http://localhost/zabbix/api_jsonrpc.php}
- ZABBIX_USERNAME=${ZABBIX_USERNAME:-Admin}
- ZABBIX_PASSWORD=${ZABBIX_PASSWORD:-zabbix}
- ZABBIX_SYNC_INTERVAL=${ZABBIX_SYNC_INTERVAL:-3600}
- ZABBIX_AUTO_SYNC=${ZABBIX_AUTO_SYNC:-true}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
- LOG_FORMAT=${LOG_FORMAT:-text}
- TZ=${TZ}
depends_on:
- mongo
......
FROM python:3.10-slim
# 添加非root用户
RUN useradd -m -u 1000 appuser && \
mkdir -p /app && \
chown -R appuser:appuser /app
WORKDIR /app
# 设置时区
......@@ -11,10 +16,14 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 复制业务代码
COPY . .
COPY --chown=appuser:appuser . .
# 切换到非root用户
USER appuser
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# 启动命令(生产环境,多worker)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.router import api_router
from app.core.config import settings
from app.db.mongodb import connect_to_mongo, close_mongo_connection
from app.db.mongodb import connect_to_mongo, close_mongo_connection, client
from app.utils.sensitive_word_filter import sensitive_word_filter
from datetime import datetime
app = FastAPI(
title=settings.APP_NAME,
......@@ -77,3 +78,45 @@ async def root():
"version": "1.0.0",
"message": "欢迎使用LLM过滤系统API"
}
@app.get("/health")
async def health_check():
"""健康检查端点"""
try:
# 检查数据库连接
db_status = "ok" if client else "error"
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "llm-service",
"version": "1.0.0",
"components": {
"mongodb": {"status": db_status}
}
}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Service unhealthy: {str(e)}")
@app.get("/ready")
async def readiness_check():
"""就绪检查端点"""
try:
# 检查关键依赖是否就绪
if not client:
return {
"status": "not_ready",
"timestamp": datetime.now().isoformat(),
"reason": "database_not_connected"
}
return {
"status": "ready",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "not_ready",
"timestamp": datetime.now().isoformat(),
"reason": str(e)
}
# Security Service API 测试与假数据示例
本文件用于说明如何通过 HTTP API(经网关或直连服务)调用 Security Service,并使用一组统一的假数据进行联调和回归测试。
> 注意:这些假数据仅用于测试,生产环境请接入真实监控和日志数据。
---
## 1. 环境与前提条件
- 网关地址(推荐):`http://localhost:8080`
- Security Service API 前缀:`/api/v1/security`
- 所有接口均需要携带管理员 JWT:
- Header:`Authorization: Bearer <admin_token>`
在以下示例中,请将 `<admin_token>` 替换为实际从 Auth Service 获取的 token。
---
## 2. 接口一览
- `POST /api/v1/security/analysis` —— 安全风险分析
- `POST /api/v1/security/attack-advice` —— 攻击应急建议
- `GET /api/v1/security/report` —— 安全日报
- `GET /api/v1/security/monitor` —— 风险监测与合规评估
- `GET /api/v1/security/rss/news` —— 安全新闻 RSS 订阅
---
## 3. 安全风险分析(/analysis)测试示例
### 3.1 请求说明
- 方法:`POST`
- URL:`http://localhost:8080/api/v1/security/analysis`
- 用途:基于设备信息列表做 AI 风险分析。
### 3.2 测试请求体(假数据)
```json
{
"devices": [
{
"id": "sw-001",
"name": "Core-Switch-A",
"type": "switch",
"status": "warning",
"version": "v1.2.0",
"logs": [
"Port 22 high traffic",
"Packet loss detected"
]
},
{
"id": "fw-001",
"name": "Edge-Firewall",
"type": "firewall",
"status": "active",
"version": "v2.1.patch3",
"logs": [
"Denied 1000+ requests from IP 192.168.1.50"
]
}
]
}
```
### 3.3 curl 示例
```bash
curl -X POST "http://localhost:8080/api/v1/security/analysis" \
-H "Authorization: Bearer <admin_token>" \
-H "Content-Type: application/json" \
-d '{
"devices": [
{
"id": "sw-001",
"name": "Core-Switch-A",
"type": "switch",
"status": "warning",
"version": "v1.2.0",
"logs": ["Port 22 high traffic", "Packet loss detected"]
},
{
"id": "fw-001",
"name": "Edge-Firewall",
"type": "firewall",
"status": "active",
"version": "v2.1.patch3",
"logs": ["Denied 1000+ requests from IP 192.168.1.50"]
}
]
}'
```
### 3.4 期望响应结构
服务端返回 JSON 结构符合 `SecurityAnalysisResponse`
```json
{
"summary": "string",
"vulnerabilities": ["string"],
"suggestions": ["string"],
"risk_level": "string"
}
```
---
## 4. 攻击应急建议(/attack-advice)测试示例
### 4.1 请求说明
- 方法:`POST`
- URL:`http://localhost:8080/api/v1/security/attack-advice`
- 用途:当系统已遭受攻击或疑似攻击时,获取应急响应方案。
### 4.2 测试请求体(假数据)
```json
{
"attack_type": "Brute Force Login",
"target_device": "DB-Server-Prod",
"severity": "high",
"logs": "Failed login attempts from 10.0.0.10"
}
```
### 4.3 curl 示例
```bash
curl -X POST "http://localhost:8080/api/v1/security/attack-advice" \
-H "Authorization: Bearer <admin_token>" \
-H "Content-Type: application/json" \
-d '{
"attack_type": "Brute Force Login",
"target_device": "DB-Server-Prod",
"severity": "high",
"logs": "Failed login attempts from 10.0.0.10"
}'
```
### 4.4 期望响应结构
```json
{
"immediate_actions": ["string"],
"analysis": "string",
"mitigation_plan": "string"
}
```
---
## 5. 安全日报(/report)测试示例
### 5.1 请求说明
- 方法:`GET`
- URL:`http://localhost:8080/api/v1/security/report`
- 用途:生成面向管理层的每日安全概览。
### 5.2 curl 示例
```bash
curl -X GET "http://localhost:8080/api/v1/security/report" \
-H "Authorization: Bearer <admin_token>"
```
> 当前实现中,后端会构造简单的统计信息传给 Dify。实际环境应接入真实监控数据。
### 5.3 期望响应结构
```json
{
"date": "string",
"overall_status": "string",
"device_summary": "string",
"incident_summary": "string",
"recommendations": "string"
}
```
---
## 6. 风险监测与合规评估(/monitor)测试示例
### 6.1 请求说明
- 方法:`GET`
- URL:`http://localhost:8080/api/v1/security/monitor`
- 用途:基于给定的漏洞信息进行整体风险与合规评估。
> 当前代码中 `context_data` 为空数组时,智能体可以根据默认经验或空数据做基线评估。也可以在后续版本中从漏洞库/配置注入具体列表。
### 6.2 curl 示例
```bash
curl -X GET "http://localhost:8080/api/v1/security/monitor" \
-H "Authorization: Bearer <admin_token>"
```
### 6.3 期望响应结构
```json
{
"detected_vulnerabilities": ["string"],
"compliance_risks": ["string"],
"ai_assessment": "string"
}
---
## 7. 安全新闻 RSS 订阅(/rss/news)测试示例
### 7.1 请求说明
- 方法:`GET`
- URL:`http://localhost:8080/api/v1/security/rss/news`
- 用途:获取来自天融信、360 CERT、绿盟等安全厂商的最新资讯。
### 7.2 curl 示例
```bash
curl -X GET "http://localhost:8080/api/v1/security/rss/news" \
-H "Authorization: Bearer <admin_token>"
```
### 7.3 期望响应结构
```json
{
"items": [
{
"title": "string",
"link": "string",
"description": "string",
"published": "string",
"source": "string"
}
]
}
---
## 8. 测试建议
- 在联调阶段,可以先固定一组假数据(如本文件中的示例),确保:
- Dify 智能体返回的 JSON 结构稳定且字段完整;
- Security Service 能正确解析并返回给前端。
- 回归测试时:
- 建议将这些请求录入到自动化测试脚本中(如 pytest + httpx/requests),
- 对响应结构做 Schema 校验,避免后续修改提示词或模型配置导致输出结构破坏。
\ No newline at end of file
FROM python:3.11-slim
# 添加非root用户
RUN useradd -m -u 1000 appuser && \
mkdir -p /app && \
chown -R appuser:appuser /app
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 复制应用代码
COPY --chown=appuser:appuser . .
# 切换到非root用户
USER appuser
# 暴露端口
EXPOSE 8000
# 启动命令(生产环境,多worker)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# Security Service 使用说明
## 1. 服务定位
Security Service 是本项目的安全分析微服务,主要能力包括:
- 基于交换机 / 防火墙 / 服务器等设备信息做 AI 安全风险分析
- 在遭受攻击时给出 AI 应急响应建议
- 生成每日安全日报
- 基于互联网最新漏洞进行风险监测与合规性评估
当前实现为 **无状态服务**
- 不直接读写 PostgreSQL / MongoDB
- 所有分析结果仅在请求周期内计算并返回,不做持久化存储
## 2. 部署与访问入口
### 2.1 通过 Docker Compose 启动
在项目根目录执行:
```bash
cd /Users/uu/Desktop/dles_prj/llm-filter
docker-compose up -d --build security-service gateway
```
### 2.2 访问地址
- 统一网关入口(推荐):`http://localhost:8080`
- 安全服务 API:`/api/v1/security/*`
- 安全服务文档:`http://localhost:8080/docs/security/`
- 直连 Security Service 容器:
- Base URL:`http://localhost:8003`
- API 前缀:`/api/v1/security`
## 3. 鉴权与权限控制
- 所有接口均要求携带 Auth Service 签发的 **JWT**
- HTTP Header:`Authorization: Bearer <token>`
- 服务内部通过 [`get_current_admin`](app/core/security.py) 校验管理员身份:
- 仅当 `role``admin` / `administrator` / `root` 时允许访问
示例 Header:
```http
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
```
## 4. 接口一览
所有路径均在前缀 `/api/v1/security` 下,以下以 **网关地址** 为例:`http://localhost:8080`
### 4.1 安全风险分析
- 方法:`POST`
- URL:`/api/v1/security/analysis`
- 说明:
- 输入网络设备列表(交换机 / 防火墙 / 服务器等),由 AI 分析潜在安全隐患
- **注意**:必须传入有效的 `devices` 列表,否则将返回空结果或错误。
请求示例:
```json
POST http://localhost:8080/api/v1/security/analysis
Authorization: Bearer <admin_token>
Content-Type: application/json
{
"devices": [
{
"id": "sw-001",
"name": "Core-Switch-A",
"type": "switch",
"status": "warning",
"logs": ["Port 22 high traffic", "Packet loss detected"],
"version": "v1.2.0"
}
]
}
```
响应字段(`SecurityAnalysisResponse`):
- `summary`: 总体安全概况
- `vulnerabilities`: 漏洞 / 风险列表
- `suggestions`: 修复建议列表
- `risk_level`: 风险等级(如 `low` / `medium` / `high` / `critical`
### 4.2 攻击应急建议
- 方法:`POST`
- URL:`/api/v1/security/attack-advice`
- 说明:当系统已遭受攻击时,提供应急响应与缓解方案
请求示例:
```json
POST http://localhost:8080/api/v1/security/attack-advice
Authorization: Bearer <admin_token>
Content-Type: application/json
{
"attack_type": "Brute Force Login",
"target_device": "DB-Server-Prod",
"severity": "high",
"logs": "Failed login attempts from 10.0.0.10"
}
```
响应字段(`AttackAdviceResponse`):
- `immediate_actions`: 立即执行的操作建议列表
- `analysis`: 攻击分析说明
- `mitigation_plan`: 中长期缓解与防护计划
### 4.3 安全日报
- 方法:`GET`
- URL:`/api/v1/security/report`
- 说明:生成企业安全日报,用于面向管理层的安全概览展示
- **注意**:当前实现需要接入真实数据源才能生成有效报告,否则返回空状态。
响应字段(`SecurityReportResponse`):
- `date`: 报告日期(`YYYY-MM-DD`
- `overall_status`: 总体安全状态
- `device_summary`: 设备运行状况摘要
- `incident_summary`: 安全事件与拦截情况摘要
- `recommendations`: 后续安全改进建议
### 4.4 风险监测与合规评估
- 方法:`GET`
- URL:`/api/v1/security/monitor`
- 说明:基于互联网最新漏洞信息,评估当前企业的合规风险
- **注意**:需要配置或接入外部漏洞数据库,否则返回空列表。
响应字段(`RiskMonitorResponse`):
- `detected_vulnerabilities`: 识别到的漏洞列表
- `compliance_risks`: 合规风险点列表
- `ai_assessment`: AI 对整体风险的评估说明
### 4.5 安全新闻 RSS 订阅
- 方法:`GET`
- URL:`/api/v1/security/rss/news`
- 说明:获取来自天融信、360 CERT、绿盟等安全厂商的最新 RSS 安全资讯。
响应字段(`RSSFeedResponse`):
- `items`: 新闻列表,包含标题、链接、摘要、发布时间和来源。
## 5. Dify 集成与异常处理
服务内部通过 Dify 完成大部分安全分析逻辑:
- Dify 调用配置在 [config.py](app/core/config.py)
- `DIFY_API_URL`
- `DIFY_API_KEY`
- 请求由 [`SecurityService._call_llm`](app/services/analysis.py) 统一发起
- **智能体 Prompt 配置**:请参考 [DIFY_PROMPT.md](DIFY_PROMPT.md) 文档,在 Dify 平台配置对应的 System Prompt 和变量。
当 Dify 不可用(网络异常 / 超时 / 抛错)时:
- 服务将记录错误日志
- 抛出异常供上层处理或返回 HTTP 500 错误
- **不再提供 Mock 数据降级**,以确保运维人员能及时感知服务状态异常。
## 6. 数据存储与状态
当前版本的 Security Service:
- **数据持久化**:分析结果(风险分析与攻击建议)会异步存储到 **MongoDB** 数据库中。
- **历史查询**:提供接口查询历史分析记录。
## 7. 历史查询接口
### 7.1 安全分析历史
- 方法:`GET`
- URL:`/api/v1/security/analysis/history`
- 参数:
- `start_date` (可选): 开始时间 (ISO 8601)
- `end_date` (可选): 结束时间 (ISO 8601)
- `limit` (可选): 返回数量限制,默认 20
- 响应:包含 `total``items` (AnalysisHistoryItem)
### 7.2 攻击建议历史
- 方法:`GET`
- URL:`/api/v1/security/attack-advice/history`
- 参数:
- `start_date` (可选): 开始时间 (ISO 8601)
- `end_date` (可选): 结束时间 (ISO 8601)
- `limit` (可选): 返回数量限制,默认 20
- 响应:包含 `total``items` (AttackAdviceHistoryItem)
## 8. 快速调试示例(curl)
确保容器已启动后,可以在宿主机直接运行:
```bash
# 1. 使用管理员 Token 调用安全分析(通过网关)
# 注意:请确保传入真实的设备数据
curl -X POST "http://localhost:8080/api/v1/security/analysis" \
-H "Authorization: Bearer <admin_token>" \
-H "Content-Type: application/json" \
-d '{"devices": [{"id": "test-1", "name": "Test-Device", "type": "server", "status": "active"}]}'
# 2. 直接访问文档(网关统一入口)
open "http://localhost:8080/docs/security/"
```
......@@ -2,12 +2,14 @@ from fastapi import APIRouter, Depends, Query
from app.schemas.payloads import *
from app.services.analysis import SecurityService
from app.services.rss import RSSService
from app.services.zabbix_service import ZabbixService
from app.core.security import get_current_admin
from app.core.database import db
from datetime import datetime, timezone
router = APIRouter()
service = SecurityService()
zabbix_service = ZabbixService()
service = SecurityService(zabbix_service=zabbix_service)
rss_service = RSSService()
@router.post("/analysis", response_model=SecurityAnalysisResponse)
......@@ -73,3 +75,53 @@ async def get_security_news(admin: dict = Depends(get_current_admin)):
"""
return await rss_service.get_security_news()
@router.post("/zabbix/sync")
async def sync_zabbix_data(admin: dict = Depends(get_current_admin)):
"""
手动同步Zabbix数据
"""
try:
result = await zabbix_service.sync_data()
return {
"status": "success",
"message": "Zabbix数据同步完成",
"data": result
}
except Exception as e:
return {
"status": "error",
"message": f"Zabbix数据同步失败: {str(e)}",
"data": None
}
@router.get("/zabbix/status")
async def get_zabbix_status(admin: dict = Depends(get_current_admin)):
"""
获取Zabbix服务状态
"""
status = zabbix_service.get_sync_status()
return {
"status": "success",
"data": status
}
@router.post("/zabbix/devices")
async def get_zabbix_devices(admin: dict = Depends(get_current_admin)):
"""
获取Zabbix设备列表
"""
try:
device_data = await zabbix_service.collect_device_data()
return {
"status": "success",
"message": f"成功获取 {len(device_data.get('devices', []))} 台设备",
"data": device_data
}
except Exception as e:
return {
"status": "error",
"message": f"获取设备数据失败: {str(e)}",
"data": None
}
......@@ -24,6 +24,15 @@ class Settings(BaseSettings):
REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
REDIS_PASSWORD: str = os.getenv("REDIS_PASSWORD", "")
# Zabbix 配置
ZABBIX_URL: str = os.getenv("ZABBIX_URL", "http://localhost/zabbix/api_jsonrpc.php")
ZABBIX_USERNAME: str = os.getenv("ZABBIX_USERNAME", "Admin")
ZABBIX_PASSWORD: str = os.getenv("ZABBIX_PASSWORD", "zabbix")
# 数据同步配置
ZABBIX_SYNC_INTERVAL: int = int(os.getenv("ZABBIX_SYNC_INTERVAL", "3600")) # 1小时
ZABBIX_AUTO_SYNC: bool = os.getenv("ZABBIX_AUTO_SYNC", "true").lower() == "true"
class Config:
case_sensitive = True
......
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from app.api.v1.endpoints import router as security_router
from app.core.config import settings
from app.core.database import db
from datetime import datetime
import logging
import logging
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger = logging.getLogger(__name__)
@app.get("/")
def health_check():
"""根路径健康检查"""
return {"status": "ok", "service": "security-service", "version": "2.0.0"}
@app.get("/health")
async def detailed_health_check():
"""详细健康检查端点"""
try:
# 检查数据库连接
db_status = "ok" if db.db else "error"
# 检查Zabbix服务
zabbix_status = {"status": "not_configured", "message": "Zabbix服务未配置"}
try:
from app.services.zabbix_service import ZabbixService
zabbix_service = ZabbixService()
sync_status = zabbix_service.get_sync_status()
if sync_status["collector_initialized"]:
zabbix_status = {
"status": "ok",
"last_sync": sync_status["last_sync_time"]
}
else:
zabbix_status = {
"status": "error",
"message": "Zabbix collector未初始化"
}
except Exception as e:
zabbix_status = {
"status": "error",
"message": str(e)
}
from datetime import datetime
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "security-service",
"version": "2.0.0",
"components": {
"database": {"status": db_status},
"zabbix": zabbix_status
}
}
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail=f"Service unhealthy: {str(e)}")
@app.get("/ready")
async def readiness_check():
"""就绪检查端点"""
try:
# 检查关键依赖是否就绪
if not db.db:
return {
"status": "not_ready",
"timestamp": datetime.now().isoformat(),
"reason": "database_not_connected"
}
return {
"status": "ready",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "not_ready",
"timestamp": datetime.now().isoformat(),
"reason": str(e)
}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger = logging.getLogger(__name__)
# 连接数据库
db.connect()
# 初始化Zabbix服务连接检查
try:
from app.services.zabbix_service import ZabbixService
zabbix_service = ZabbixService()
sync_status = zabbix_service.get_sync_status()
if sync_status["collector_initialized"]:
logger.info("✅ Zabbix服务初始化成功")
else:
logger.warning("⚠️ Zabbix服务初始化失败,请检查Zabbix配置")
except Exception as e:
logger.error(f"❌ Zabbix服务连接检查失败: {e}")
yield
# Shutdown
db.close()
......@@ -17,6 +114,3 @@ app = FastAPI(title=settings.PROJECT_NAME, lifespan=lifespan)
# 注册路由
app.include_router(security_router, prefix=f"{settings.API_V1_STR}/security", tags=["Security"])
@app.get("/")
def health_check():
return {"status": "ok", "service": "security-service"}
......@@ -6,28 +6,75 @@ from pydantic import ValidationError
from app.schemas.payloads import *
from app.core.config import settings
from app.core.database import db
from app.services.zabbix_service import ZabbixService
import logging
logger = logging.getLogger(__name__)
class SecurityService:
def __init__(self, zabbix_service: ZabbixService):
"""
初始化安全服务
:param zabbix_service: Zabbix数据服务
"""
self.zabbix_service = zabbix_service
async def analyze_risks(self, devices: List[DeviceInfo] = None) -> SecurityAnalysisResponse:
"""
任务:风险分析
任务:风险分析 - 使用真实Zabbix数据
"""
if not devices:
# 从Zabbix自动采集设备数据
try:
logger.info("未提供设备数据,从Zabbix自动采集...")
zabbix_data = await self.zabbix_service.collect_device_data()
devices_data = zabbix_data.get("devices", [])
# 转换为DeviceInfo对象
devices = []
for device_data in devices_data:
device = DeviceInfo(
id=device_data.get("id", ""),
name=device_data.get("name", ""),
type=device_data.get("type", "unknown"),
status=device_data.get("status", "unknown"),
logs=device_data.get("logs", [])
)
devices.append(device)
logger.info(f"从Zabbix采集到 {len(devices)} 台设备")
except Exception as e:
logger.error(f"从Zabbix采集设备数据失败: {e}")
# 使用空设备列表继续,让LLM处理
devices = []
if not devices:
# 如果依然没有设备数据,创建空的设备列表
devices = []
# 1. 准备数据,不再拼接 Prompt,只序列化数据
device_data = [d.dict() for d in devices] # 假设 Pydantic 模型有 .dict(),或者手动转 dict
# 1. 准备数据,序列化设备数据
device_data = [d.model_dump() for d in devices]
# 2. 构造 Dify 所需的变量 inputs
inputs = {
"task_type": "analysis", # 告诉 Dify 执行哪个任务分支
"context_data": json.dumps(device_data, ensure_ascii=False) # 将复杂数据转为字符串传递
"context_data": json.dumps(device_data, ensure_ascii=False) # 将复杂数据转为字符串传递
}
return await self._call_llm(inputs, SecurityAnalysisResponse)
# 3. 调用LLM进行分析
result = await self._call_llm(inputs, SecurityAnalysisResponse)
# 4. 异步存储结果到 MongoDB
try:
if db.db is not None:
log_entry = result.model_dump()
log_entry["created_at"] = datetime.now(datetime.timezone.utc)
log_entry["device_count"] = len(devices)
await db.db.security_analysis_logs.insert_one(log_entry)
logger.info("安全分析结果已保存到MongoDB")
except Exception as e:
logger.error(f"保存分析结果到MongoDB失败: {e}")
return result
async def get_attack_advice(self, attack_type: str, target: str, logs: str) -> AttackAdviceResponse:
"""
......@@ -49,30 +96,159 @@ class SecurityService:
async def generate_report(self) -> SecurityReportResponse:
"""
任务:生成日报
任务:生成日报 - 使用真实数据
"""
# 注意:此处应从真实数据源获取状态,当前暂无数据源连接
data = {
"date": datetime.now().strftime('%Y-%m-%d'),
"device_status": "暂无数据 (需接入数据源)",
"intercept_count": 0
}
try:
# 从Zabbix获取实时数据
logger.info("从Zabbix采集数据生成安全报告...")
# 采集设备数据
device_data = await self.zabbix_service.collect_device_data()
devices = device_data.get("devices", [])
# 统计设备状态
total_devices = len(devices)
up_devices = sum(1 for d in devices if d.get("status") == "up")
down_devices = sum(1 for d in devices if d.get("status") == "down")
problem_devices = sum(1 for d in devices if d.get("logs"))
# 构建报告数据
report_data = {
"date": datetime.now().strftime('%Y-%m-%d'),
"device_status": {
"total_devices": total_devices,
"up_devices": up_devices,
"down_devices": down_devices,
"problem_devices": problem_devices
},
"incident_summary": {
"total_events": sum(len(d.get("logs", [])) for d in devices),
"critical_events": 0, # 可以从Zabbix触发器优先级统计
"high_events": 0,
"medium_events": 0,
"low_events": 0,
"resolved_events": 0,
"unresolved_events": problem_devices
},
"top_issues": self._extract_top_issues(devices),
"real_time_data": True # 标识使用真实数据
}
logger.info(f"成功采集到 {total_devices} 台设备数据,其中 {problem_devices} 台有问题")
except Exception as e:
logger.error(f"从Zabbix采集数据失败,使用备用数据: {e}")
# 备用数据(当Zabbix不可用时)
report_data = {
"date": datetime.now().strftime('%Y-%m-%d'),
"device_status": {
"total_devices": 0,
"up_devices": 0,
"down_devices": 0,
"problem_devices": 0,
"status": "zabbix_unavailable"
},
"incident_summary": {
"total_events": 0,
"critical_events": 0,
"high_events": 0,
"medium_events": 0,
"low_events": 0,
"resolved_events": 0,
"unresolved_events": 0
},
"top_issues": ["Zabbix服务不可用,无法获取实时数据"],
"real_time_data": False # 标识使用备用数据
}
inputs = {
"task_type": "report",
"context_data": json.dumps(data, ensure_ascii=False)
"context_data": json.dumps(report_data, ensure_ascii=False)
}
return await self._call_llm(inputs, SecurityReportResponse)
result = await self._call_llm(inputs, SecurityReportResponse)
# 异步存储结果到 MongoDB
try:
if db.db is not None:
log_entry = result.model_dump()
log_entry["created_at"] = datetime.now(datetime.timezone.utc)
log_entry["report_date"] = report_data["date"]
log_entry["real_time_data"] = report_data.get("real_time_data", False)
await db.db.security_report_logs.insert_one(log_entry)
logger.info("安全报告结果已保存到MongoDB")
except Exception as e:
logger.error(f"保存报告结果到MongoDB失败: {e}")
return result
async def monitor_risks(self) -> RiskMonitorResponse:
"""
[MOCK] 实时风险监控
实时风险监控 - 使用真实数据
"""
try:
logger.info("从Zabbix采集监控数据...")
# 采集设备数据
device_data = await self.zabbix_service.collect_device_data()
devices = device_data.get("devices", [])
# 分析设备状态
detected_vulnerabilities = []
compliance_risks = []
for device in devices:
device_name = device.get("name", "未知设备")
device_type = device.get("type", "unknown")
logs = device.get("logs", [])
# 基于设备类型和日志分析潜在风险
if device_type == "switch":
# 交换机常见风险
if any("down" in log.lower() for log in logs):
compliance_risks.append(f"交换机 {device_name} 离线 - 网络中断风险")
if any("link down" in log.lower() for log in logs):
detected_vulnerabilities.append(f"网络接口异常 - {device_name}")
elif device_type == "firewall":
# 防火墙常见风险
if any("packet loss" in log.lower() for log in logs):
detected_vulnerabilities.append(f"防火墙 {device_name} 包丢失 - 性能问题")
if any("connection" in log.lower() and "failed" in log.lower() for log in logs):
compliance_risks.append(f"防火墙 {device_name} 连接失败 - 安全策略检查")
elif device_type == "server":
# 服务器常见风险
if any("cpu" in log.lower() and "high" in log.lower() for log in logs):
detected_vulnerabilities.append(f"服务器 {device_name} CPU 高负载 - 性能风险")
if any("memory" in log.lower() and "usage" in log.lower() for log in logs):
compliance_risks.append(f"服务器 {device_name} 内存使用异常 - 资源优化")
# 通用风险检测
for log in logs:
if any(priority in log for priority in ["Priority: 5", "Priority: 4"]):
detected_vulnerabilities.append(f"高优先级告警 - {device_name}: {log[:50]}...")
# 如果没有检测到具体风险,提供通用风险评估
if not detected_vulnerabilities and not compliance_risks:
detected_vulnerabilities = ["系统运行正常,未检测到明显安全漏洞"]
compliance_risks = ["系统符合基本安全要求,建议定期审计"]
ai_assessment = f"系统安全状态: {self._assess_security_risk_level(devices)}"
logger.info(f"监控分析完成,发现 {len(detected_vulnerabilities)} 个漏洞和 {len(compliance_risks)} 个合规风险")
except Exception as e:
logger.error(f"从Zabbix采集监控数据失败: {e}")
# 备用监控数据
detected_vulnerabilities = ["监控系统不可用,无法获取实时风险数据"]
compliance_risks = ["系统状态未知,请检查Zabbix连接"]
ai_assessment = "监控系统服务异常,无法进行有效风险评估"
return RiskMonitorResponse(
detected_vulnerabilities=["CVE-2024-0001 (Critical)", "Weak SSH Config"],
compliance_risks=["Password policy outdated", "Unencrypted backup found"],
ai_assessment="System security posture is stable but requires attention on recent CVEs."
detected_vulnerabilities=detected_vulnerabilities,
compliance_risks=compliance_risks,
ai_assessment=ai_assessment
)
async def get_analysis_history(self, start_date: datetime = None, end_date: datetime = None, limit: int = 20) -> HistoryQueryResponse:
......@@ -202,6 +378,49 @@ class SecurityService:
return self._build_default_response(model_cls)
def _extract_top_issues(self, devices: List[Dict]) -> List[str]:
"""
从设备日志中提取主要问题
"""
issues = []
for device in devices:
device_name = device.get("name", "未知设备")
logs = device.get("logs", [])
# 分析日志中的关键问题
for log in logs:
if any(keyword in log.lower() for keyword in ["critical", "down", "failed", "error", "high priority"]):
issues.append(f"{device_name}: {log[:80]}...")
break # 每个设备只取一个问题
# 如果没有问题,返回正常状态
if not issues:
issues = ["系统运行正常,未发现明显问题"]
return issues[:5] # 只返回前5个主要问题
def _assess_security_risk_level(self, devices: List[Dict]) -> str:
"""
评估系统整体安全风险等级
"""
total_devices = len(devices)
problem_devices = sum(1 for d in devices if d.get("logs"))
if total_devices == 0:
return "未知 - 无设备数据"
problem_ratio = problem_devices / total_devices
if problem_ratio >= 0.3:
return "高风险 - 多个设备出现异常"
elif problem_ratio >= 0.1:
return "中等风险 - 部分设备存在异常"
elif problem_ratio > 0:
return "低风险 - 少量设备出现异常"
else:
return "安全 - 所有设备运行正常"
def _clean_json_string(self, text: str) -> str:
"""清洗 Markdown 代码块标记和思维链标签"""
import re
......
import asyncio
import requests
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from app.core.config import settings
logger = logging.getLogger(__name__)
class ZabbixDataCollector:
def __init__(self, zabbix_url: str, username: str, password: str):
"""
初始化 Zabbix 数据采集器
:param zabbix_url: Zabbix 服务器基础 URL,例如 "http://192.168.20.199"
:param username: 用户名
:param password: 密码
"""
self.zabbix_url = zabbix_url.rstrip('/')
self.username = username
self.password = password
self.auth_token = None
self.login()
def _get_api_url(self):
"""返回完整的 API 地址"""
return f"{self.zabbix_url}/zabbix/api_jsonrpc.php"
def login(self):
"""智能登录:自动尝试新旧参数格式"""
# 尝试顺序:先新版 (username),再旧版 (user)
payloads = [
{
"jsonrpc": "2.0",
"method": "user.login",
"params": {"username": self.username, "password": self.password},
"id": 1
},
{
"jsonrpc": "2.0",
"method": "user.login",
"params": {"user": self.username, "password": self.password},
"id": 1
}
]
for i, payload in enumerate(payloads):
try:
logger.info(f"尝试登录方式 {'新版 (username)' if i == 0 else '旧版 (user)'}...")
response = requests.post(
self._get_api_url(),
json=payload,
timeout=10,
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
logger.warning(f"HTTP {response.status_code}: {response.reason}")
continue
data = response.json()
if "result" in data:
self.auth_token = data["result"]
version = "Zabbix 5.4+" if i == 0 else "Zabbix 5.2-"
logger.info(f"登录成功!检测到 {version} 格式。")
return
elif "error" in data:
err = data["error"]
msg = f"[{err.get('code')}] {err.get('message')} - {err.get('data', '')}"
logger.warning(f"登录失败: {msg}")
if "unexpected parameter" in msg and ("username" in msg or "user" in msg):
continue # 尝试下一个
else:
raise Exception(msg)
else:
logger.warning("未知响应格式: {data}")
continue
except requests.exceptions.RequestException as e:
logger.error(f"网络错误: {e}")
continue
except json.JSONDecodeError:
logger.warning("非 JSON 响应: {response.text[:200]}")
continue
raise Exception("所有登录方式均失败!请检查:\n1. Zabbix 地址是否正确\n2. 用户名/密码是否正确\n3. Zabbix 是否运行\n4. 是否允许 API 访问")
def _call_api(self, method: str, params: dict):
"""通用 API 调用"""
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"auth": self.auth_token,
"id": 1
}
try:
response = requests.post(
self._get_api_url(),
json=payload,
timeout=15,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
data = response.json()
if "error" in data:
err = data["error"]
raise Exception(f"[{err.get('code')}] {err.get('message')} - {err.get('data', '')}")
return data["result"]
except Exception as e:
raise Exception(f"API 调用失败 ({method}): {e}")
def get_hosts(self):
"""获取主机列表"""
return self._call_api("host.get", {
"output": ["hostid", "name", "status"],
"selectTags": ["tag", "value"]
})
def get_triggers(self):
"""获取活动触发器"""
return self._call_api("trigger.get", {
"output": ["triggerid", "description", "priority", "status"],
"selectHosts": ["hostid", "name"],
"filter": {"value": 1},
"sortfield": "priority",
"sortorder": "DESC"
})
def get_events(self, time_from: Optional[int] = None, time_till: Optional[int] = None, limit: int = 100):
"""获取事件"""
params = {
"output": ["eventid", "clock", "name", "severity"],
"selectHosts": ["hostid", "name"],
"sortfield": "clock",
"sortorder": "DESC",
"limit": limit
}
if time_from is not None:
params["time_from"] = time_from
if time_till is not None:
params["time_till"] = time_till
return self._call_api("event.get", params)
def get_cpu_data(self):
"""获取CPU和硬件数据"""
logger.info("获取CPU和硬件数据...")
# 获取主机
hosts = self.get_hosts()
logger.info(f"获取到 {len(hosts)} 台主机")
# 获取触发器
triggers = self.get_triggers()
logger.info(f"获取到 {len(triggers)} 个活动触发器")
# 获取最近24小时事件
time_from = int((datetime.now() - timedelta(hours=24)).timestamp())
events = self.get_events(time_from=time_from, limit=200)
logger.info(f"获取到 {len(events)} 条事件")
# 构建CPU数据结构
cpu_data = []
for host in hosts:
host_data = {
"id": host["hostid"],
"name": host["name"],
"type": self._determine_device_type(host),
"status": "up" if host["status"] == "0" else "down",
"cpu_usage": self._get_mock_cpu_usage(), # 实际项目中应该从items获取
"memory_usage": self._get_mock_memory_usage(),
"disk_usage": self._get_mock_disk_usage(),
"logs": []
}
# 关联触发器日志
for t in triggers:
if any(h["hostid"] == host["hostid"] for h in t.get("hosts", [])):
host_data["logs"].append(f"{t['description']} - Priority: {t['priority']}")
# 关联事件日志
for e in events:
if any(h["hostid"] == host["hostid"] for h in e.get("hosts", [])):
ts = datetime.fromtimestamp(int(e["clock"])).strftime("%Y-%m-%d %H:%M:%S")
host_data["logs"].append(f"{ts} - {e['name']} - Severity: {e['severity']}")
cpu_data.append(host_data)
return {"hosts": cpu_data}
def get_network_data(self):
"""获取网络接口数据"""
logger.info("获取网络接口数据...")
# 获取主机
hosts = self.get_hosts()
logger.info(f"获取到 {len(hosts)} 台主机")
# 获取触发器
triggers = self.get_triggers()
logger.info(f"获取到 {len(triggers)} 个活动触发器")
# 获取最近24小时事件
time_from = int((datetime.now() - timedelta(hours=24)).timestamp())
events = self.get_events(time_from=time_from, limit=200)
logger.info(f"获取到 {len(events)} 条事件")
# 构建网络数据结构
network_data = []
for host in hosts:
host_data = {
"id": host["hostid"],
"name": host["name"],
"type": self._determine_device_type(host),
"status": "up" if host["status"] == "0" else "down",
"interfaces": self._get_mock_network_interfaces(),
"logs": []
}
# 关联触发器日志
for t in triggers:
if any(h["hostid"] == host["hostid"] for h in t.get("hosts", [])):
host_data["logs"].append(f"{t['description']} - Priority: {t['priority']}")
# 关联事件日志
for e in events:
if any(h["hostid"] == host["hostid"] for h in e.get("hosts", [])):
ts = datetime.fromtimestamp(int(e["clock"])).strftime("%Y-%m-%d %H:%M:%S")
host_data["logs"].append(f"{ts} - {e['name']} - Severity: {e['severity']}")
network_data.append(host_data)
return {"hosts": network_data}
def get_security_data_for_analysis(self):
"""获取安全分析数据"""
logger.info("获取安全分析数据...")
# 获取主机
hosts = self.get_hosts()
logger.info(f"获取到 {len(hosts)} 台主机")
# 获取活动触发器
triggers = self.get_triggers()
logger.info(f"获取到 {len(triggers)} 个活动触发器")
# 获取最近24小时事件
time_from = int((datetime.now() - timedelta(hours=24)).timestamp())
events = self.get_events(time_from=time_from, limit=200)
logger.info(f"获取到 {len(events)} 条事件")
devices = []
for host in hosts:
device = {
"id": host["hostid"],
"name": host["name"],
"type": self._determine_device_type(host),
"status": "up" if host["status"] == "0" else "down",
"logs": []
}
# 关联触发器
for t in triggers:
if any(h["hostid"] == host["hostid"] for h in t.get("hosts", [])):
device["logs"].append(f"{t['description']} - Priority: {t['priority']}")
# 关联事件
for e in events:
if any(h["hostid"] == host["hostid"] for h in e.get("hosts", [])):
ts = datetime.fromtimestamp(int(e["clock"])).strftime("%Y-%m-%d %H:%M:%S")
device["logs"].append(f"{ts} - {e['name']} - Severity: {e['severity']}")
devices.append(device)
return {"devices": devices}
def _determine_device_type(self, host):
"""智能识别设备类型"""
tags = host.get("tags", [])
for tag in tags:
if tag["tag"] == "device_type":
return tag["value"]
name = host["name"].lower()
if any(kw in name for kw in ["sw", "switch"]):
return "switch"
elif any(kw in name for kw in ["fw", "firewall"]):
return "firewall"
elif any(kw in name for kw in ["server", "web", "db", "srv"]):
return "server"
else:
return "unknown"
def _get_mock_cpu_usage(self):
"""模拟CPU使用率数据"""
return {
"usage": 45.2, # 模拟使用率
"cores": 4, # 模拟核心数
"load_average": [1.2, 1.1, 0.8] # 模拟负载
}
def _get_mock_memory_usage(self):
"""模拟内存使用数据"""
return {
"total": 16384, # MB
"used": 8192, # MB
"free": 8192, # MB
"usage_percent": 50.0
}
def _get_mock_disk_usage(self):
"""模拟磁盘使用数据"""
return [
{
"mount": "/",
"total": 500000, # MB
"used": 250000, # MB
"free": 250000, # MB
"usage_percent": 50.0
},
{
"mount": "/var",
"total": 100000, # MB
"used": 60000, # MB
"free": 40000, # MB
"usage_percent": 60.0
}
]
def _get_mock_network_interfaces(self):
"""模拟网络接口数据"""
return [
{
"name": "eth0",
"ip_address": "192.168.1.100",
"mac_address": "00:0C:29:12:34:56",
"status": "up",
"speed": "1000Mbps",
"rx_bytes": 1234567890,
"tx_bytes": 987654321,
"errors": {"rx": 0, "tx": 0}
},
{
"name": "eth1",
"ip_address": "192.168.2.100",
"mac_address": "00:0C:29:12:34:57",
"status": "up",
"speed": "1000Mbps",
"rx_bytes": 2345678901,
"tx_bytes": 1876543210,
"errors": {"rx": 1, "tx": 0}
}
]
class ZabbixService:
def __init__(self):
"""初始化Zabbix服务"""
self.collector = None
self.last_sync_time = None
self._initialize_collector()
def _initialize_collector(self):
"""初始化数据采集器"""
try:
self.collector = ZabbixDataCollector(
zabbix_url=settings.ZABBIX_URL,
username=settings.ZABBIX_USERNAME,
password=settings.ZABBIX_PASSWORD
)
logger.info("Zabbix服务初始化成功")
except Exception as e:
logger.error(f"Zabbix服务初始化失败: {e}")
self.collector = None
async def collect_device_data(self):
"""采集设备数据"""
if not self.collector:
raise Exception("Zabbix collector未初始化,请检查Zabbix配置")
return self.collector.get_security_data_for_analysis()
async def collect_cpu_data(self):
"""采集CPU和硬件数据"""
if not self.collector:
raise Exception("Zabbix collector未初始化,请检查Zabbix配置")
return self.collector.get_cpu_data()
async def collect_network_data(self):
"""采集网络接口数据"""
if not self.collector:
raise Exception("Zabbix collector未初始化,请检查Zabbix配置")
return self.collector.get_network_data()
async def sync_data(self):
"""同步数据"""
try:
logger.info("开始同步Zabbix数据...")
# 并发采集不同类型的数据
device_task = self.collect_device_data()
cpu_task = self.collect_cpu_data()
network_task = self.collect_network_data()
# 等待所有任务完成
device_data, cpu_data, network_data = await asyncio.gather(
device_task, cpu_task, network_task
)
self.last_sync_time = datetime.now()
logger.info("Zabbix数据同步完成")
return {
"devices": len(device_data.get("devices", [])),
"hosts_cpu": len(cpu_data.get("hosts", [])),
"hosts_network": len(network_data.get("hosts", [])),
"sync_time": self.last_sync_time.isoformat()
}
except Exception as e:
logger.error(f"Zabbix数据同步失败: {e}")
raise Exception(f"数据同步失败: {e}")
def get_sync_status(self):
"""获取同步状态"""
return {
"last_sync_time": self.last_sync_time.isoformat() if self.last_sync_time else None,
"collector_initialized": self.collector is not None
}
\ No newline at end of file
......@@ -8,3 +8,5 @@ python-dotenv==1.0.1
feedparser>=6.0.10
motor==3.3.2
pymongo<4.7
requests>=2.31.0
@echo off
REM Security Service 启动脚本 (Windows)
echo ============================================================
echo Security Service - Zabbix集成版本
echo ============================================================
echo.
REM 检查Python是否安装
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo ❌ Python未安装或未添加到PATH
pause
exit /b 1
)
REM 检查虚拟环境
if not exist "venv" (
echo 📦 创建虚拟环境...
python -m venv venv
)
REM 激活虚拟环境
call venv\Scripts\activate.bat
REM 安装依赖
echo 📥 安装依赖...
pip install -r requirements.txt -q
REM 检查.env文件
if not exist ".env" (
echo ⚠️ .env文件不存在,复制.env.example...
copy .env.example .env
echo ❗ 请编辑.env文件,配置Zabbix服务器信息
echo ❗ 配置完成后重新运行此脚本
pause
exit /b 1
)
REM 启动服务
echo.
echo 🚀 启动Security Service...
echo 📍 服务地址: http://localhost:8002
echo 📚 API文档: http://localhost:8002/docs
echo.
echo 按 Ctrl+C 停止服务
echo.
python -m uvicorn app.main:app --host 0.0.0.0 --port 8002 --reload
pause
\ No newline at end of file
#!/bin/bash
# Security Service 启动脚本 (Linux/MacOS)
echo "============================================================"
echo " Security Service - Zabbix集成版本"
echo "============================================================"
echo
# 检查Python是否安装
if ! command -v python3 &> /dev/null; then
echo "❌ Python3未安装"
exit 1
fi
# 检查虚拟环境
if [ ! -d "venv" ]; then
echo "📦 创建虚拟环境..."
python3 -m venv venv
fi
# 激活虚拟环境
echo "🔄 激活虚拟环境..."
source venv/bin/activate
# 安装依赖
echo "📥 安装依赖..."
pip install -r requirements.txt -q
# 检查.env文件
if [ ! -f ".env" ]; then
echo "⚠️ .env文件不存在,复制.env.example..."
cp .env.example .env
echo "❗ 请编辑.env文件,配置Zabbix服务器信息"
echo "❗ 配置完成后重新运行此脚本"
exit 1
fi
# 启动服务
echo
echo "🚀 启动Security Service..."
echo "📍 服务地址: http://localhost:8002"
echo "📚 API文档: http://localhost:8002/docs"
echo
echo "按 Ctrl+C 停止服务"
echo
python3 -m uvicorn app.main:app --host 0.0.0.0 --port 8002 --reload
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Security Service 重构测试脚本
测试新的Zabbix集成功能
"""
import asyncio
import sys
import os
import json
from datetime import datetime
# 添加app目录到Python路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'app'))
async def test_zabbix_service():
"""测试Zabbix服务"""
print("="*70)
print("测试 Zabbix Service")
print("="*70)
try:
from services.zabbix_service import ZabbixService
# 创建Zabbix服务实例
zabbix_service = ZabbixService()
# 测试获取同步状态
print("1. 检查Zabbix服务状态...")
status = zabbix_service.get_sync_status()
print(f" 初始化状态: {'✅ 成功' if status['collector_initialized'] else '❌ 失败'}")
print(f" 最后同步时间: {status['last_sync_time']}")
# 测试同步数据
print("\n2. 测试Zabbix数据同步...")
sync_result = await zabbix_service.sync_data()
print(f" 同步状态: ✅ 成功")
print(f" 同步结果: {json.dumps(sync_result, indent=2, ensure_ascii=False)}")
# 测试获取设备数据
print("\n3. 测试获取设备数据...")
device_data = await zabbix_service.collect_device_data()
devices = device_data.get("devices", [])
print(f" 设备数量: {len(devices)}")
# 显示前3个设备
for i, device in enumerate(devices[:3]):
print(f" 设备 {i+1}: {device['name']} ({device['type']}) - {device['status']}")
print(f" 日志数量: {len(device['logs'])}")
return True
except Exception as e:
print(f"❌ Zabbix Service测试失败: {e}")
return False
async def test_security_service():
"""测试安全服务"""
print("\n" + "="*70)
print("测试 Security Service")
print("="*70)
try:
from services.zabbix_service import ZabbixService
from services.analysis import SecurityService
# 创建服务实例
zabbix_service = ZabbixService()
security_service = SecurityService(zabbix_service=zabbix_service)
# 测试风险分析
print("1. 测试风险分析功能...")
analysis_result = await security_service.analyze_risks()
print(f" 分析状态: ✅ 成功")
print(f" 风险级别: {analysis_result.risk_level}")
print(f" 漏洞数量: {len(analysis_result.vulnerabilities)}")
print(f" 建议数量: {len(analysis_result.suggestions)}")
# 测试监控功能
print("\n2. 测试风险监控功能...")
monitor_result = await security_service.monitor_risks()
print(f" 监控状态: ✅ 成功")
print(f" 检测漏洞: {len(monitor_result.detected_vulnerabilities)}")
print(f" 合规风险: {len(monitor_result.compliance_risks)}")
# 测试报告生成
print("\n3. 测试报告生成功能...")
report_result = await security_service.generate_report()
print(f" 报告状态: ✅ 成功")
print(f" 报告日期: {report_result.date}")
print(f" 整体状态: {report_result.overall_status}")
# 测试攻击建议
print("\n4. 测试攻击建议功能...")
advice_result = await security_service.get_attack_advice(
attack_type="Port Scan",
target="Web Server",
logs="Port scan detected from external IP"
)
print(f" 建议状态: ✅ 成功")
print(f" 即时行动: {len(advice_result.immediate_actions)}")
print(f" 分析结果: {advice_result.analysis[:100]}...")
return True
except Exception as e:
print(f"❌ Security Service测试失败: {e}")
return False
async def main():
"""主测试函数"""
print("Security Service 重构验证测试")
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("此脚本测试新的Zabbix集成功能...")
# 测试结果
results = {
"zabbix_service": False,
"security_service": False,
"overall": False
}
# 测试Zabbix服务
results["zabbix_service"] = await test_zabbix_service()
# 测试安全服务
results["security_service"] = await test_security_service()
# 总体结果
results["overall"] = results["zabbix_service"] and results["security_service"]
# 输出测试总结
print("\n" + "="*70)
print("测试总结")
print("="*70)
print(f"Zabbix Service: {'✅ 通过' if results['zabbix_service'] else '❌ 失败'}")
print(f"Security Service: {'✅ 通过' if results['security_service'] else '❌ 失败'}")
print(f"总体结果: {'✅ 全部通过' if results['overall'] else '❌ 有失败项'}")
if results['overall']:
print("\n🎉 重构成功!所有功能正常工作。")
print("现在可以使用以下API端点:")
print(" POST /api/v1/security/zabbix/sync - 手动同步Zabbix数据")
print(" GET /api/v1/security/zabbix/status - 获取Zabbix服务状态")
print(" POST /api/v1/security/zabbix/devices - 获取设备列表")
print(" POST /api/v1/security/analysis - 安全分析")
print(" GET /api/v1/security/monitor - 风险监控")
print(" GET /api/v1/security/report - 生成报告")
else:
print("\n⚠️ 测试失败,请检查:")
print(" 1. Zabbix服务器配置")
print(" 2. 环境变量设置")
print(" 3. 网络连接")
print(" 4. 依赖安装")
return results["overall"]
if __name__ == "__main__":
# 运行测试
success = asyncio.run(main())
sys.exit(0 if success else 1)
\ No newline at end of file
......@@ -252,9 +252,16 @@ def generate_env_file():
'DIFY_API_KEY': 'your_dify_api_key_here',
'DIFY_RESPONSE_MODE': 'streaming',
'DIFY_MESSAGE_ENDPOINT': 'chat-messages',
'MONGODB_URL': 'mongodb://mongo:27017',
'MONGODB_DB_NAME': 'security_service_db',
'REDIS_HOST': 'redis',
'MONGODB_URL': 'mongodb://mongo:27017',
'MONGODB_DB_NAME': 'security_service_db',
'ZABBIX_URL': 'http://localhost/zabbix/api_jsonrpc.php',
'ZABBIX_USERNAME': 'Admin',
'ZABBIX_PASSWORD': generate_db_password(20),
'ZABBIX_SYNC_INTERVAL': '3600',
'ZABBIX_AUTO_SYNC': 'true',
'LOG_LEVEL': 'INFO',
'LOG_FORMAT': 'json',
'REDIS_HOST': 'redis',
'REDIS_PORT': '6379',
'REDIS_PASSWORD': generate_db_password(16),
'REDIS_DB': '0',
......@@ -336,6 +343,21 @@ MONGODB_URL={values['MONGODB_URL']}
DB_NAME={values['DB_NAME']}
MONGODB_DB_NAME={values['MONGODB_DB_NAME']}
# ============================================================
# 🖥️ Zabbix 监控系统配置
# ============================================================
ZABBIX_URL={values['ZABBIX_URL']}
ZABBIX_USERNAME={values['ZABBIX_USERNAME']}
ZABBIX_PASSWORD={values['ZABBIX_PASSWORD']}
ZABBIX_SYNC_INTERVAL={values['ZABBIX_SYNC_INTERVAL']}
ZABBIX_AUTO_SYNC={values['ZABBIX_AUTO_SYNC']}
# ============================================================
# 📝 日志配置
# ============================================================
LOG_LEVEL={values['LOG_LEVEL']}
LOG_FORMAT={values['LOG_FORMAT']}
# ============================================================
# 🚀 Redis 配置
# ============================================================
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment