2. 状态管理核心

2.状态管理核心

状态管理是 LangGraph 的基石。上一章我们快速搭建了一个 StateGraph,但那个例子里的状态只是一个简单的字符串。在实际应用中,状态往往复杂得多——可能包含消息历史、用户资料、临时计算结果,甚至跨会话的持久化数据。理解 LangGraph 的状态管理机制,是构建可靠、可维护的代理系统的关键。

状态模式 TypedDict 定义

在 LangGraph 中,状态模式定义了整个图的结构。它告诉每个节点可以访问哪些数据,以及可以更新哪些字段。Python 用户通常会使用 TypedDict 来定义状态模式,这种方式既简洁又具备类型提示能力。

基本定义方式

最基础的状态定义看起来像这样:

from typing_extensions import TypedDict

class State(TypedDict):
    text: str
    count: int

这个 State 定义了两个字段:text 和 count。所有节点都会接收到这个状态对象,并且可以返回一个字典来更新其中的部分或全部字段。重要的是,节点不需要返回完整的状态,只需要返回想要更新的字段即可。

为什么用 TypedDict

TypedDict 是 Python 标准库的一部分,它提供了轻量级的类型检查,没有运行时开销。与 Pydantic 相比,TypedDict 更加灵活,性能也更好。LangGraph 的设计哲学是保持核心框架的轻量,因此官方文档主要推荐使用 TypedDict。

当然,如果项目需要更严格的数据验证,也可以使用 Pydantic BaseModel。不过需要注意,Pydantic 的递归验证会带来性能损耗,对于高频调用的节点可能成为瓶颈。

状态字段的灵活性

状态模式不是一成不变的。在图执行过程中,节点可以动态添加新的状态通道。例如,即使初始状态只定义了 messages 字段,节点仍然可以在运行时返回 {"new_field": "value"},LangGraph 会自动将这个新字段加入状态。

这种设计带来了极大的灵活性。我们可以先定义核心字段,然后在开发过程中根据需要逐步扩展状态结构,而不需要一开始就设计完美的模式。

Reducer 函数状态更新

Reducer 是 LangGraph 状态管理的核心概念。它决定了当多个节点更新同一个状态字段时,这些更新如何合并。如果没有正确理解 Reducer,很容易在并行节点执行时遇到状态冲突问题。

默认行为:覆盖更新

默认情况下,状态字段采用覆盖式更新。看一个例子:

from typing_extensions import TypedDict

class State(TypedDict):
    foo: int
    bar: list[str]

假设初始状态是 {"foo": 1, "bar": ["hi"]},第一个节点返回 {"foo": 2},那么状态会变为 {"foo": 2, "bar": ["hi"]}。第二个节点返回 {"bar": ["bye"]},最终状态是 {"foo": 2, "bar": ["bye"]}。

注意 bar 字段的变化:第二个节点完全覆盖了第一个节点的值,而不是追加。这在很多场景下并不是我们想要的行为。

使用 Annotated 指定 Reducer

为了更精细地控制更新行为,可以使用 typing.Annotated 来指定 Reducer 函数:

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

现在,同样的场景下:初始状态 {"foo": 1, "bar": ["hi"]},第一个节点返回 {"foo": 2},状态变为 {"foo": 2, "bar": ["hi"]}。第二个节点返回 {"bar": ["bye"]},最终状态是 {"foo": 2, "bar": ["hi", "bye"]}。

operator.add 将两个列表连接起来,实现了追加效果。这种模式在处理消息列表、日志记录等场景时特别有用。

并行执行中的 Reducer

Reducer 在处理并行节点时变得至关重要。考虑一个并行执行的图:

from typing_extensions import TypedDict

class State(TypedDict):
    some_key: str  # 没有 Reducer

def node_a(state: State):
    return {"some_key": "value_a"}

def node_b(state: State):
    return {"some_key": "value_b"}

builder = StateGraph(State)
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b")

如果 node_a 和 node_b 同时执行并返回 some_key 的更新,LangGraph 会抛出 INVALID_CONCURRENT_GRAPH_UPDATE 错误。因为它不知道应该保留哪个值。

解决方案是为 some_key 指定一个 Reducer:

from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    some_key: Annotated[list, operator.add]

def node_a(state: State):
    return {"some_key": ["value_a"]}

def node_b(state: State):
    return {"some_key": ["value_b"]}

现在两个节点可以并行执行,它们的更新会被合并成一个列表 ["value_a", "value_b"]。

自定义 Reducer 函数

除了内置的 operator,还可以定义自己的 Reducer 函数。Reducer 接收两个参数:当前值和更新值,返回合并后的值。例如,实现一个去重的 Reducer:

def unique_add(current: list, update: list):
    combined = current + update
    seen = set()
    result = []
    for item in combined:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result

class State(TypedDict):
    items: Annotated[list, unique_add]

这个 Reducer 在追加新元素的同时,确保列表中没有重复项。

MessagesState 消息管理

在 LLM 应用中,消息历史是最常见的状态类型。LangGraph 为此提供了专门的处理机制。

为什么需要特殊处理

消息列表与普通列表不同,它需要:

  1. 追加新消息而不是覆盖
  2. 支持消息 ID,以便更新已存在的消息
  3. 接受多种消息格式(LangChain Message 对象、OpenAI 格式等)
  4. 自动反序列化

add_messages Reducer

LangGraph 提供了内置的 add_messages Reducer,它解决了上述所有问题:

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

add_messages 的行为很智能:

  • 对于新消息,追加到列表末尾
  • 对于带有 ID 的已存在消息,更新原消息
  • 自动将字典格式的消息反序列化为 LangChain Message 对象

消息格式兼容性

使用 add_messages 后,可以以多种格式传递消息:

# 使用 LangChain Message 对象
{"messages": [HumanMessage(content="hello")]}

# 使用字典格式
{"messages": [{"type": "human", "content": "hello"}]}

# OpenAI 格式
{"messages": [{"role": "user", "content": "hello"}]}

add_messages 会自动处理这些格式,统一转换为 LangChain Message 对象。这意味着在节点内部,可以始终使用一致的 API 访问消息内容:

def process_messages(state: State):
    last_message = state["messages"][-1]
    content = last_message.content
    # 无论输入是什么格式,这里都能正常工作

MessagesState 预定义状态

由于消息状态如此常见,LangGraph 提供了预定义的 MessagesState:

from langgraph.graph import MessagesState

class State(MessagesState):
    documents: list[str]
    user_info: dict

MessagesState 已经包含了 messages 字段和 add_messages Reducer,可以直接继承并添加其他字段。这让代码更加简洁,意图也更清晰。

消息序列化细节

add_messages 在序列化时会保留消息的所有属性,包括 tool_calls、response_metadata 等。这对于调试和追踪非常重要。当状态被持久化到检查点时,完整的消息历史都会被保存,包括 LLM 的原始响应元数据。

输入输出模式分离设计

随着应用复杂度增加,可能需要更精细地控制状态的可见性。LangGraph 支持为图定义独立的输入、输出和内部状态模式。

为什么需要分离

考虑一个数据处理管道:

  • 输入:原始查询字符串
  • 内部:查询解析结果、中间处理数据、临时计算结果
  • 输出:最终答案

如果所有节点都使用同一个状态模式,会导致:

  1. 输入包含不必要的内部字段
  2. 输出包含临时数据,暴露实现细节
  3. 节点可以访问不该访问的数据

定义分离的模式

from typing_extensions import TypedDict

class InputState(TypedDict):
    user_input: str  # 只有用户输入

class OutputState(TypedDict):
    answer: str  # 只有最终答案

class OverallState(TypedDict):
    user_input: str
    answer: str
    temp_data: dict  # 内部临时数据
    processing_steps: list  # 内部处理步骤

在 StateGraph 中使用分离模式

from langgraph.graph import StateGraph, START, END

def parse_query(state: InputState) -> OverallState:
    # 接收 InputState,返回 OverallState
    return {
        "user_input": state["user_input"],
        "temp_data": {"parsed": True},
        "processing_steps": ["parsed"],
        "answer": ""
    }

def process_data(state: OverallState) -> OverallState:
    # 使用内部状态进行计算
    return {"temp_data": {"processed": True}, "processing_steps": state["processing_steps"] + ["processed"]}

def generate_answer(state: OverallState) -> OutputState:
    # 从内部状态提取输出
    return {"answer": f"Answer based on {state['user_input']}"}

builder = StateGraph(
    OverallState,
    input_schema=InputState,
    output_schema=OutputState
)
builder.add_node(parse_query)
builder.add_node(process_data)
builder.add_node(generate_answer)
builder.add_edge(START, "parse_query")
builder.add_edge("parse_query", "process_data")
builder.add_edge("process_data", "generate_answer")
builder.add_edge("generate_answer", END)

graph = builder.compile()

调用时只需要提供 InputState:

result = graph.invoke({"user_input": "hello world"})
# 返回:{"answer": "Answer based on hello world"}

私有状态通道

除了输入输出分离,还可以定义完全私有的状态通道。这些通道只在图内部使用,不会暴露给调用者:

class PrivateState(TypedDict):
    internal_cache: dict
    debug_info: list

def node_with_private_state(state: OverallState) -> PrivateState:
    return {
        "internal_cache": {"key": "value"},
        "debug_info": ["step completed"]
    }

即使 StateGraph 初始化时没有指定 PrivateState,节点仍然可以返回 PrivateState 的字段。LangGraph 会自动将这些字段添加到内部状态中。这种机制让节点可以传递不需要暴露给外部的临时数据。

模式继承与组合

状态模式支持继承,这在构建复杂的代理系统时很有用:

class BaseState(TypedDict):
    messages: Annotated[list, add_messages]
    timestamp: str

class AgentState(BaseState):
    agent_name: str
    task_queue: list

class SupervisorState(BaseState):
    supervisor_decision: str
    assigned_agent: str

通过继承,可以构建层次化的状态结构,同时保持代码的复用性和清晰性。

实际应用示例

考虑一个多代理系统,其中每个代理有自己的私有状态:

class SharedState(TypedDict):
    messages: Annotated[list, add_messages]
    current_agent: str

class Agent1PrivateState(TypedDict):
    agent1_memory: list
    agent1_confidence: float

class Agent2PrivateState(TypedDict):
    agent2_tools_used: list
    agent2_results: dict

def agent1_node(state: SharedState) -> Agent1PrivateState:
    # 处理共享状态,更新私有状态
    return {
        "agent1_memory": [f"processed: {state['messages'][-1]}"],
        "agent1_confidence": 0.95
    }

def agent2_node(state: SharedState) -> Agent2PrivateState:
    return {
        "agent2_tools_used": ["search", "calc"],
        "agent2_results": {"status": "complete"}
    }

这种设计让每个代理可以维护自己的内部状态,同时通过共享状态进行协作。父图不需要关心子图的私有状态,只需要处理共享的接口。

状态管理的最佳实践

基于实际项目经验,这里总结一些状态管理的建议:

保持状态最小化

只存储必要的数据。每个状态字段都应该有明确的用途。过多的状态字段会增加图的复杂性,也让调试变得困难。

合理使用 Reducer

对于可能被多个节点更新的字段,务必定义 Reducer。即使当前没有并行执行,未来的需求变更可能会引入并行节点,提前使用 Reducer 可以避免潜在的并发问题。

消息状态优先使用 MessagesState

如果应用涉及对话,优先使用 MessagesState 或 add_messages Reducer。这能确保消息历史的正确处理,避免手动管理消息列表的陷阱。

分离关注点

利用输入输出模式分离,将公共接口与内部实现分开。这不仅提高了代码的可维护性,也让图的行为更加可预测。调用者只需要关心输入输出,不需要了解内部的处理细节。

文档化状态字段

为每个状态字段添加注释,说明其用途、更新方式和生命周期。这在团队协作和长期维护中尤为重要。

利用类型检查

虽然 TypedDict 的运行时检查较弱,但可以配合 mypy 等静态类型检查工具,在开发阶段捕获类型错误。对于关键应用,也可以考虑使用 Pydantic 进行运行时验证。

总结

状态管理是 LangGraph 的核心能力,它决定了图的灵活性、可维护性和可靠性。通过 TypedDict 定义模式,使用 Reducer 控制更新行为,借助 MessagesState 处理消息历史,以及通过输入输出分离保护接口,我们可以构建出既强大又易于管理的代理系统。

理解这些概念后,就能更好地设计图的结构,避免常见的状态冲突问题,并为复杂的多代理系统打下坚实基础。下一章将深入探讨节点与边系统,看看如何将这些状态管理技巧应用到实际的工作流设计中。