在上文,實現了一個簡單的調用輸出,使用的是 call
方法,此方法會等待結果的完整返回,所以耗時會比較高一點。
DEBUG 也可以看到結果是一起返回的。
但通常我們使用 AI 對話時結果是一個字一個字或一段一段蹦出來的,這裡用的就是流式輸出。
flux#
仔細看官方文檔會發現其實是有流式輸出的代碼的
@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.stream(prompt);
}
不過很多人一看返回 Flux 就懵逼了,什麼玩意?其實我也不知道🤔,我只聽過 webflux 響應式編程,至於什麼是 flux 什麼是響應式也是一頭霧水。
不過問過群友後,大家都說這玩意沒必要學,很少用。
直接上代碼
/**
* spring ai 官方的流式對話接口 使用 webflux
* @param message prompt
* @return Flux<String>
*/
@GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatSse(@PathVariable String message) {
Prompt prompt = getPrompt(message);
return chatClient.stream(prompt)
.filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
.flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
.doOnNext(System.out::println)
.doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
.doOnComplete(() -> System.out.println("complete~!"));
}
這裡可以看到調用的方法是 stream
首先,第一個 filter
表示過濾掉返回的 null,這是因為流式返回的最後一個字段是 null,表示結尾。flatMap
中有個 Flux.just()
表示把返回的內容放到 Flux 裡面。所以第一步要過濾掉 null,不然這一步會報錯的。
SSE#
其實最開始想到的解決方案就是 SSE (Server Sent Events – 服務端主動推送),說起主動推送第一個想到的就是它,原生的,不用加其他依賴。
代碼也很簡單
/**
* 流式對話接口
*
* @param message prompt
* @return SseEmitter
*/
@GetMapping("stream")
public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
Flux<String> stream = chatClient.stream(message);
stream.subscribe(it -> {
try {
System.out.println(it);
emitter.send(it, MediaType.TEXT_EVENT_STREAM);
} catch (IOException e) {
System.out.println("sse發送消息失敗");
emitter.completeWithError(e);
}
});
stream.doOnError(e -> {
System.out.println("流式對話發生異常");
emitter.completeWithError(e);
});
stream.doOnComplete(emitter::complete);
return emitter;
}
結果如下