3. 节点与边系统

3.节点与边系统

在 LangGraph 中,节点和边构成了整个工作流的骨架。如果把一个智能代理比作工厂里的生产线,节点就是各个工位上的机器,负责具体的加工操作;边则是传送带,决定产品从一个工位流向哪个下一个工位。这种设计让复杂的工作流变得直观且易于管理。

节点创建与引用方法

节点的本质

节点本质上是一个函数,接收当前状态作为输入,执行特定逻辑后返回更新后的状态。这个函数可以是普通 Python 函数,也可以是异步函数,甚至可以是另一个完整的图(子图)。节点的设计非常灵活,不强制要求特定的签名格式,但最常用的是接收 state 参数的模式。

from typing_extensions import TypedDict
from langgraph.graph import StateGraph

class State(TypedDict):
    text: str
    count: int

def simple_node(state: State) -> dict:
    # 接收整个状态,返回需要更新的字段
    return {"text": state["text"] + " processed", "count": state["count"] + 1}

# 创建图并添加节点
builder = StateGraph(State)
builder.add_node("processor", simple_node)

上面的代码展示了一个基本节点。注意节点函数返回的是字典,只包含需要更新的状态字段,不需要返回整个状态对象。LangGraph 会自动将这个更新合并到整体状态中。

节点函数还可以接收更多参数来获取运行时信息:

from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime

def advanced_node(state: State, config: RunnableConfig, runtime: Runtime):
    # config 包含线程ID、标签等配置信息
    thread_id = config["configurable"]["thread_id"]
    
    # runtime 提供上下文、存储等运行时能力
    context = runtime.context  # 如果有定义上下文的话
    
    return {"text": f"Processed in thread {thread_id}"}

builder.add_node("advanced", advanced_node)

节点的添加方式

添加节点主要有两种方式:显式命名和自动命名。

# 方式一:显式指定节点名称
builder.add_node("my_node", simple_node)

# 方式二:使用函数名作为节点名称
builder.add_node(simple_node)  # 节点名称为 "simple_node"

第二种方式很方便,但需要注意函数名可能会冲突。在大型项目中,建议始终使用显式命名,这样代码的可读性和可维护性更好。

特殊节点:START 和 END

LangGraph 提供了两个特殊节点:STARTEND。它们不是真正的函数节点,而是标记图的起始和终止位置的虚拟节点。

from langgraph.graph import START, END

# 定义从起点到第一个节点的边
builder.add_edge(START, "processor")

# 定义从最后一个节点到终点的边
builder.add_edge("processor", END)

START 节点代表用户输入进入图的位置,所有从 START 出发的边决定了图的入口点。END 节点表示图执行结束的位置,指向 END 的边告诉 LangGraph 在这里停止执行。

节点缓存机制

对于计算成本高的节点,LangGraph 提供了缓存功能。这在调用外部 API 或执行复杂计算时特别有用。

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

def expensive_node(state: State) -> dict:
    time.sleep(2)  # 模拟耗时操作
    return {"result": state["input"] * 2}

# 为节点添加缓存策略,TTL 为 120 秒
builder.add_node("expensive", expensive_node, cache_policy=CachePolicy(ttl=120))

# 编译图时启用缓存
graph = builder.compile(cache=InMemoryCache())

# 第一次调用会执行耗时操作
result1 = graph.invoke({"input": 5})
# 第二次调用会命中缓存,立即返回
result2 = graph.invoke({"input": 5})  # 这次会快很多

缓存键默认基于节点输入生成,也可以通过 key_func 参数自定义。缓存可以显著减少重复计算,但需要注意缓存失效策略,避免返回过时的结果。

边类型与连接方式

普通边:最简单的连接

普通边表示确定的、无条件的流转路径。如果节点 A 执行完后总是要到节点 B,就用普通边连接。

# 从 node_a 到 node_b 的普通边
builder.add_edge("node_a", "node_b")

普通边的特点是:一旦定义,流转路径就固定了,不依赖任何运行时状态。这适合线性流程或确定性的分支流程。

条件边:动态路由逻辑

条件边允许根据运行时状态动态决定下一个节点。这在构建智能代理时非常有用,比如根据 LLM 的输出决定调用哪个工具。

def routing_function(state: State) -> str:
    # 根据状态决定下一个节点
    if state["condition"] == "A":
        return "node_b"
    else:
        return "node_c"

# 添加条件边
builder.add_conditional_edges("node_a", routing_function)

条件边的路由函数返回目标节点的名称。如果返回字符串列表,所有对应的节点会并行执行。

def multi_route(state: State) -> list[str]:
    # 返回多个节点名称,它们会并行执行
    return ["node_b", "node_c", "node_d"]

builder.add_conditional_edges("node_a", multi_route)

入口点配置

入口点决定图从哪个节点开始执行。最简单的入口点配置是从 START 节点到第一个节点的普通边。

builder.add_edge(START, "first_node")

一个图可以有多个入口点吗?实际上,虽然可以定义多条从 START 出发的边,但通常只定义一条主入口边。更灵活的入口控制通过条件入口点实现。

条件入口点:动态选择起点

条件入口点允许根据输入状态动态选择从哪个节点开始执行。

def entry_routing(state: State) -> str:
    # 根据输入决定从哪个节点开始
    if state["user_type"] == "admin":
        return "admin_node"
    else:
        return "normal_node"

# 从 START 添加条件边
builder.add_conditional_edges(START, entry_routing)

这在处理不同类型的请求时很有用,比如管理员请求走特殊处理流程,普通用户走标准流程。

条件边路由逻辑

路由函数的实现细节

路由函数是条件边的核心。它的签名与节点函数类似,接收状态作为输入,但返回值不同:返回下一个节点的名称或名称列表。

def simple_router(state: State) -> Literal["option1", "option2", "option3"]:
    # 使用 Literal 类型明确可能的返回值
    score = state["score"]
    if score > 90:
        return "option1"
    elif score > 60:
        return "option2"
    else:
        return "option3"

builder.add_conditional_edges("evaluator", simple_router)

使用 Literal 类型注解有两个好处:一是让代码自文档化,明确路由的可能结果;二是让 LangGraph Studio 能够正确可视化图结构。

并行路由:一次到多个节点

当路由函数返回节点列表时,这些节点会在同一个超级步(super-step)中并行执行。这是 LangGraph 实现并行计算的主要方式。

def parallel_router(state: State) -> list[str]:
    # 根据任务列表动态生成目标节点
    tasks = state["tasks"]
    return [f"process_{task}" for task in tasks]

# 假设有三个处理节点
builder.add_node("process_email", process_email)
builder.add_node("process_slack", process_slack)
builder.add_node("process_ticket", process_ticket)

# 条件边可能返回其中任意组合
builder.add_conditional_edges("dispatcher", parallel_router)

并行执行完成后,所有节点的更新会通过 reducer 合并到主状态中。这要求状态字段定义了合适的 reducer 函数。

路径映射:显式定义路由表

有时路由函数的返回值不是直接的节点名称,而是某种逻辑标识。这时可以使用路径映射将标识转换为实际节点名称。

def status_router(state: State) -> str:
    # 返回状态码而非节点名
    return state["status_code"]  # 可能是 "success", "retry", "fail"

# 使用路径映射将状态码映射到节点
builder.add_conditional_edges(
    "checker",
    status_router,
    {
        "success": "complete_node",
        "retry": "retry_node",
        "fail": "error_node"
    }
)

路径映射让路由逻辑与节点名称解耦,提高了代码的可维护性。当节点名称变化时,只需修改映射表,不需要改动路由函数。

终止条件:路由到 END

路由函数可以返回 END 来终止图执行。这在满足特定条件时提前结束流程很有用。

def early_exit_router(state: State) -> str:
    if state["should_continue"] is False:
        return END  # 终止执行
    return "next_node"

builder.add_conditional_edges("checker", early_exit_router)

注意 END 是一个特殊常量,需要从 langgraph.graph 导入。

入口出口点配置

显式配置入口点

虽然可以从 START 添加边来定义入口点,但 LangGraph 也提供了更明确的配置方法。

# 方法1:使用 add_edge
builder.add_edge(START, "first_node")

# 方法2:使用 set_entry_point
builder.set_entry_point("first_node")

两种方法效果相同,但 set_entry_point 语义更明确,在复杂图中更容易识别入口。

显式配置出口点

类似地,出口点也有显式配置方式。

# 方法1:使用 add_edge 到 END
builder.add_edge("last_node", END)

# 方法2:使用 set_finish_point
builder.set_finish_point("last_node")

在简单图中,显式配置出口点可能显得多余。但在包含多个分支的复杂图中,明确指定哪些节点是终止节点能大大提高图的可读性。

多入口多出口场景

某些复杂工作流可能需要多个入口或出口。例如,一个数据处理管道可能根据数据类型从不同节点开始,或在不同条件下在不同节点结束。

# 多入口:条件入口点
def entry_point_selector(state: State) -> str:
    data_type = state["data_type"]
    if data_type == "image":
        return "image_processor"
    elif data_type == "text":
        return "text_processor"
    else:
        return "generic_processor"

builder.add_conditional_edges(START, entry_point_selector)

# 多出口:多个节点指向 END
builder.add_edge("success_node", END)
builder.add_edge("error_node", END)
builder.add_edge("partial_node", END)

这种设计让图能够处理更复杂的业务逻辑,同时保持结构的清晰性。

Send API 并行任务分发

Send API 解决的问题

默认情况下,图中的节点和边在编译时就确定了,所有节点共享同一个状态。但有些场景需要动态创建边,或者让不同节点拥有独立的状态副本。典型的例子是 map-reduce 模式:一个节点生成对象列表,另一个节点需要对每个对象单独处理。

from langgraph.types import Send

def map_step(state: OverallState):
    # 生成多个主题
    return {"subjects": ["math", "science", "history"]}

def reduce_step(state: OverallState):
    # 收集所有结果
    all_jokes = state["jokes"]
    return {"best_joke": max(all_jokes, key=len)}

# 关键:为每个主题动态创建边
def continue_to_jokes(state: OverallState):
    # 返回 Send 对象列表,每个对象指定目标节点和独立状态
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

builder.add_node("map_step", map_step)
builder.add_node("generate_joke", generate_joke)
builder.add_node("reduce_step", reduce_step)

builder.add_edge(START, "map_step")
builder.add_conditional_edges("map_step", continue_to_jokes)
builder.add_edge("generate_joke", "reduce_step")
builder.add_edge("reduce_step", END)

Send 对象的工作原理

Send 接收两个参数:目标节点名称和要传递给该节点的状态。当条件边返回 Send 对象列表时,LangGraph 会为每个 Send 创建独立的执行分支,每个分支拥有自己独立的状态副本。

def continue_to_jokes(state: OverallState):
    # 为每个主题创建一个独立的 generate_joke 任务
    return [
        Send("generate_joke", {"subject": "math"}),
        Send("generate_joke", {"subject": "science"}),
        Send("generate_joke", {"subject": "history"})
    ]

这些分支会并行执行,所有结果最终通过 reducer 合并到主状态的 jokes 字段中。这要求 jokes 字段定义了合适的 reducer,比如 operator.add

实际应用场景

Send API 在以下场景特别有用:

  1. 批量处理:对列表中的每个项目执行相同操作
  2. 并行搜索:同时调用多个搜索工具
  3. 分片计算:将大数据集分片并行处理
  4. 多代理协作:动态创建子代理处理不同任务
def parallel_search(state: State):
    queries = state["search_queries"]
    return [Send("search_tool", {"query": q}) for q in queries]

def search_tool(state: State):
    query = state["query"]
    result = perform_search(query)
    return {"results": [result]}

# results 字段使用 operator.add 作为 reducer
class State(TypedDict):
    search_queries: list[str]
    results: Annotated[list, operator.add]

与条件边的结合

Send API 通常与条件边一起使用,因为需要动态决定创建多少个分支。

def dynamic_branching(state: State):
    items = state["items"]
    if len(items) > 10:
        # 项目太多,分批处理
        return [Send("batch_processor", {"batch": items[i:i+5]}) 
                for i in range(0, len(items), 5)]
    else:
        # 项目少,直接处理
        return [Send("single_processor", {"item": item}) for item in items]

builder.add_conditional_edges("splitter", dynamic_branching)

这种模式让图能够根据数据规模自适应调整处理策略,既保证了效率,又避免了不必要的开销。

总结

节点和边系统是 LangGraph 最基础也最强大的部分。节点封装了具体的业务逻辑,边控制执行流程。普通边提供确定性路径,条件边实现动态路由,Send API 支持并行计算。合理组合这些元素,可以构建出从简单线性流程到复杂多代理系统的各种工作流。

掌握这些概念后,就能设计出结构清晰、易于维护的智能代理。下一章将介绍如何利用这些基础构建 ReAct 代理,实现工具调用和推理的循环。