在上文,实现了一个简单的调用输出,使用的是 call
方法,此方法会等待结果的完整返回,所以耗时会比较高一点。
DEBUG 也可以看到结果是一起返回的。
但通常我们使用 AI 对话时结果是一个字一个字或一段一段蹦出来的,这里用的就是流式输出。
flux#
仔细看官方文档会发现其实是有流式输出的代码的
@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.stream(prompt);
}
不过很多人一看返回 Flux 就懵逼了,什么玩意?其实我也不知道🤔,我只听过 webflux 响应式编程,至于什么是 flux 什么是响应式也是一头雾水。
不过问过群友后,大家都说这玩意没必要学,很少用。
直接上代码
/**
* spring ai 官方的流式对话接口 使用 webflux
* @param message prompt
* @return Flux<String>
*/
@GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatSse(@PathVariable String message) {
Prompt prompt = getPrompt(message);
return chatClient.stream(prompt)
.filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
.flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
.doOnNext(System.out::println)
.doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
.doOnComplete(() -> System.out.println("complete~!"));
}
这里可以看到调用的方法是 stream
首先,第一个 filter
表示过滤掉返回的 null,这是因为流式返回的最后一个字段是 null,表示结尾。flatMap
中有个 Flux.just()
表示把返回的内容放到 Flux 里面。所以第一步要过滤掉 null,不然这一步会报错的。
SSE#
其实最开始想到的解决方案就是 SSE (Server Sent Events – 服务端主动推送),说起主动推送第一个想到的就是它,原生的,不用加其它依赖。
代码也很简单
/**
* 流式对话接口
*
* @param message prompt
* @return SseEmitter
*/
@GetMapping("stream")
public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
Flux<String> stream = chatClient.stream(message);
stream.subscribe(it -> {
try {
System.out.println(it);
emitter.send(it, MediaType.TEXT_EVENT_STREAM);
} catch (IOException e) {
System.out.println("sse发送消息失败");
emitter.completeWithError(e);
}
});
stream.doOnError(e -> {
System.out.println("流式对话发生异常");
emitter.completeWithError(e);
});
stream.doOnComplete(emitter::complete);
return emitter;
}
结果如下