OpenAI API 接口对接完全教程 / 14 - AI Agent 架构
第 14 章 · AI Agent 架构设计
AI Agent 能够自主决策、调用工具、记忆上下文并完成复杂任务。本章详解 Agent 设计模式、工具链、记忆系统和多 Agent 协作。
14.1 什么是 AI Agent
AI Agent = LLM(大脑) + Tools(工具) + Memory(记忆) + Planning(规划)
用户目标 → Agent 思考 → 选择工具 → 执行 → 观察结果 → 继续思考 → ... → 完成
Agent vs Chat 对比
| 特性 | Chat | Agent |
|---|---|---|
| 输入输出 | 单轮问答 | 多步自主执行 |
| 工具使用 | 用户指定 | 自主选择 |
| 决策能力 | 被动响应 | 主动规划 |
| 记忆 | 上下文窗口 | 短期+长期记忆 |
| 适用场景 | 简单对话 | 复杂任务自动化 |
14.2 基础 Agent 实现
ReAct 模式(Reasoning + Acting)
from openai import OpenAI
import json
from typing import Callable
# Module-level client used by ReActAgent.run and PlanAndExecuteAgent below.
# It is constructed with no arguments, so all configuration comes from the
# SDK defaults (NOTE(review): presumably the API key is read from the
# environment; confirm against the SDK documentation).
client = OpenAI()
class ReActAgent:
    """Agent that follows the ReAct (Reasoning + Acting) loop.

    Each iteration sends the conversation to the chat-completions endpoint
    through the module-level ``client``. If the model requests tool calls,
    they are executed and their results appended to the conversation;
    otherwise the model's reply is returned as the final answer.
    """

    def __init__(self, model: str = "gpt-4o", tools: dict[str, dict] = None):
        """Create an agent.

        Args:
            model: Chat model name passed to the API.
            tools: Optional pre-built registry. Each value must have the shape
                produced by ``register_tool``:
                ``{"function": callable, "definition": {...}}``.
        """
        self.model = model
        # Copy so that register_tool() never mutates the caller's dict.
        self.tools = dict(tools) if tools else {}
        self.messages: list[dict] = []
        self.max_iterations = 10

    def register_tool(self, name: str, func: Callable, description: str, parameters: dict):
        """Register ``func`` under ``name`` with its function-calling schema.

        Args:
            name: Tool name exposed to the model.
            func: Callable invoked with the model-supplied keyword arguments.
            description: Natural-language description shown to the model.
            parameters: JSON-Schema object describing the arguments.
        """
        self.tools[name] = {
            "function": func,
            "definition": {
                "type": "function",
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": parameters,
                },
            },
        }

    def _run_tool(self, name: str, raw_arguments: str) -> str:
        """Execute one tool call and return its result as a string.

        Never raises: malformed JSON arguments, unknown tools, and exceptions
        thrown by the tool are all reported back as a JSON ``{"error": ...}``
        payload so the model can react to them.
        """
        try:
            # An empty argument string is treated as "no arguments".
            args = json.loads(raw_arguments or "{}")
            print(f" [工具调用] {name}({json.dumps(args, ensure_ascii=False)[:100]})")
            if name not in self.tools:
                return json.dumps({"error": f"未知工具: {name}"})
            result = self.tools[name]["function"](**args)
            if isinstance(result, str):
                return result
            return json.dumps(result, ensure_ascii=False)
        except Exception as e:
            return json.dumps({"error": str(e)})

    def run(self, task: str, system_prompt: str = "") -> str:
        """Run the agent on ``task`` and return the final answer.

        Args:
            task: The user request.
            system_prompt: Optional system message placed first.

        Returns:
            The model's final text (empty string if it returned no content),
            or a fixed notice when ``max_iterations`` is exhausted.
        """
        self.messages = []
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})
        self.messages.append({"role": "user", "content": task})

        for _ in range(self.max_iterations):
            request = {"model": self.model, "messages": self.messages}
            definitions = [t["definition"] for t in self.tools.values()]
            # Only send tools/tool_choice when at least one tool exists; an
            # empty tools list is not a valid request.
            if definitions:
                request["tools"] = definitions
                request["tool_choice"] = "auto"

            response = client.chat.completions.create(**request)
            message = response.choices[0].message
            self.messages.append(message)

            # No tool calls means the model considers the task finished.
            if not message.tool_calls:
                return message.content or ""

            for tool_call in message.tool_calls:
                result_str = self._run_tool(
                    tool_call.function.name,
                    tool_call.function.arguments,
                )
                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result_str[:3000],  # cap the result length
                })

        return "达到最大迭代次数,任务未完成。"
# Usage example: register two tools, then ask a two-part question.
import math
from datetime import datetime

agent = ReActAgent()


# NOTE(review): this tool passes model-chosen text to eval(). Emptying
# __builtins__ should not be relied on as a sandbox; treat this as demo-only
# code and use a real expression parser before exposing it to untrusted input.
def _calculate(expression):
    """Evaluate a math expression with only the ``math`` module in scope."""
    return {"result": eval(expression, {"__builtins__": {}}, {"math": math})}


def _get_current_time():
    """Return the local time formatted as YYYY-MM-DD HH:MM:SS."""
    return {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}


_calculate_schema = {
    "type": "object",
    "properties": {"expression": {"type": "string", "description": "数学表达式"}},
    "required": ["expression"],
}
_no_args_schema = {"type": "object", "properties": {}}

# Register the tools.
agent.register_tool(
    name="calculate",
    func=_calculate,
    description="执行数学计算",
    parameters=_calculate_schema,
)
agent.register_tool(
    name="get_current_time",
    func=_get_current_time,
    description="获取当前时间",
    parameters=_no_args_schema,
)

# Run the agent and show its final answer.
result = agent.run(
    task="现在几点了?另外计算一下 2 的 10 次方。",
    system_prompt="你是一个智能助手,可以使用工具来帮助用户。",
)
print(f"\n最终结果: {result}")
14.3 工具链设计
设计原则
| 原则 | 说明 |
|---|---|
| 单一职责 | 每个工具只做一件事 |
| 描述清晰 | description 要让 LLM 准确理解 |
| 参数校验 | 工具内部验证输入 |
| 错误处理 | 返回错误而非抛异常 |
| 超时控制 | 外部调用设置超时 |
常见工具分类
# Data-query tools.
tools_data = [
    {"name": tool_name, "desc": tool_desc}
    for tool_name, tool_desc in (
        ("query_database", "查询数据库"),
        ("search_web", "搜索网页"),
        ("get_weather", "获取天气"),
    )
]

# Action-executing tools.
tools_action = [
    {"name": tool_name, "desc": tool_desc}
    for tool_name, tool_desc in (
        ("send_email", "发送邮件"),
        ("create_task", "创建任务"),
        ("update_record", "更新记录"),
    )
]

# Information-processing tools.
tools_process = [
    {"name": tool_name, "desc": tool_desc}
    for tool_name, tool_desc in (
        ("calculate", "数学计算"),
        ("translate", "文本翻译"),
        ("summarize", "文本摘要"),
    )
]
安全工具包装器
import functools
import time
def safe_tool(timeout: int = 10, max_retries: int = 2):
    """Decorator factory that makes a tool return errors instead of raising.

    The wrapped function is retried when it raises, sleeping
    ``0.5 * (attempt + 1)`` seconds between attempts. After the last attempt
    the exception is converted into ``{"error": str, "retries": int}``.

    The timeout is only checked after the call returns: a call that ran
    longer than ``timeout`` seconds has its result replaced by
    ``{"error": "执行超时", "elapsed": seconds}``. A call that never returns is
    not interrupted.

    Args:
        timeout: Maximum accepted run time in seconds for a successful call.
        max_retries: Extra attempts after the first failure. Negative values
            are treated as 0 so the function is always called at least once.
    """
    attempts = max(0, max_retries) + 1

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                # Monotonic clock: unaffected by system clock adjustments.
                start = time.monotonic()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    if attempt == attempts - 1:
                        return {"error": str(e), "retries": attempt}
                    time.sleep(0.5 * (attempt + 1))
                    continue
                elapsed = time.monotonic() - start
                if elapsed > timeout:
                    return {"error": "执行超时", "elapsed": elapsed}
                return result
        return wrapper
    return decorator
@safe_tool(timeout=5)
def query_database(sql: str) -> dict:
    """Placeholder database query wrapped by ``safe_tool``.

    The real query is not implemented here; an empty result set is returned
    regardless of ``sql``.
    """
    # Actual implementation goes here...
    empty_result = {"rows": [], "count": 0}
    return empty_result
14.4 记忆系统
14.4.1 短期记忆(对话历史)
class ShortTermMemory:
    """Conversation history kept under a size budget.

    The budget is compared against the summed character length of the
    message contents, so ``max_tokens`` is an approximation rather than a
    real token count.
    """

    def __init__(self, max_tokens: int = 8000):
        self.messages: list[dict] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str):
        """Append a message and trim the history if it exceeds the budget.

        ``None`` content (e.g. a model reply that carried no text) is stored
        as an empty string so later length accounting cannot fail.
        """
        self.messages.append({
            "role": role,
            "content": "" if content is None else content,
        })
        self._trim()

    def _trim(self):
        """Drop the oldest messages until the budget is met.

        At least the two most recent messages are always kept, even if they
        alone exceed the budget.
        """
        sizes = [len(m["content"] or "") for m in self.messages]
        total = sum(sizes)
        drop = 0
        while total > self.max_tokens and len(self.messages) - drop > 2:
            total -= sizes[drop]
            drop += 1
        if drop:
            # One slice delete instead of repeated pop(0).
            del self.messages[:drop]

    def get_messages(self) -> list[dict]:
        """Return a shallow copy of the history (message dicts are shared)."""
        return self.messages.copy()

    def clear(self):
        """Remove every stored message."""
        self.messages.clear()
14.4.2 长期记忆(向量存储)
class LongTermMemory:
    """Long-term memory backed by an in-process list of embedding vectors.

    Each stored text is embedded with ``text-embedding-3-small``; recall
    ranks the stored texts by cosine similarity to the query embedding.
    """

    def __init__(self):
        self.client = OpenAI()
        self.memories: list[dict] = []
        self.embeddings: list[list[float]] = []

    def _embed(self, text: str) -> list[float]:
        """Return the embedding vector for ``text``."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def store(self, content: str, metadata: dict = None):
        """Embed ``content`` and store it with metadata and a timestamp."""
        # Embed first so a failed API call leaves both lists untouched.
        embedding = self._embed(content)
        self.memories.append({
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat(),
        })
        self.embeddings.append(embedding)

    def recall(self, query: str, top_k: int = 5) -> list[dict]:
        """Return up to ``top_k`` stored memories most similar to ``query``.

        Each returned dict is a copy of the stored memory plus a ``score``
        key holding the cosine similarity. Returns an empty list when
        nothing is stored or ``top_k`` is not positive (no API call is made).
        """
        if not self.memories or top_k <= 0:
            return []
        return self._rank(self._embed(query), top_k)

    def _rank(self, query_embedding: list[float], top_k: int) -> list[dict]:
        """Rank stored memories by cosine similarity to ``query_embedding``."""
        import numpy as np

        if top_k <= 0 or not self.embeddings:
            return []
        query_vec = np.asarray(query_embedding, dtype=float)
        mem_vecs = np.asarray(self.embeddings, dtype=float)
        dots = mem_vecs @ query_vec
        denom = np.linalg.norm(mem_vecs, axis=1) * np.linalg.norm(query_vec)
        # A zero-norm vector would give NaN, which the reversed argsort would
        # rank first; score those entries 0.0 instead.
        similarities = np.divide(dots, denom, out=np.zeros_like(dots), where=denom > 0)
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            {**self.memories[i], "score": float(similarities[i])}
            for i in top_indices
        ]
14.4.3 工作记忆(任务上下文)
class WorkingMemory:
    """Scratch state for the task currently being worked on."""

    def __init__(self):
        self.scratchpad: dict = {}  # temporary variable storage
        self.observations: list[dict] = []
        self.current_step: int = 0
        self.plan: list[str] = []
        self.goal: str = ""

    def set_goal(self, goal: str):
        """Start a new goal, resetting plan, step counter and observations."""
        self.goal = goal
        self.plan, self.current_step, self.observations = [], 0, []

    def set_plan(self, steps: list[str]):
        """Install a plan and rewind to its first step."""
        self.plan, self.current_step = steps, 0

    def add_observation(self, step: str, result: str):
        """Record the outcome of a step together with the current time."""
        entry = {
            "step": step,
            "result": result,
            "timestamp": datetime.now().isoformat(),
        }
        self.observations.append(entry)

    def get_context(self) -> str:
        """Render the goal, plan progress and the last five observations."""
        parts = [f"目标: {self.goal}\n"]
        if self.plan:
            joined_plan = " → ".join(self.plan)
            parts.append(f"计划: {joined_plan}\n")
            parts.append(f"当前步骤: {self.current_step + 1}/{len(self.plan)}\n")
        if self.observations:
            parts.append("已执行:\n")
            # Only the five most recent observations, each result cut to 100 chars.
            parts.extend(
                f" - {seen['step']}: {seen['result'][:100]}\n"
                for seen in self.observations[-5:]
            )
        return "".join(parts)
14.5 完整 Agent 框架
class AIAgent:
    """Tool-using agent with short-term and long-term memory.

    ``execute`` recalls related long-term memories, prepends the short-term
    conversation history, then loops between the model and the registered
    tools until the model answers without requesting a tool. A
    ``working_memory`` object is created but not used by ``execute``.
    """

    # Upper bound on model/tool round trips per execute() call.
    max_iterations = 10

    def __init__(self, name: str, model: str = "gpt-4o"):
        self.name = name
        self.model = model
        self.client = OpenAI()
        self.tools: dict[str, dict] = {}
        self.short_memory = ShortTermMemory()
        self.long_memory = LongTermMemory()
        self.working_memory = WorkingMemory()
        self.system_prompt = ""

    def set_system_prompt(self, prompt: str):
        """Set the system prompt used by later ``execute`` calls."""
        self.system_prompt = prompt

    def register_tool(self, name: str, func: Callable, description: str, parameters: dict):
        """Register ``func`` under ``name`` with its function-calling schema.

        Args:
            name: Tool name exposed to the model.
            func: Callable invoked with the model-supplied keyword arguments.
            description: Natural-language description shown to the model.
            parameters: JSON-Schema object describing the arguments.
        """
        self.tools[name] = {
            "function": func,
            "definition": {
                "type": "function",
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": parameters,
                },
            },
        }

    def _build_messages(self, task: str) -> list:
        """Assemble system prompt, recalled memories, history and the task."""
        relevant_memories = self.long_memory.recall(task, top_k=3)
        memory_context = ""
        if relevant_memories:
            memory_context = "\n相关记忆:\n" + "\n".join(
                f"- {m['content']}" for m in relevant_memories
            )

        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt + memory_context})
        elif memory_context:
            # Without a system prompt the recalled memories would be lost;
            # send them as the system message on their own.
            messages.append({"role": "system", "content": memory_context.lstrip("\n")})
        messages.extend(self.short_memory.get_messages())
        messages.append({"role": "user", "content": task})
        return messages

    def _run_tool(self, name: str, raw_arguments: str) -> str:
        """Execute one tool call and return its result as a string.

        Never raises: malformed JSON arguments, unknown tools, and exceptions
        thrown by the tool are reported as a JSON ``{"error": ...}`` payload.
        """
        try:
            # An empty argument string is treated as "no arguments".
            args = json.loads(raw_arguments or "{}")
            if name not in self.tools:
                return json.dumps({"error": f"未知工具: {name}"})
            result = self.tools[name]["function"](**args)
            if isinstance(result, str):
                return result
            return json.dumps(result, ensure_ascii=False)
        except Exception as e:
            return json.dumps({"error": str(e)})

    def execute(self, task: str) -> str:
        """Run ``task`` and return the final answer.

        On success the task and answer are appended to short-term memory and
        stored in long-term memory. Returns a fixed notice when
        ``max_iterations`` round trips pass without a final answer.
        """
        messages = self._build_messages(task)

        for _ in range(self.max_iterations):
            request = {"model": self.model, "messages": messages}
            definitions = [t["definition"] for t in self.tools.values()]
            # Only send tools/tool_choice when at least one tool exists; an
            # empty tools list is not a valid request.
            if definitions:
                request["tools"] = definitions
                request["tool_choice"] = "auto"

            response = self.client.chat.completions.create(**request)
            message = response.choices[0].message
            messages.append(message)

            if not message.tool_calls:
                # content can be None; normalise before storing and returning.
                answer = message.content or ""
                self.short_memory.add("user", task)
                self.short_memory.add("assistant", answer)
                self.long_memory.store(f"任务: {task}\n结果: {answer}")
                return answer

            for tc in message.tool_calls:
                result_str = self._run_tool(tc.function.name, tc.function.arguments)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result_str[:2000],  # cap the result length
                })

        return "任务未完成,达到最大迭代次数。"
14.6 多 Agent 协作
协作模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
| 主从模式 | 一个主 Agent 调度多个子 Agent | 通用任务分解 |
| 对等模式 | Agent 之间平等协作 | 讨论、辩论 |
| 流水线 | 前一个 Agent 的输出作为下一个的输入 | 多阶段处理 |
| 辩论模式 | 多个 Agent 对同一问题给出不同观点 | 决策支持 |
主从模式实现
class OrchestratorAgent:
    """Orchestrator that splits a task and dispatches it to sub-agents.

    ``run`` asks the model for a JSON plan, executes each planned subtask on
    the named sub-agent in order, then asks the model to summarise the
    collected results.
    """

    def __init__(self, model: str = "gpt-4o"):
        self.client = OpenAI()
        self.model = model
        # name -> {"agent": AIAgent, "description": str}
        self.agents: dict[str, dict] = {}

    def register_agent(self, name: str, agent: "AIAgent", description: str):
        """Register a sub-agent under ``name`` with a description for the planner."""
        self.agents[name] = {"agent": agent, "description": description}

    @staticmethod
    def _parse_plan(raw: str) -> list[tuple[str, str]]:
        """Extract ``(agent, task)`` pairs from the planner's JSON output.

        Entries that are not objects, or that lack a string ``agent`` or
        ``task``, are skipped instead of raising. Invalid JSON still raises
        ``json.JSONDecodeError``.
        """
        plan = json.loads(raw)
        tasks = plan.get("tasks", []) if isinstance(plan, dict) else []
        if not isinstance(tasks, list):
            return []
        pairs = []
        for item in tasks:
            if not isinstance(item, dict):
                continue
            agent_name = item.get("agent")
            agent_task = item.get("task")
            if isinstance(agent_name, str) and isinstance(agent_task, str):
                pairs.append((agent_name, agent_task))
        return pairs

    def run(self, task: str) -> str:
        """Plan, dispatch and summarise ``task``; return the final answer."""
        # Describe the available sub-agents to the planner.
        agent_list = "\n".join(
            f"- {name}: {info['description']}"
            for name, info in self.agents.items()
        )
        planner_prompt = f"""你是一个任务规划器。
可用的子 Agent:
{agent_list}
将用户任务分解为子任务,分配给合适的 Agent。
输出 JSON: {{"tasks": [{{"agent": "name", "task": "description"}}]}}"""

        plan_response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": planner_prompt},
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )
        subtasks = self._parse_plan(plan_response.choices[0].message.content)

        # Execute the subtasks in plan order.
        results = []
        for agent_name, agent_task in subtasks:
            entry = self.agents.get(agent_name)
            if entry is None:
                # Make the skip visible instead of dropping it silently.
                print(f"[跳过] 未知 Agent: {agent_name}")
                continue
            print(f"[调度] {agent_name}: {agent_task}")
            result = entry["agent"].execute(agent_task)
            results.append({"agent": agent_name, "task": agent_task, "result": result})

        # Summarise the collected results.
        summary_input = "\n\n".join(
            f"Agent [{r['agent']}] 执行: {r['task']}\n结果: {r['result']}"
            for r in results
        )
        summary = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "汇总各子任务的结果,给出最终回答。"},
                {"role": "user", "content": f"原始任务: {task}\n\n子任务结果:\n{summary_input}"},
            ],
        )
        return summary.choices[0].message.content
14.7 Planning 策略
计划-执行-反思
class PlanAndExecuteAgent:
    """Plan / execute / reflect agent.

    Both model calls go through the module-level ``client`` and request a
    JSON-object response.
    """

    def __init__(self, model: str = "gpt-4o"):
        """Create the agent.

        Args:
            model: Chat model used for planning and reflection.
        """
        self.model = model

    @staticmethod
    def _format_record(steps: list[str], results: list[str]) -> str:
        """Pair each step with its result as text; extra items are ignored."""
        return "\n".join(f"步骤: {s}\n结果: {r}" for s, r in zip(steps, results))

    def plan(self, task: str) -> list[str]:
        """Ask the model for an ordered list of steps for ``task``.

        Raises:
            KeyError: If the model's JSON has no ``steps`` key.
        """
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": '为任务制定详细的执行步骤。输出JSON: {"steps": ["step1", ...]}'},
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)["steps"]

    def reflect(self, task: str, steps: list[str], results: list[str]) -> dict:
        """Ask the model to assess the execution log.

        Returns:
            The parsed JSON reply; the prompt asks for the keys ``complete``
            and ``next_action``, but the reply is not validated here.
        """
        reflection = self._format_record(steps, results)
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": '评估执行结果。输出JSON: {"complete": bool, "next_action": "..."}'},
                {"role": "user", "content": f"任务: {task}\n\n执行记录:\n{reflection}"},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)
14.8 业务场景
| 场景 | Agent 类型 | 工具组合 |
|---|---|---|
| 数据分析助手 | ReAct | SQL查询 + 图表生成 + 报告撰写 |
| 客服自动化 | 主从模式 | 知识库检索 + 工单系统 + 情感分析 |
| 内容创作 | 流水线 | 调研 + 写作 + 审核 + 发布 |
| 代码审查 | ReAct | 代码分析 + Bug检测 + 安全扫描 |
| 旅行规划 | Plan-Execute | 机票查询 + 酒店查询 + 行程优化 |
14.9 注意事项
- 迭代次数限制:设置 max_iterations 防止死循环
- 工具结果截断:过长的工具结果会撑爆上下文
- 成本控制:Agent 的多轮调用会消耗大量 token
- 安全边界:敏感操作(删除、支付)需要人工确认
- 错误恢复:工具失败时 Agent 应能尝试替代方案
- 日志记录:记录每一步的思考和工具调用,便于调试
- 测试验证:Agent 行为不确定性高,需要充分测试
14.10 扩展阅读
下一章:15 - 最佳实践 — 错误处理、成本控制、安全防护、限流。