不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • 前置知识

  • PyTorch

  • Langchain

    • 01.Langchain基础
    • 02.数据检索增强RAG
    • 03.文档向量化原理
    • 04.Agent智能体
    • 05.快递Agent智能体
    • 06.LangGraph
    • 200.Langchain 实现原理
    • 202.AI销售智能体
    • 300.整体设计
      • 01.用户入口层(API / UI)
      • 02.Agent 执行编排层
        • 1、websocket 设计
        • 2、工具&缓存 是否使用
        • 3、动态创建Agent + 静态LangGraph
        • 1)动态构建 Agent(LangChain)
        • 2)静态流程定义(LangGraph)
        • 3)执行流程(调用 LangGraph)
        • 4、实时推送至前端
        • 1)定义llm CallbackHandler
        • 2)WebSocket 推给前端
      • 03.能力支撑层(LLM / Tool / Memory / RAG)
      • 04.基础设施层(调度 / 存储 / 日志 / 权限)
        • 1、存储系统
        • 1)文档对象存储(MinIO)
        • 2)结构化元信息(PostgreSQL)
        • 2、调度系统(Celery)
        • 3、日志与监控系统
  • 大模型
  • Langchain
xiaonaiqiang
2025-06-10
目录

300.整体设计

AI Agent 的服务平台四层架构

  • 用户入口层(API / UI)
  • Agent 执行编排层(Core)
  • 能力支撑层(LLM / Tool / Memory / RAG)
  • 基础设施层(存储 / 调度 / 日志 / 权限)
层级 关键职责 技术核心
用户入口层 请求接入、身份鉴权、上下文绑定 FastAPI + Redis + WebSocket + JWT
Agent 编排层 Agent 任务流程、调用决策、执行控制 LangChain / LangGraph + asyncio + Prompt 工程
能力支撑层 实际执行 LLM、工具、记忆检索、RAG 补充 LLM API + 工具封装 + Redis Memory + 向量库
基础设施层 存储、调度、日志、权限、配置等平台支撑 PostgreSQL + MinIO + Loki + Celery + Consul

# 01.用户入口层(API / UI)

  • 示例请求场景:用户在 Chat UI 输入一句话:「帮我总结一下这篇文章的重点」,并上传一段 PDF 内容
  • 职责:提供统一入口,识别用户身份、会话状态,转发到 Agent 核心逻辑

  • 技术栈:

    • React + WebSocket 用于 UI 实时响应

    • FastAPI 接收消息,统一路由处理

    • Redis 存储会话状态、上下文信息

    • JWT 认证 + OAuth 鉴权

处理流程

  • 用户通过 Web 前端 UI(React + Tailwind)输入请求 + 上传文件

  • 浏览器通过 WebSocket 向后端发送 JSON 请求

    • {
        "session_id": "sess-xyz",
        "user_input": "帮我总结一下这篇文章的重点",
        "document": "xxx.pdf"
      }
      
      1
      2
      3
      4
      5
  • FastAPI WebSocket 接收后

    • 解析 JWT token,提取用户身份
    • 确认 session_id 是否存在于 Redis,如果没有则初始化 session
    • 上传的文档先缓存(本地或 MinIO),并生成预处理 ID
    • 构造结构化调用参数转入下一层:agent_executor.invoke(input_dict)
  • 用户入口层的 API Handler 确实需要负责选择 Agent

    • 用户主动选择:比如 UI 上选择 客服助手、文档助手、数据分析 Agent
    • 系统智能分发:根据请求内容自动匹配合适的 Agent(类似 Intent Router)
      • 实现思路,调用一个小型 LLM,判断请求属于哪个 Agent
  • 注册多个 Agent 实例(初始化时加载)

    • agent_map = {
          "doc_summary": initialize_agent(...),
          "image_ocr": initialize_agent(...),
          "qa_agent": initialize_agent(...),
      }
      
      1
      2
      3
      4
      5

# 02.Agent 执行编排层

用户请求(上传 PDF)
└─> Agent 创建层(动态构造 AgentExecutor) ← 是否用 Tool,是否带 Memory?
    └─> LangGraph 执行层(静态状态图流程)
        ├─> 状态 A:文档解析(工具)
        ├─> 状态 B:内容总结(调用 LLM)
        └─> 状态 C:结果格式化并返回
1
2
3
4
5
6
  • 职责:对接 LangChain/LangGraph,负责执行计划、调用 Memory、决定工具是否使用等

  • 技术栈:

    • LangChain AgentExecutor(ReAct / Function Agent)

    • LangGraph(用于多阶段 Agent 状态流转)

    • Pydantic 构造请求结构

    • asyncio 控制执行流程

# 1、websocket 设计

  • 收到上层传入的 input_dict(使用 http 请求)

    • 后端 → 立即生成唯一task_id并返回

    • {
        "input": "帮我总结一下这篇文章的重点",
        "file_id": "doc-123",
        "chat_history": [...],
        "user_id": "u-001"
      }
      
      1
      2
      3
      4
      5
      6
    • 结果获取阶段(WebSocket)

      • 前端 → 使用task_id建立WebSocket连接
      • 后端 → 通过同一WS连接持续推送:排队状态 → 执行进度 → 最终结果/错误信息

# 2、工具&缓存 是否使用

  • 判断是否需要 Tool

    • 显式判断:用户是否上传了 PDF/图片/表格?
    • 意图识别:通过 LLM 判断意图,如“总结、翻译、提取数据” → 绑定相应 Tool
  • 判断是否需要开启 Memory

    • 检查请求是否带有 session_id 和 user_id

    • 读取 Redis,恢复为 ConversationBufferMemory

    • Memory 会自动记录过去对话,并注入到 Prompt 中

    • 用户的历史对话存在 Redis 中,结构示意

    • Key: "chat_memory:{user_id}:{session_id}"
      Value: JSON list [
        {"role": "human", "content": "你好"},
        {"role": "ai", "content": "你好,我能帮你什么?"},
        ...
      ]
      
      1
      2
      3
      4
      5
      6

# 3、动态创建Agent + 静态LangGraph

  • 在企业实际部署中,每种典型业务流程(如文档解析、文本审核、图像摘要)会预定义一个 LangGraph 状态机
  • 已经固定好了功能与调用顺序(如解析 ➜ 总结 ➜ 输出)
  • 内部调用的是一个 通用 Agent(通过参数/上下文灵活配置工具、Memory)
  • 动态选项说明

    • 动态项 实现方式 举例说明
      Agent 内部的 Tools 根据请求类型动态装配 上传了 PDF 则附加 PDFParseTool
      Memory 根据 user_id / session_id 动态加载 同一用户上下文连续性支持
      Prompt 输入内容 由每个节点动态生成 input_dict 比如 input = f"总结:{parsed_data}"
      LangGraph 状态 可通过 if-else / branching 实现分支 解析成功才走总结,否则报错分支

# 1)动态构建 Agent(LangChain)

  • 根据用户请求动态生成 AgentExecutor
  • 是否需要工具、是否绑定 memory、LLM 模型选择
def create_agent(request):
    tools = []
    if request.file_type == "pdf":
        tools.append(PDFParseTool)

    memory = None
    if request.session_id:
        memory = get_memory_from_redis(...)

    agent = initialize_agent(
        tools=tools,
        llm=ChatOpenAI(...),
        agent=AgentType.OPENAI_FUNCTIONS,
        memory=memory
    )
    return agent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2)静态流程定义(LangGraph)

  • LangGraph 是一套有状态的链式执行框架,它把 agent 的调用变成 节点,节点之间通过条件流转

  • 三个阶段节点定义

    • 解析阶段:调用工具,提取文档结构
    • 总结阶段:把结构化数据交给 LLM 总结
    • 输出阶段:生成最终格式结果
  • LangGraph 用 StateGraph 定义流程,类似状态机

def langgraph_app(agent):
    def parse_pdf_tool(state):
        return agent.invoke({"input": "请解析文档结构", "file": state["file"]})

    def summarize_with_llm(state):
        return agent.invoke({"input": f"总结内容:{state['parsed_data']}"})

    def format_result(state):
        return {"summary": state["summary"], "status": "done"}

    with StateGraph() as graph:
        graph.add_node("parse", parse_pdf_tool)
        graph.add_node("summarize", summarize_with_llm)
        graph.add_node("format", format_result)

        graph.set_entry_point("parse")
        graph.add_edge("parse", "summarize")
        graph.add_edge("summarize", "format")
        graph.set_finish_point("format")

    return graph.compile()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 3)执行流程(调用 LangGraph)

  • 前面已经构造好动态 Agent,现在将其用在 LangGraph 流程节点中
def handle_pdf_request(file, user_id):
    agent = create_agent(file, user_id)  # 动态构建 Agent(LangChain)
    graph_app = langgraph_app(agent)    # 构造 LangGraph 实例

    result = graph_app.invoke({"file": file}) # 传入用户输入(或上一步中间结果)
    return result
1
2
3
4
5
6

# 4、实时推送至前端

  • LLM 组件(如 ChatOpenAI)内部在每个 token 输出时会

    • 自动调用 on_llm_new_token();
    • 工具执行后会触发 on_tool_end();
    • 整个 Chain、Agent 执行完成后会触发 on_chain_end() 等
  • 这都是 LangChain 的事件机制,你只需要写一个 callback handler 类,实现你想监听的钩子函数,它们会被自动触发

# 1)定义llm CallbackHandler

  • 构造 Agent + 注册 LangChain Callback Handler
class WebSocketCallbackHandler(BaseCallbackHandler):
    def __init__(self, task_id, ws_manager):
        self.task_id = task_id
        self.ws = ws_manager

    def on_llm_new_token(self, token: str, **kwargs):
        # 调用 ws.send_json(message) 推送日志到前端
        self.ws.send_json_to_task(self.task_id, {"stage": "llm", "token": token})

    def on_tool_end(self, output: str, **kwargs):
        self.ws.send_json_to_task(self.task_id, {"stage": "tool", "output": output})
1
2
3
4
5
6
7
8
9
10
11
  • 注册 handler 并传入 LangChain agent
callback_handler = WebSocketCallbackHandler(task_id, ws_manager)
agent = initialize_agent(
    tools=[...],
    llm=ChatOpenAI(streaming=True, callbacks=[callback_handler]),
    agent=AgentType.OPENAI_FUNCTIONS
)
1
2
3
4
5
6

# 2)WebSocket 推给前端

  • callback_handler 如何用 WebSocket 把消息推给前端

  • ws_manager 的职责就是维护 task_id <--> websocket 的映射

class WSManager:
    def __init__(self):
        self.connections: Dict[str, WebSocket] = {}

    def register(self, task_id, websocket):
        self.connections[task_id] = websocket

    def unregister(self, task_id):
        self.connections.pop(task_id, None)

    def send_json_to_task(self, task_id, message: dict):
        ws = self.connections.get(task_id)
        if ws:
            asyncio.create_task(ws.send_json(message))  # 推送给前端
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 03.能力支撑层(LLM / Tool / Memory / RAG)

  • 职责:完成实际智能体能力调用,如语言模型、工具插件、记忆检索、RAG 文档增强等

  • 技术栈:

    • LLM:OpenAI / DeepSeek / AzureOpenAI(通过 LangChain 封装)

    • Tool:PDF Summarizer Tool / Search Tool(注册至 Agent 工具列表)

    • Memory:Redis-based ChatMessageHistory(按 user_id/session_id 分区)

    • RAG:FAISS / Qdrant / Weaviate 等向量库

  • 完整链路调用举例

    • 下为实际调用流程的完整代码片段简化版(可映射为 langgraph 节点)
# Step 1: 获取用户问题和上传的 PDF file_id
user_question = "这份合同的主要风险点有哪些?"
file_id = "file-123"

# Step 2: PDF 工具解析
parsed_summary = pdf_summary_tool.run(file_id)

# Step 3: 检索相关知识(RAG)
related_docs = rag_retriever.get_relevant_documents(user_question + parsed_summary)
context = "\n".join([doc.page_content for doc in related_docs])

# Step 4: 构造 Prompt + 历史记忆
chat_history = memory.load_memory_variables({})["chat_history"]

# Step 5: LLM 回答
answer = llm_chain.invoke({
    "context": context,
    "chat_history": chat_history,
    "question": user_question
})

# Step 6: WebSocket 回调推送结果(通过 callback)
# → 自动触发 callback_handler.on_chain_end() → 推给前端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 04.基础设施层(调度 / 存储 / 日志 / 权限)

  • 职责:支撑整个智能体平台的运行时基础能力,包括持久化、权限控制、异步调度等

  • 技术栈:

    • 存储:MinIO(文档上传存储) + PostgreSQL(用户行为、调用日志)

    • 调度:Celery / Redis Queue(如长任务、异步分析任务)

    • 日志监控:Grafana + Loki + Promtail(接入所有 API 调用日志和 token 使用量)

    • 配置管理:Pydantic / Consul(用于切换 LLM、RAG、功能灰度)

  • 处理流程:

    • 上传的文档存储在 MinIO,对应路径映射为 file_id

    • 日志系统记录:

      • 每次用户请求内容、响应耗时、所调用模型、工具使用情况等
    • 如果任务耗时超长,调度系统将其抛入队列异步处理,用户稍后接收结果

    • 所有模型调用计费行为,异步写入 PostgreSQL 供后期审计和分账

# 1、存储系统

# 1)文档对象存储(MinIO)

  • 场景:用户上传合同文件,后续需被工具链(PDFSummaryTool)访问处理
# 用户上传 PDF
upload_path = f"{user_id}/uploads/{uuid4()}.pdf"
minio_client.put_object(
    bucket_name="agent-files",
    object_name=upload_path,
    data=upload_file,
    length=len(upload_file),
    content_type="application/pdf"
)
# 返回 file_id = upload_path
1
2
3
4
5
6
7
8
9
10

# 2)结构化元信息(PostgreSQL)

  • PostgreSQL 的选择原因(适配智能体平台)
对比维度 PostgreSQL 优势 MySQL / TiDB
JSON 结构化数据支持 原生 JSONB 字段,支持索引、复杂查询 MySQL JSON 支持较弱、TiDB 次之
查询能力 支持复杂嵌套语法、全文检索、CTE(递归查询) MySQL 查询语法相对简单
插件生态 支持向量搜索(pgvector)、全文检索(tsvector) MySQL 插件少,TiDB 插件机制更弱
并发性能 适合中高并发的场景 TiDB 强于 OLAP,弱于低延迟 OLTP
开发者生态 与 AI 工具链生态(LangChain、Haystack)整合良好 TiDB 不在主流 AI 框架默认支持列表

# 2、调度系统(Celery)

  • 调用流程举例
# 检查任务类型 → 如果耗时较长,转调度异步执行
if is_long_task(user_input):
    task_id = str(uuid4())
    async_task.delay(user_input, file_id, task_id)
    return JSONResponse({"task_id": task_id})
else:
    result = agent_chain.invoke(...)
    return result
1
2
3
4
5
6
7
8
  • Celery worker 异步执行
@app.task(name="long_task_handler")
def async_task(user_input, file_id, task_id):
    try:
        # 处理流程如前:读取 PDF → RAG → LLM 生成结果
        ...
        websocket_manager.push(task_id, final_result)
    except Exception as e:
        websocket_manager.push(task_id, f"任务失败:{str(e)}")
1
2
3
4
5
6
7
8

# 3、日志与监控系统

  • Grafana + Loki + Promtail
  • 接入目标

    • 内容 来源
      每次请求的 trace_id、耗时、用户 IP FastAPI Middleware
      每次模型调用 token 用量、错误信息 LLM callback handler
      工具运行耗时、输入输出 Tool 封装统一日志打点
  • 实践配置方式

    • Promtail 负责采集本地 API 日志(FastAPI 或 Gunicorn)

    • Loki 为日志聚合查询

    • Grafana 面板展示

      • 每小时总请求数 / 平均响应时间
      • GPT-4 / GPT-3.5 各自 token 使用量图表
      • 某个用户历史请求明细追踪
202.AI销售智能体

← 202.AI销售智能体

最近更新
01
06.LangGraph
06-09
02
202.AI销售智能体
06-07
03
05.快递Agent智能体
06-04
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式