Skip to content

第7章 Graph 搭建旅游规划工作流,像拼积木一样轻松

7-1 以工作流方式编排旅行规划的团队协同

旅游规划场景的多Agent 协同工作流

根据各个智能体的组合排列和业务分析,选择出的一条工作流程

7-2 搭建工作流的记忆中枢:OverAllIState

链式工作流

全局变量:用户Prompt -> 节点1 -> 节点2 -> 大模型输出

工作流的依赖包

xml
<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-graph-core</artifactId>
</dependency>
<!-- Ai Agent 工作流需要添加gson的依赖      -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

工作流涉及的对象:

  1. OverAllState : 全局状态对象,任何节点都能够读取和修改全局状态是贯穿整个工作流
  2. StateGraph: 工作流框架对象,工作流框架可以添加节点以及节点之间的箭头连线
  3. CompiledGraph: 工作流的编译对象

工作流配置类 中创建配置链式工作流:在在SpringAi1.0中, 设置状态(State)合并策略

  • OverAllStateFactory是一个接口,并且是一个函数式接口 (只有一个接口方法)

  • lambda的 =()-> 表达式可以方便的实现函数式接口

全局状态OverAllState以Key-Value的形式存放上下文数据

  • KeyStrategy不只是指代key所对应的Value,还包括Value的更新策略:
  • 有2种更新策略:
    1. AppendStrategy:在旧值的基础上追加新值
    1. ReplaceStrategy:将新值覆盖旧值
  • 更新策略是限定了节点对于OverAllState的Key所对应Value的操作
java
@Configuration
public class GraphConfiguration {

    //将链式工作流配置方法设置为Bean
    @Bean(name="parallelGraph")
    //配置链式工作流
    public StateGraph parallelGraph() throws GraphStateException {
        OverAllStateFactory overAllStateFactory =()-> {

            //创建全局状态
            OverAllState overAllState = new OverAllState();
            Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
            //添加key以及Key所对应Value的更新策略
            map.put("key1", new ReplaceStrategy());
            map.put("key2", new AppendStrategy());

            //将上下文数据以Key-Value的形式注册到全局状态
            overAllState.registerKeyAndStrategy(map);
            return overAllState;
        };
    }     
 }

7-3 搭建工作流的框架蓝图:StateGraph

StateGraph的创建需要3个参数:

  1. 工作流框架的名称
  2. 全局状态对象的工厂类
  3. Json框架:采用的是Gson,因为已经自动创建出来,所以不需要传入这个参数
java
StateGraph stateGraph =  new StateGraph("parallel",overAllStateFactory);
    //添加节点
    stateGraph.addNode("node1",xx);
    //添加节点间的箭头连线
    stateGraph.addEdge("node1",xx);

7-4 搭建工作流的执行单元:NodeAction

添加节点,按道理是应该添加一个节点对象Node,采用addNode(String,Node),但是,addNode(String,Node)是被addNode(String id, AsyncNodeActionWithConfig actionWithConfig) 所调用,addNode(String id, AsyncNodeActionWithConfig actionWithConfig)又被addNode(String id, AsyncNodeAction action)所调用。所以,我们直接调用addNode(String id, AsyncNodeAction action)就可以了

​ AsyncNodeAction的实例生成是采用AsyncNodeAction.node_async()。AsyncNodeAction:基于异步的节点操作node_async()需要传入NodeAction。NodeAction:是一个函数式接口

java
//添加节点, 需要传入2个参数(1.节点id,2.AsyncNodeAction)
stateGraph.addNode("node1",AsyncNodeAction.node_async(new Node1()));
stateGraph.addNode("node2",AsyncNodeAction.node_async(new Node2()));

添加节点

java
public class Node1 implements NodeAction {
    /**
     * author: Imooc
     * description: 节点对于全局状态的操作
     * @param state:
     * @return java.util.Map<java.lang.String,java.lang.Object>
     */
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        //根据key获取全局状态中的值
        Object object = state.value("key1");
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("key1",object);
        return map;
    }
}

7-5 搭建工作流的流程顺序:Edge

java
//添加节点间的箭头连线
stateGraph.addEdge("node1","node2");
stateGraph.addEdge(stateGraph.START,"node1");
stateGraph.addEdge("node2",stateGraph.END)

7-6 编译及运行工作流 CompiledGraph

CompiledGraph提供了2种运行工作流的方式:

  1. invoke(): 以阻塞的方式运行工作流,工作流的结果是一次性返回
  2. stream(); 以非阻塞,流式的方式运行工作流,执行一个节点,就返回这个节点完成的结果~

以@Qualifier注入指定名称的Bean

java
/**
 * description: 工作流控制器
 */
@RestController
public class WorkFlowController {
    //工作流编译后对象
    private final CompiledGraph compiledGraph;
    
    public WorkFlowController(@Qualifier("parallelGraph") StateGraph graph) throws GraphStateException {
        this.compiledGraph = graph.compile();
    }
    /**
     * description: 链式工作流的接口
     */
    @GetMapping(value = "/workflow/parallel")
    public void parallelWorkFlow() {
        String prompt = "在咖啡馆里,想要杯星巴克";
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("prompt", "prompt");
        compiledGraph.invoke(map);
    }
}

7-7 图形化展示工作流

需要在IDEAML插件: PzzantuML Integration

在代码中以PlantUML格式打印工作流图形,添加打印

java
StateGraph stateGraph =  new StateGraph("parallel",overAllStateFactory);
    //添加节点, 需要传入2个参数(1.节点id,2.AsyncNodeAction)
    stateGraph.addNode("node1",AsyncNodeAction.node_async(new Node1()));
    stateGraph.addNode("node2",AsyncNodeAction.node_async(new Node2()));
    //添加节点间的箭头连线
    stateGraph.addEdge("node1","node2");
    stateGraph.addEdge(stateGraph.START,"node1");
    stateGraph.addEdge("node2",stateGraph.END);
    // 以PlantUML格式打印工作流图形,添加打印
    GraphRepresentation graph =
    stateGraph.getGraph(GraphRepresentation.Type.PLANTUML);
    System.out.println("##########################");
    System.out.println(graph.content());
    System.out.println("##########################");
    return stateGraph;

同时需要添加依赖包

xml
<!-- Ai Agent 工作流需要添加gson的依赖      -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

如何使用图形插件:

  1. 先安装插件之后,再右键文件New,选择PlantUML File,在选择Class模板命名parallel之后,生成一个parallel.puml文件,此时就有图形展示
  2. 将日志中打印的工作流图形内容复制到.puml文件中,删除其中的circle start<<input>> as __START__,此时就是图形

7-8 Flux就是装载SSE流式数据的容器

和大模型以流式响应的方式进行对话的前提条件:

  1. SpringBoot Api 和 大模型应该是保持长连接
  2. 大模型能够主动的向SpringBoot Api发送数据

SpringAi 提供了 SSE协议 能方便实现和大模型以流式响应的方式进行对话:

SSE 协议

  1. 基于HTTP的长连接技术
  2. 客户端发送普通HTTP请求建立SSE长连接,服务端以流式数据向客户端进行推送
  3. 客户端在请求头设置Accept: text/event-stream,告诉服务端需要建立SSE长连接
  4. 单向通信,SSE长连接建立之后,客户端只负责消息的接收,客户端不能发送消息给服务端,服务端只负责消息的推送,

Flux技术:Java专门处理异步,流式的数据序列,数据流的容器,

  1. 接收Ai发送过来的逐字内容
  2. 逐段的发送数据

Flux类是Java响应式编程核心类, WebFlux 引入了 Flux类,WebFlux也是响应式编程。

  • 响应式编程: 不会让线程傻傻的等待请求的处理, 而是对系统说,我这个线程先去做其他的东西,你把请求处理的准备工作处理好,告诉我这个线程,这个线程就会回来处理这个请求。目标:用最少得线程,去处理大量的请求工作
  • SpringBoot基于响应式编程模型 处理SSE数据流,因为SSE“持续的数据流”可以看做是响应式的事件流,响应式编程框架,提供了能够高效处理响应式的事件流的能力。所以,SpringBoot基于响应式编程框架,也就是WebFlux框架,和SSE协议,是天然契合~
java
@GetMapping(value = "/simple/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat() {
    //构建prompt -> 发送到大模型 -> 获取大模型返回
    //用户输入
    String message = "在咖啡馆里,想要杯星巴克";
    //链式调用
    return this.chatClient
            .prompt(message) //构建prompt
            //.call()  //发送到大模型
            .stream() //以流式响应的方式和大模型进行交互
            .content(); //获取大模型文本返回
}

7-9 Flux包装SSE返回给前端展示打字机输出效果

基于SSE协议持续的发消息:

  1. 指定消息的MIME类型是 SSE事件类型 (text/event-stream)
  2. 消息要符合 SSE协议消息格式
  • event: 指定事件名称
  • data: 消息体
  • id: 消息ID
  • 通过2个换行符 ( \n\n )表示一个完整事件消息结束
  1. 能持续不断地发送SSE协议格式的事件消息
java
/**
 * description: 基于SSE协议持续的发消息给前端页面
 */
@GetMapping(value = "/flux/interval",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sseToFrontPage() {
    return Flux.interval(Duration.ofSeconds(1)) //每1秒发送SSE协议消息
            //将Map结构组装SSE协议格式消息,
            //.map(seq->"{data:{},event:{},id:{}}\\n\\n")  ;
            //通过 ServerSentEvent 组装 SSE协议格式
            .map(seq->ServerSentEvent
                .<String>builder()
                .event("")
                .data("")
                .id("")
                .build());
}

7-10 SpringAi Alibaba1.1的Graph引擎

1.1的Graph引擎与1.0的完全通用

7-11 SpringAi 1.1的工作流状态更新

1.1版本依赖一样,只是版本号更新

xml
<!--  SpringAi Alibaba 的版本号      -->
<spring-ai-alibaba.version>1.1.0.0-RC2</spring-ai-alibaba.version>
<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-graph-core</artifactId>
</dependency>
  • SpringAi Alibaba 1.0,通过 OverAllStateFactory 设置状态(State)合并策略。设置完成后,要手动将合并策略注册到全局状态 OverAllState中
  • SpringAi Alibaba 1.1通过 KeyStrategyFactory 设置状态(State)合并策略。设置完成后,无需手动将合并策略注册到全局状态 OverAllState中
java
//设置全局状态的合并策略
KeyStrategyFactory keyStrategyFactory =()-> {
    Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
    //添加key以及Key所对应Value的更新策略
    map.put("key1", new ReplaceStrategy());
    map.put("key2", new AppendStrategy());
    return map;
};
StateGraph stateGraph =  new StateGraph( "parallel",
                //状态(State)合并策略
                keyStrategyFactory);

7-12 基于Graph搭建旅游规划工作流(案例)

旅游规划场景的多Agent协同工作流

构建节点

java
/**
 * description: 汇总节点
 */
public class AggregationNode implements NodeAction {
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        return null;
    }
}

/**
 * description: 费用统筹节点
 */
public class BudgetNode implements NodeAction {
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        return null;
    }
}

/**
 * description: 路线制定节点
 */
public class RouteMakingNode implements NodeAction {
    /**
     * description: 节点对于全局状态的操作
     */
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        //根据key获取全局状态中的值
        Object object = state.value("key1");
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("key1",object);
        return map;
    }
}

/**
 * description: 任务分发节点
 */
public class TaskAssignmentNode implements NodeAction {
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        return null;
    }
}

/**
 * description: 行程规划节点
 */
public class TripPlannerNode implements NodeAction {
    /**
     * description: 节点对于全局状态的操作
     */
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        //根据key获取全局状态中的值
        Object object = state.value("key1");
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("key1",object);
        return map;
    }
}

构建箭头

java
/**
 * description: 总费用的条件判断Edge
 */
public class TotalBudgetEdge implements AsyncEdgeAction {
    //跳转到下一个节点 CompletableFuture<下一个节点的ID>
    @Override
    public CompletableFuture<String> apply(OverAllState state) {
     //取出全局状态的总费用
      Optional<Integer> toalBudget = state.value("TotalBudget",Integer.class);
      //判断总费用是否大于1000
        if(toalBudget.get() < 1000) {
            //跳到汇总节点
            return CompletableFuture.completedFuture("AggregationNode");
        }else {
            //跳到行程规划节点
            return CompletableFuture.completedFuture("TripPlannerNode");
        }
    }
}

旅游规划工作流

java
/**
 * description: 工作流配置类
 */
@Configuration
public class GraphConfiguration {
    //将链式工作流配置方法设置为Bean
    @Bean(name="parallelGraph")
    //配置链式工作流
    public StateGraph parallelGraph() throws GraphStateException {
        //设置全局状态的合并策略
        KeyStrategyFactory keyStrategyFactory =()-> {
            Map<String, KeyStrategy> map = new HashMap<String, KeyStrategy>();
            //总体费用更新策略为覆盖
            map.put("TotalBudget", new ReplaceStrategy());
            return map;
        };
        StateGraph stateGraph =  new StateGraph("parallel", keyStrategyFactory);
        //路线制定节点
        stateGraph.addNode("RouteMakingNode",AsyncNodeAction.node_async(new RouteMakingNode()));
        //行程规划节点
        stateGraph.addNode("TripPlannerNode",AsyncNodeAction.node_async(new TripPlannerNode()));
        //费用统筹节点
        stateGraph.addNode("BudgetNode",AsyncNodeAction.node_async(new BudgetNode()));
        //汇总节点
        stateGraph.addNode("AggregationNode",AsyncNodeAction.node_async(new AggregationNode()));
        //任务分发节点
        stateGraph.addNode("TaskAssignmentNode",AsyncNodeAction.node_async(new TaskAssignmentNode()));
        //添加节点间的箭头连线
        //工作流开始
        stateGraph.addEdge(stateGraph.START,"TaskAssignmentNode");
        //工作流结束
        stateGraph.addEdge("AggregationNode",stateGraph.END);
        //汇总费用统筹节点
        stateGraph.addEdge("RouteMakingNode","BudgetNode");
        stateGraph.addEdge("TripPlannerNode","BudgetNode");
        //添加总费用的条件判断
        stateGraph
                .addEdge("BudgetNode","TotalBudgetEdge")
                .addConditionalEdges(
                        "TotalBudgetEdge",
                        //进行条件判断,然后跳转到下一个节点
                        new TotalBudgetEdge(),
                        //条件判断后跳转到的节点集合
                        Map.of("AggregationNode","TripPlannerNode")
                );
        //以PlantUML格式打印工作流图形
        GraphRepresentation graph =
        stateGraph.getGraph(GraphRepresentation.Type.PLANTUML);
        System.out.println("##########################");
        System.out.println(graph.content());
        System.out.println("##########################");
        return stateGraph;
    }
}

7-13 总结Agent和Graph分别构建的工作流

FlowAgent 基类:

  • ParallelAgent 并行执行
  • LImRoutingAgent 路由分发
  • SequentialAgent 顺序执行