第7章 Graph 搭建旅游规划工作流,像拼积木一样轻松
7-1 以工作流方式编排旅行规划的团队协同
旅游规划场景的多Agent 协同工作流
根据各个智能体的组合排列和业务分析,选择出的一条工作流程

7-2 搭建工作流的记忆中枢:OverAllIState
链式工作流
全局变量:用户Prompt -> 节点1 -> 节点2 -> 大模型输出
工作流的依赖包
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-graph-core</artifactId>
</dependency>
<!-- Ai Agent 工作流需要添加gson的依赖 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>工作流涉及的对象:
- OverAllState : 全局状态对象,任何节点都能够读取和修改全局状态是贯穿整个工作流
- StateGraph: 工作流框架对象,工作流框架可以添加节点以及节点之间的箭头连线
- CompiledGraph: 工作流的编译对象
工作流配置类 中创建配置链式工作流:在在SpringAi1.0中, 设置状态(State)合并策略
OverAllStateFactory是一个接口,并且是一个函数式接口 (只有一个接口方法)
lambda的 =()-> 表达式可以方便的实现函数式接口
全局状态OverAllState以Key-Value的形式存放上下文数据
- KeyStrategy不只是指代key所对应的Value,还包括Value的更新策略:
- 有2种更新策略:
- AppendStrategy:在旧值的基础上追加新值
- ReplaceStrategy:将新值覆盖旧值
- 更新策略是限定了节点对于OverAllState的Key所对应Value的操作
@Configuration
public class GraphConfiguration {
//将链式工作流配置方法设置为Bean
@Bean(name="parallelGraph")
//配置链式工作流
public StateGraph parallelGraph() throws GraphStateException {
OverAllStateFactory overAllStateFactory =()-> {
//创建全局状态
OverAllState overAllState = new OverAllState();
Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
//添加key以及Key所对应Value的更新策略
map.put("key1", new ReplaceStrategy());
map.put("key2", new AppendStrategy());
//将上下文数据以Key-Value的形式注册到全局状态
overAllState.registerKeyAndStrategy(map);
return overAllState;
};
}
}7-3 搭建工作流的框架蓝图:StateGraph
StateGraph的创建需要3个参数:
- 工作流框架的名称
- 全局状态对象的工厂类
- Json框架:采用的是Gson,因为已经自动创建出来,所以不需要传入这个参数
StateGraph stateGraph = new StateGraph("parallel",overAllStateFactory);
//添加节点
stateGraph.addNode("node1",xx);
//添加节点间的箭头连线
stateGraph.addEdge("node1",xx);7-4 搭建工作流的执行单元:NodeAction
添加节点,按道理是应该添加一个节点对象Node,采用addNode(String,Node),但是,addNode(String,Node)是被addNode(String id, AsyncNodeActionWithConfig actionWithConfig) 所调用,addNode(String id, AsyncNodeActionWithConfig actionWithConfig)又被addNode(String id, AsyncNodeAction action)所调用。所以,我们直接调用addNode(String id, AsyncNodeAction action)就可以了
AsyncNodeAction的实例生成是采用AsyncNodeAction.node_async()。AsyncNodeAction:基于异步的节点操作node_async()需要传入NodeAction。NodeAction:是一个函数式接口
//添加节点, 需要传入2个参数(1.节点id,2.AsyncNodeAction)
stateGraph.addNode("node1",AsyncNodeAction.node_async(new Node1()));
stateGraph.addNode("node2",AsyncNodeAction.node_async(new Node2()));添加节点
public class Node1 implements NodeAction {
/**
* author: Imooc
* description: 节点对于全局状态的操作
* @param state:
* @return java.util.Map<java.lang.String,java.lang.Object>
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
//根据key获取全局状态中的值
Object object = state.value("key1");
Map<String, Object> map = new HashMap<String, Object>();
map.put("key1",object);
return map;
}
}7-5 搭建工作流的流程顺序:Edge
//添加节点间的箭头连线
stateGraph.addEdge("node1","node2");
stateGraph.addEdge(stateGraph.START,"node1");
stateGraph.addEdge("node2",stateGraph.END)7-6 编译及运行工作流 CompiledGraph
CompiledGraph提供了2种运行工作流的方式:
- invoke(): 以阻塞的方式运行工作流,工作流的结果是一次性返回
- stream(); 以非阻塞,流式的方式运行工作流,执行一个节点,就返回这个节点完成的结果~
以@Qualifier注入指定名称的Bean
/**
* description: 工作流控制器
*/
@RestController
public class WorkFlowController {
//工作流编译后对象
private final CompiledGraph compiledGraph;
public WorkFlowController(@Qualifier("parallelGraph") StateGraph graph) throws GraphStateException {
this.compiledGraph = graph.compile();
}
/**
* description: 链式工作流的接口
*/
@GetMapping(value = "/workflow/parallel")
public void parallelWorkFlow() {
String prompt = "在咖啡馆里,想要杯星巴克";
Map<String, Object> map = new HashMap<String, Object>();
map.put("prompt", "prompt");
compiledGraph.invoke(map);
}
}7-7 图形化展示工作流
需要在IDEAML插件: PzzantuML Integration
在代码中以PlantUML格式打印工作流图形,添加打印
StateGraph stateGraph = new StateGraph("parallel",overAllStateFactory);
//添加节点, 需要传入2个参数(1.节点id,2.AsyncNodeAction)
stateGraph.addNode("node1",AsyncNodeAction.node_async(new Node1()));
stateGraph.addNode("node2",AsyncNodeAction.node_async(new Node2()));
//添加节点间的箭头连线
stateGraph.addEdge("node1","node2");
stateGraph.addEdge(stateGraph.START,"node1");
stateGraph.addEdge("node2",stateGraph.END);
// 以PlantUML格式打印工作流图形,添加打印
GraphRepresentation graph =
stateGraph.getGraph(GraphRepresentation.Type.PLANTUML);
System.out.println("##########################");
System.out.println(graph.content());
System.out.println("##########################");
return stateGraph;同时需要添加依赖包
<!-- Ai Agent 工作流需要添加gson的依赖 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>如何使用图形插件:
- 先安装插件之后,再右键文件New,选择PlantUML File,在选择Class模板命名parallel之后,生成一个
parallel.puml文件,此时就有图形展示 - 将日志中打印的工作流图形内容复制到.puml文件中,删除其中的
circle start<<input>> as __START__,此时就是图形
7-8 Flux就是装载SSE流式数据的容器

和大模型以流式响应的方式进行对话的前提条件:
- SpringBoot Api 和 大模型应该是保持长连接
- 大模型能够主动的向SpringBoot Api发送数据
SpringAi 提供了 SSE协议 能方便实现和大模型以流式响应的方式进行对话:
SSE 协议
- 基于HTTP的长连接技术
- 客户端发送普通HTTP请求建立SSE长连接,服务端以流式数据向客户端进行推送
- 客户端在请求头设置Accept: text/event-stream,告诉服务端需要建立SSE长连接
- 单向通信,SSE长连接建立之后,客户端只负责消息的接收,客户端不能发送消息给服务端,服务端只负责消息的推送,
Flux技术:Java专门处理异步,流式的数据序列,数据流的容器,
- 接收Ai发送过来的逐字内容
- 逐段的发送数据
Flux类是Java响应式编程核心类, WebFlux 引入了 Flux类,WebFlux也是响应式编程。
- 响应式编程: 不会让线程傻傻的等待请求的处理, 而是对系统说,我这个线程先去做其他的东西,你把请求处理的准备工作处理好,告诉我这个线程,这个线程就会回来处理这个请求。目标:用最少得线程,去处理大量的请求工作
- SpringBoot基于响应式编程模型 处理SSE数据流,因为SSE“持续的数据流”可以看做是响应式的事件流,响应式编程框架,提供了能够高效处理响应式的事件流的能力。所以,SpringBoot基于响应式编程框架,也就是WebFlux框架,和SSE协议,是天然契合~
@GetMapping(value = "/simple/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat() {
//构建prompt -> 发送到大模型 -> 获取大模型返回
//用户输入
String message = "在咖啡馆里,想要杯星巴克";
//链式调用
return this.chatClient
.prompt(message) //构建prompt
//.call() //发送到大模型
.stream() //以流式响应的方式和大模型进行交互
.content(); //获取大模型文本返回
}7-9 Flux包装SSE返回给前端展示打字机输出效果
基于SSE协议持续的发消息:
- 指定消息的MIME类型是 SSE事件类型 (text/event-stream)
- 消息要符合 SSE协议消息格式
- event: 指定事件名称
- data: 消息体
- id: 消息ID
- 通过2个换行符 ( \n\n )表示一个完整事件消息结束
- 能持续不断地发送SSE协议格式的事件消息
/**
* description: 基于SSE协议持续的发消息给前端页面
*/
@GetMapping(value = "/flux/interval",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sseToFrontPage() {
return Flux.interval(Duration.ofSeconds(1)) //每1秒发送SSE协议消息
//将Map结构组装SSE协议格式消息,
//.map(seq->"{data:{},event:{},id:{}}\\n\\n") ;
//通过 ServerSentEvent 组装 SSE协议格式
.map(seq->ServerSentEvent
.<String>builder()
.event("")
.data("")
.id("")
.build());
}
7-10 SpringAi Alibaba1.1的Graph引擎
1.1的Graph引擎与1.0的完全通用
7-11 SpringAi 1.1的工作流状态更新
1.1版本依赖一样,只是版本号更新
<!-- SpringAi Alibaba 的版本号 -->
<spring-ai-alibaba.version>1.1.0.0-RC2</spring-ai-alibaba.version>
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-graph-core</artifactId>
</dependency>- SpringAi Alibaba 1.0,通过 OverAllStateFactory 设置状态(State)合并策略。设置完成后,要手动将合并策略注册到全局状态 OverAllState中
- SpringAi Alibaba 1.1通过 KeyStrategyFactory 设置状态(State)合并策略。设置完成后,无需手动将合并策略注册到全局状态 OverAllState中
//设置全局状态的合并策略
KeyStrategyFactory keyStrategyFactory =()-> {
Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
//添加key以及Key所对应Value的更新策略
map.put("key1", new ReplaceStrategy());
map.put("key2", new AppendStrategy());
return map;
};
StateGraph stateGraph = new StateGraph( "parallel",
//状态(State)合并策略
keyStrategyFactory);7-12 基于Graph搭建旅游规划工作流(案例)
旅游规划场景的多Agent协同工作流

构建节点
/**
* description: 汇总节点
*/
public class AggregationNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
return null;
}
}
/**
* description: 费用统筹节点
*/
public class BudgetNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
return null;
}
}
/**
* description: 路线制定节点
*/
public class RouteMakingNode implements NodeAction {
/**
* description: 节点对于全局状态的操作
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
//根据key获取全局状态中的值
Object object = state.value("key1");
Map<String, Object> map = new HashMap<String, Object>();
map.put("key1",object);
return map;
}
}
/**
* description: 任务分发节点
*/
public class TaskAssignmentNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
return null;
}
}
/**
* description: 行程规划节点
*/
public class TripPlannerNode implements NodeAction {
/**
* description: 节点对于全局状态的操作
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
//根据key获取全局状态中的值
Object object = state.value("key1");
Map<String, Object> map = new HashMap<String, Object>();
map.put("key1",object);
return map;
}
}构建箭头
/**
* description: 总费用的条件判断Edge
*/
public class TotalBudgetEdge implements AsyncEdgeAction {
//跳转到下一个节点 CompletableFuture<下一个节点的ID>
@Override
public CompletableFuture<String> apply(OverAllState state) {
//取出全局状态的总费用
Optional<Integer> toalBudget = state.value("TotalBudget",Integer.class);
//判断总费用是否大于1000
if(toalBudget.get() < 1000) {
//跳到汇总节点
return CompletableFuture.completedFuture("AggregationNode");
}else {
//跳到行程规划节点
return CompletableFuture.completedFuture("TripPlannerNode");
}
}
}旅游规划工作流
/**
* description: 工作流配置类
*/
@Configuration
public class GraphConfiguration {
//将链式工作流配置方法设置为Bean
@Bean(name="parallelGraph")
//配置链式工作流
public StateGraph parallelGraph() throws GraphStateException {
//设置全局状态的合并策略
KeyStrategyFactory keyStrategyFactory =()-> {
Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
//总体费用更新策略为覆盖
map.put("TotalBudget", new ReplaceStrategy());
return map;
};
StateGraph stateGraph = new StateGraph("parallel", keyStrategyFactory);
//路线制定节点
stateGraph.addNode("RouteMakingNode",AsyncNodeAction.node_async(new RouteMakingNode()));
//行程规划节点
stateGraph.addNode("TripPlannerNode",AsyncNodeAction.node_async(new TripPlannerNode()));
//费用统筹节点
stateGraph.addNode("BudgetNode",AsyncNodeAction.node_async(new BudgetNode()));
//汇总节点
stateGraph.addNode("AggregationNode",AsyncNodeAction.node_async(new AggregationNode()));
//任务分发节点
stateGraph.addNode("TaskAssignmentNode",AsyncNodeAction.node_async(new TaskAssignmentNode()));
//添加节点间的箭头连线
//工作流开始
stateGraph.addEdge(stateGraph.START,"TaskAssignmentNode");
//工作流结束
stateGraph.addEdge("AggregationNode",stateGraph.END);
//汇总费用统筹节点
stateGraph.addEdge("RouteMakingNode","BudgetNode");
stateGraph.addEdge("TripPlannerNode","BudgetNode");
//添加总费用的条件判断
stateGraph
.addEdge("BudgetNode","TotalBudgetEdge")
.addConditionalEdges(
"TotalBudgetEdge",
//进行条件判断,然后跳转到下一个节点
new TotalBudgetEdge(),
//条件判断后跳转到的节点集合
Map.of("AggregationNode","TripPlannerNode")
);
//以PlantUML格式打印工作流图形
GraphRepresentation graph =
stateGraph.getGraph(GraphRepresentation.Type.PLANTUML);
System.out.println("##########################");
System.out.println(graph.content());
System.out.println("##########################");
return stateGraph;
}
}7-13 总结Agent和Graph分别构建的工作流

FlowAgent 基类:
- ParallelAgent 并行执行
- LImRoutingAgent 路由分发
- SequentialAgent 顺序执行