上記のテキストを日本語に翻訳します:
前述のように、call
メソッドを使用して単純な呼び出し出力を実現しました。このメソッドは完全な結果の返却を待つため、時間がかかります。
DEBUG でも結果が一緒に返されることがわかります。
しかし、通常、AI との対話では結果が一文字ずつまたは一部ずつ表示されます。ここではストリーム出力が使用されています。
flux#
公式ドキュメントを注意深く見ると、実際にはストリーム出力のコードがあることがわかります。
@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.stream(prompt);
}
ただし、多くの人が Flux を見ると混乱してしまいます。何なのか?実際、私もわかりません🤔、WebFlux の反応型プログラミングについては聞いたことがありますが、Flux とは何か、反応型とは何かはまったくわかりません。
ただし、グループのメンバーに尋ねた後、みんながこのものを学ぶ必要はないと言っていました。あまり使われないそうです。
コードを直接示します。
/**
* spring ai 官方の流式对话接口 使用 webflux
* @param message prompt
* @return Flux<String>
*/
@GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatSse(@PathVariable String message) {
Prompt prompt = getPrompt(message);
return chatClient.stream(prompt)
.filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
.flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
.doOnNext(System.out::println)
.doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
.doOnComplete(() -> System.out.println("complete~!"));
}
ここで呼び出されるメソッドは stream
です。
まず、最初の filter
は null をフィルタリングすることを意味します。これは、ストリームの最後のフィールドが null であり、終了を示すためです。flatMap
には Flux.just()
があり、返された内容を Flux に入れることを意味します。したがって、最初に null をフィルタリングする必要があります。そうしないと、このステップでエラーが発生します。
SSE#
実際、最初に思いついた解決策は SSE(Server Sent Events - サーバーからのイベント通知)です。アクティブプッシュについて話すときに最初に思い浮かぶのはこれです。ネイティブであり、他の依存関係を追加する必要はありません。
コードも非常にシンプルです。
/**
* 流式对话接口
*
* @param message prompt
* @return SseEmitter
*/
@GetMapping("stream")
public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
Flux<String> stream = chatClient.stream(message);
stream.subscribe(it -> {
try {
System.out.println(it);
emitter.send(it, MediaType.TEXT_EVENT_STREAM);
} catch (IOException e) {
System.out.println("sse发送消息失败");
emitter.completeWithError(e);
}
});
stream.doOnError(e -> {
System.out.println("流式对话发生异常");
emitter.completeWithError(e);
});
stream.doOnComplete(emitter::complete);
return emitter;
}
結果は以下の通りです。