300.整体设计
AI Agent 的服务平台四层架构
- 用户入口层(API / UI)
- Agent 执行编排层(Core)
- 能力支撑层(LLM / Tool / Memory / RAG)
- 基础设施层(存储 / 调度 / 日志 / 权限)
层级 | 关键职责 | 技术核心 |
---|---|---|
用户入口层 | 请求接入、身份鉴权、上下文绑定 | FastAPI + Redis + WebSocket + JWT |
Agent 编排层 | Agent 任务流程、调用决策、执行控制 | LangChain / LangGraph + asyncio + Prompt 工程 |
能力支撑层 | 实际执行 LLM、工具、记忆检索、RAG 补充 | LLM API + 工具封装 + Redis Memory + 向量库 |
基础设施层 | 存储、调度、日志、权限、配置等平台支撑 | PostgreSQL + MinIO + Loki + Celery + Consul |
# 01.用户入口层(API / UI)
示例请求场景:用户在 Chat UI 输入一句话:「帮我总结一下这篇文章的重点」,并上传一段 PDF 内容
职责:提供统一入口,识别用户身份、会话状态,转发到 Agent 核心逻辑
技术栈:
React
+WebSocket
用于 UI 实时响应FastAPI
接收消息,统一路由处理Redis
存储会话状态、上下文信息JWT
认证 +OAuth
鉴权
处理流程
用户通过 Web 前端 UI(React + Tailwind)输入请求 + 上传文件
浏览器通过 WebSocket 向后端发送 JSON 请求
{ "session_id": "sess-xyz", "user_input": "帮我总结一下这篇文章的重点", "document": "xxx.pdf" }
1
2
3
4
5
FastAPI WebSocket 接收后
- 解析 JWT token,提取用户身份
- 确认 session_id 是否存在于 Redis,如果没有则初始化 session
- 上传的文档先缓存(本地或 MinIO),并生成预处理 ID
- 构造结构化调用参数转入下一层:
agent_executor.invoke(input_dict)
用户入口层的 API Handler 确实需要负责选择 Agent
用户主动选择
:比如 UI 上选择客服助手
、文档助手
、数据分析 Agent
系统智能分发
:根据请求内容自动匹配合适的 Agent(类似 Intent Router)- 实现思路,
调用一个小型 LLM,判断请求属于哪个 Agent
- 实现思路,
注册多个 Agent 实例(初始化时加载)
agent_map = { "doc_summary": initialize_agent(...), "image_ocr": initialize_agent(...), "qa_agent": initialize_agent(...), }
1
2
3
4
5
# 02.Agent 执行编排层
用户请求(上传 PDF)
└─> Agent 创建层(动态构造 AgentExecutor) ← 是否用 Tool,是否带 Memory?
└─> LangGraph 执行层(静态状态图流程)
├─> 状态 A:文档解析(工具)
├─> 状态 B:内容总结(调用 LLM)
└─> 状态 C:结果格式化并返回
2
3
4
5
6
职责:对接 LangChain/LangGraph,负责执行计划、调用 Memory、决定工具是否使用等
技术栈:
LangChain
AgentExecutor(ReAct / Function Agent)LangGraph
(用于多阶段 Agent 状态流转)Pydantic
构造请求结构asyncio
控制执行流程
# 1、websocket 设计
收到上层传入的
input_dict
(使用 http 请求)后端 → 立即生成唯一
task_id
并返回{ "input": "帮我总结一下这篇文章的重点", "file_id": "doc-123", "chat_history": [...], "user_id": "u-001" }
1
2
3
4
5
6结果获取阶段(WebSocket)
- 前端 → 使用
task_id
建立WebSocket连接 - 后端 → 通过同一WS连接持续推送:排队状态 → 执行进度 → 最终结果/错误信息
- 前端 → 使用
# 2、工具&缓存 是否使用
判断是否需要 Tool
- 显式判断:用户是否上传了 PDF/图片/表格?
- 意图识别:通过 LLM 判断意图,如“总结、翻译、提取数据” → 绑定相应 Tool
判断是否需要开启 Memory
检查请求是否带有
session_id
和user_id
读取 Redis,恢复为
ConversationBufferMemory
Memory 会自动记录过去对话,并注入到 Prompt 中
用户的历史对话存在 Redis 中,结构示意
Key: "chat_memory:{user_id}:{session_id}" Value: JSON list [ {"role": "human", "content": "你好"}, {"role": "ai", "content": "你好,我能帮你什么?"}, ... ]
1
2
3
4
5
6
# 3、动态创建Agent + 静态LangGraph
- 在企业实际部署中,每种典型业务流程(如文档解析、文本审核、图像摘要)会
预定义一个 LangGraph 状态机
- 已经固定好了功能与调用顺序(如解析 ➜ 总结 ➜ 输出)
- 内部调用的是一个 通用
Agent(通过参数/上下文灵活配置工具、Memory)
动态选项说明
动态项 实现方式 举例说明 Agent 内部的 Tools 根据请求类型动态装配 上传了 PDF 则附加 PDFParseTool Memory 根据 user_id / session_id 动态加载 同一用户上下文连续性支持 Prompt 输入内容 由每个节点动态生成 input_dict 比如 input = f"总结:{parsed_data}"
LangGraph 状态 可通过 if-else
/branching
实现分支解析成功才走总结,否则报错分支
# 1)动态构建 Agent(LangChain)
- 根据用户请求动态生成 AgentExecutor
- 是否需要工具、是否绑定 memory、LLM 模型选择
def create_agent(request):
tools = []
if request.file_type == "pdf":
tools.append(PDFParseTool)
memory = None
if request.session_id:
memory = get_memory_from_redis(...)
agent = initialize_agent(
tools=tools,
llm=ChatOpenAI(...),
agent=AgentType.OPENAI_FUNCTIONS,
memory=memory
)
return agent
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2)静态流程定义(LangGraph)
LangGraph 是一套有状态的链式执行框架,它把 agent 的调用变成 节点,节点之间通过条件流转
三个阶段节点定义
解析阶段
:调用工具,提取文档结构总结阶段
:把结构化数据交给 LLM 总结输出阶段
:生成最终格式结果
LangGraph 用
StateGraph
定义流程,类似状态机
def langgraph_app(agent):
def parse_pdf_tool(state):
return agent.invoke({"input": "请解析文档结构", "file": state["file"]})
def summarize_with_llm(state):
return agent.invoke({"input": f"总结内容:{state['parsed_data']}"})
def format_result(state):
return {"summary": state["summary"], "status": "done"}
with StateGraph() as graph:
graph.add_node("parse", parse_pdf_tool)
graph.add_node("summarize", summarize_with_llm)
graph.add_node("format", format_result)
graph.set_entry_point("parse")
graph.add_edge("parse", "summarize")
graph.add_edge("summarize", "format")
graph.set_finish_point("format")
return graph.compile()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 3)执行流程(调用 LangGraph)
- 前面已经构造好动态 Agent,现在将其用在 LangGraph 流程节点中
def handle_pdf_request(file, user_id):
agent = create_agent(file, user_id) # 动态构建 Agent(LangChain)
graph_app = langgraph_app(agent) # 构造 LangGraph 实例
result = graph_app.invoke({"file": file}) # 传入用户输入(或上一步中间结果)
return result
2
3
4
5
6
# 4、实时推送至前端
LLM 组件(如
ChatOpenAI
)内部在每个 token 输出时会- 自动调用
on_llm_new_token()
; - 工具执行后会触发
on_tool_end()
; - 整个 Chain、Agent 执行完成后会触发
on_chain_end()
等
- 自动调用
这都是 LangChain 的事件机制,你只需要写一个 callback handler 类,实现你想监听的钩子函数,它们会被自动触发
# 1)定义llm CallbackHandler
构造 Agent + 注册 LangChain Callback Handler
class WebSocketCallbackHandler(BaseCallbackHandler):
def __init__(self, task_id, ws_manager):
self.task_id = task_id
self.ws = ws_manager
def on_llm_new_token(self, token: str, **kwargs):
# 调用 ws.send_json(message) 推送日志到前端
self.ws.send_json_to_task(self.task_id, {"stage": "llm", "token": token})
def on_tool_end(self, output: str, **kwargs):
self.ws.send_json_to_task(self.task_id, {"stage": "tool", "output": output})
2
3
4
5
6
7
8
9
10
11
注册 handler 并传入 LangChain agent
callback_handler = WebSocketCallbackHandler(task_id, ws_manager)
agent = initialize_agent(
tools=[...],
llm=ChatOpenAI(streaming=True, callbacks=[callback_handler]),
agent=AgentType.OPENAI_FUNCTIONS
)
2
3
4
5
6
# 2)WebSocket 推给前端
callback_handler 如何用 WebSocket 把消息推给前端
ws_manager 的职责就是维护 task_id <--> websocket 的映射
class WSManager:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
def register(self, task_id, websocket):
self.connections[task_id] = websocket
def unregister(self, task_id):
self.connections.pop(task_id, None)
def send_json_to_task(self, task_id, message: dict):
ws = self.connections.get(task_id)
if ws:
asyncio.create_task(ws.send_json(message)) # 推送给前端
2
3
4
5
6
7
8
9
10
11
12
13
14
# 03.能力支撑层(LLM / Tool / Memory / RAG)
职责:完成实际智能体能力调用,如语言模型、工具插件、记忆检索、RAG 文档增强等
技术栈:
LLM:OpenAI / DeepSeek / AzureOpenAI(通过 LangChain 封装)
Tool:PDF Summarizer Tool / Search Tool(注册至 Agent 工具列表)
Memory:Redis-based ChatMessageHistory(按 user_id/session_id 分区)
RAG:FAISS / Qdrant / Weaviate 等向量库
完整链路调用举例
- 下为实际调用流程的完整代码片段简化版(可映射为 langgraph 节点)
# Step 1: 获取用户问题和上传的 PDF file_id
user_question = "这份合同的主要风险点有哪些?"
file_id = "file-123"
# Step 2: PDF 工具解析
parsed_summary = pdf_summary_tool.run(file_id)
# Step 3: 检索相关知识(RAG)
related_docs = rag_retriever.get_relevant_documents(user_question + parsed_summary)
context = "\n".join([doc.page_content for doc in related_docs])
# Step 4: 构造 Prompt + 历史记忆
chat_history = memory.load_memory_variables({})["chat_history"]
# Step 5: LLM 回答
answer = llm_chain.invoke({
"context": context,
"chat_history": chat_history,
"question": user_question
})
# Step 6: WebSocket 回调推送结果(通过 callback)
# → 自动触发 callback_handler.on_chain_end() → 推给前端
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 04.基础设施层(调度 / 存储 / 日志 / 权限)
职责:支撑整个智能体平台的运行时基础能力,包括持久化、权限控制、异步调度等
技术栈:
存储:MinIO(文档上传存储) + PostgreSQL(用户行为、调用日志)
调度:Celery / Redis Queue(如长任务、异步分析任务)
日志监控:Grafana + Loki + Promtail(接入所有 API 调用日志和 token 使用量)
配置管理:Pydantic / Consul(用于切换 LLM、RAG、功能灰度)
处理流程:
上传的文档存储在 MinIO,对应路径映射为
file_id
日志系统记录:
- 每次用户请求内容、响应耗时、所调用模型、工具使用情况等
如果任务耗时超长,调度系统将其抛入队列异步处理,用户稍后接收结果
所有模型调用计费行为,异步写入 PostgreSQL 供后期审计和分账
# 1、存储系统
# 1)文档对象存储(MinIO)
- 场景:用户上传合同文件,后续需被工具链(PDFSummaryTool)访问处理
# 用户上传 PDF
upload_path = f"{user_id}/uploads/{uuid4()}.pdf"
minio_client.put_object(
bucket_name="agent-files",
object_name=upload_path,
data=upload_file,
length=len(upload_file),
content_type="application/pdf"
)
# 返回 file_id = upload_path
2
3
4
5
6
7
8
9
10
# 2)结构化元信息(PostgreSQL)
- PostgreSQL 的选择原因(适配智能体平台)
对比维度 | PostgreSQL 优势 | MySQL / TiDB |
---|---|---|
JSON 结构化数据支持 | 原生 JSONB 字段,支持索引、复杂查询 | MySQL JSON 支持较弱、TiDB 次之 |
查询能力 | 支持复杂嵌套语法、全文检索、CTE(递归查询) | MySQL 查询语法相对简单 |
插件生态 | 支持向量搜索(pgvector)、全文检索(tsvector) | MySQL 插件少,TiDB 插件机制更弱 |
并发性能 | 适合中高并发的场景 | TiDB 强于 OLAP,弱于低延迟 OLTP |
开发者生态 | 与 AI 工具链生态(LangChain、Haystack)整合良好 | TiDB 不在主流 AI 框架默认支持列表 |
# 2、调度系统(Celery)
- 调用流程举例
# 检查任务类型 → 如果耗时较长,转调度异步执行
if is_long_task(user_input):
task_id = str(uuid4())
async_task.delay(user_input, file_id, task_id)
return JSONResponse({"task_id": task_id})
else:
result = agent_chain.invoke(...)
return result
2
3
4
5
6
7
8
- Celery worker 异步执行
@app.task(name="long_task_handler")
def async_task(user_input, file_id, task_id):
try:
# 处理流程如前:读取 PDF → RAG → LLM 生成结果
...
websocket_manager.push(task_id, final_result)
except Exception as e:
websocket_manager.push(task_id, f"任务失败:{str(e)}")
2
3
4
5
6
7
8
# 3、日志与监控系统
- Grafana + Loki + Promtail
接入目标
内容 来源 每次请求的 trace_id、耗时、用户 IP FastAPI Middleware 每次模型调用 token 用量、错误信息 LLM callback handler 工具运行耗时、输入输出 Tool 封装统一日志打点
实践配置方式
Promtail 负责采集本地 API 日志(FastAPI 或 Gunicorn)
Loki 为日志聚合查询
Grafana 面板展示
- 每小时总请求数 / 平均响应时间
- GPT-4 / GPT-3.5 各自 token 使用量图表
- 某个用户历史请求明细追踪