🛡️ AutoSOC Agent
项目简介
AutoSOC Agent 是一个基于 Agentic AI 技术的自动化安全运营中心系统,专注于网络流量分析和安全威胁检测。它采用先进的微内核 + 插件化架构,结合大语言模型的推理能力,实现了对网络流量的智能分析和安全威胁的自动检测。
核心功能
- 无感摄入:自动监听指定目录,当流量包放入后自动完成预处理
- 智能分析:利用大语言模型分析网络流量,识别潜在的安全威胁
- 插件化架构:支持自定义插件开发,轻松扩展新的安全分析功能
- 实时预警:发现异常流量时及时发出预警
- 流量可视化:提供直观的流量分析结果可视化
系统架构
AutoSOC Agent 采用 微内核 + 插件化架构,严格遵循开闭原则,确保新功能的扩展不会破坏现有核心。
架构层次
- 内核层 (Core):系统的"不动产",负责生命周期管理、依赖注入、数据库连接池等
- 基建层 (Infrastructure):通用工具组件,如 Tshark 封装、目录监听、预处理引擎等
- 领域层 (Domain):系统的数据契约,定义统一的数据模型
- 插件层 (Features/Plugins):独立的功能包,如流量分析、SQL 注入检测等
数据流
- 数据摄入:通过 Watchdog 监听目录,发现新的流量包
- 预处理:Preprocessor 自动完成 PCAP 切片、Zeek 日志提取等
- 情报上板:将提取的情报注入到 SQLite 黑板数据库
- AI 分析:大语言模型分析结构化数据,识别安全威胁
- 结果输出:生成分析报告并发出预警
目录结构
src/app/
├── core/ # 🟢 [内核层] 系统的"不动产"
│ ├── server.py # FastMCP 启动入口 & 环境变量注入
│ ├── loader.py # 动态加载器:扫描 features 并注册
│ ├── context.py # 依赖注入容器 (DI Container)
│ └── database.py # SQLite 异步黑板与状态机
│
├── infrastructure/ # 🟡 [基建层] 屏蔽底层工具差异
│ ├── base_runner.py # 包含熔断与超时的执行引擎基类
│ ├── tshark_runner.py # Tshark 安全执行器 (防注入)
│ ├── watchdog.py # 目录监听黑洞服务
│ └── preprocessor/ # 流量预处理引擎
│
├── domain/ # 🟣 [领域层] 数据契约
│ ├── traffic.py # 流量相关数据模型
│ ├── workflow.py # 工作流相关数据模型
│ └── interaction.py # 交互相关数据模型
│
└── features/ # 🔴 [插件层] 业务逻辑游乐场
├── traffic_inspector/ # 流量分析插件
├── decoder/ # 数据解码插件
└── admin_tools/ # 管理工具插件
安装指南
前置依赖
- Python 3.10+:系统运行环境
- Wireshark:提供 tshark 和 editcap 工具
- Zeek:可选,用于提取结构化日志
- SQLite:轻量级数据库,用于存储分析结果
安装步骤
克隆项目
git clone <repository-url> cd traffic_analysis创建虚拟环境
python -m venv venv # Windows venv\Scripts\activate # Linux/Mac source venv/bin/activate安装依赖
pip install -r requirements.txt配置环境变量(可选)
# Windows set APP_WORKSPACE=./workspace set TSHARK_PATH=C:\Program Files\Wireshark\tshark.exe # Linux/Mac export APP_WORKSPACE=./workspace export TSHARK_PATH=/usr/bin/tshark
使用方法
启动系统
python run.py
系统启动后,会自动监听 workspace/pcap_inbox/ 目录,当流量包放入该目录时,系统会自动开始分析。
核心工具使用
流量分析
get_pcap_summary:获取流量包的全局统计信息inspect_packets:使用 Wireshark 语法查询特定数据包
数据解码
decode_payload:解码各种编码格式的数据
管理工具
run_garbage_collection:运行垃圾回收,清理过期数据
插件开发
AutoSOC Agent 支持自定义插件开发,只需按照以下步骤即可:
创建插件目录
mkdir -p src/app/features/my_plugin创建插件初始化文件
# src/app/features/my_plugin/__init__.py from mcp.server.fastmcp import FastMCP from app.core.context import AppContext def register(mcp: FastMCP, ctx: AppContext): """ 插件注册函数 """ # 注册工具 @mcp.tool(name="my_tool") async def my_tool(param: str) -> str: """ 我的工具 """ return f"处理结果: {param}" ctx.logger.info("plugin_registered", plugin="my_plugin")重启系统
系统会自动发现并加载新的插件。
依赖项
| 依赖 | 版本 | 用途 |
|---|---|---|
| Python | 3.10+ | 运行环境 |
| pydantic | 最新版 | 数据验证和序列化 |
| watchdog | 最新版 | 目录监听 |
| aiosqlite | 最新版 | 异步 SQLite 操作 |
| structlog | 最新版 | 结构化日志 |
| mcp | 最新版 | Agent 通信协议 |
| Wireshark | 3.0+ | 网络分析工具 |
| Zeek | 5.0+ | 流量日志提取 |
配置选项
| 配置项 | 环境变量 | 默认值 | 说明 |
|---|---|---|---|
| 工作目录 | APP_WORKSPACE | ./workspace | 系统工作目录 |
| Tshark 路径 | TSHARK_PATH | 自动查找 | Tshark 可执行文件路径 |
| 数据库名称 | DB_NAME | blackboard.db | 数据库文件名称 |
| 监控目录 | DROPZONE_FOLDER | pcap_inbox | 流量包收件箱目录 |
| 最大包数限制 | MAX_PACKET_LIMIT | 1000 | 单次查询最大返回包数 |
| 默认超时时间 | DEFAULT_TIMEOUT | 30 | 子进程默认超时时间(秒) |
贡献指南
我们欢迎社区贡献,包括但不限于:
- 修复 bug
- 添加新功能
- 改进文档
- 优化性能
贡献流程
- Fork 仓库
- 创建分支:
git checkout -b feature/your-feature - 提交更改:
git commit -m "Add your feature" - 推送分支:
git push origin feature/your-feature - 创建 Pull Request
许可证
本项目采用 MIT 许可证。详见 LICENSE 文件。
联系方式
- 项目主页:
- 问题反馈:/issues
- 邮箱:
致谢
- Wireshark:提供了强大的网络分析工具
- Zeek:提供了优秀的网络流量分析框架
- FastMCP:提供了高效的 Agent 通信协议
- 所有贡献者:感谢你们的辛勤工作和宝贵建议
AutoSOC Agent - 让网络安全分析更加智能、高效!