qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

spring AI (二) 流式输出

在上文,实现了一个简单的调用输出,使用的是 call 方法,此方法会等待结果的完整返回,所以耗时会比较高一点。

DEBUG 也可以看到结果是一起返回的。

image

但通常我们使用 AI 对话时结果是一个字一个字或一段一段蹦出来的,这里用的就是流式输出。

flux#

仔细看官方文档会发现其实是有流式输出的代码的

image

@GetMapping("/ai/generateStream")
	public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        Prompt prompt = new Prompt(new UserMessage(message));
        return chatClient.stream(prompt);
    }

不过很多人一看返回 Flux 就懵逼了,什么玩意?其实我也不知道🤔,我只听过 webflux 响应式编程,至于什么是 flux 什么是响应式也是一头雾水。

不过问过群友后,大家都说这玩意没必要学,很少用。

直接上代码

/**
     * spring ai 官方的流式对话接口 使用 webflux
     * @param message prompt
     * @return Flux<String>
     */
    @GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatSse(@PathVariable String message) {
        Prompt prompt = getPrompt(message);

        return chatClient.stream(prompt)
                .filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
                .flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
                .doOnNext(System.out::println)
                .doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
                .doOnComplete(() -> System.out.println("complete~!"));
    }

这里可以看到调用的方法是 stream

首先,第一个 filter 表示过滤掉返回的 null,这是因为流式返回的最后一个字段是 null,表示结尾。flatMap 中有个 Flux.just() 表示把返回的内容放到 Flux 里面。所以第一步要过滤掉 null,不然这一步会报错的。

SSE#

其实最开始想到的解决方案就是 SSE (Server Sent Events – 服务端主动推送),说起主动推送第一个想到的就是它,原生的,不用加其它依赖。

代码也很简单

/**
     * 流式对话接口
     *
     * @param message prompt
     * @return SseEmitter
     */
    @GetMapping("stream")
    public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
        Flux<String> stream = chatClient.stream(message);
        stream.subscribe(it -> {
            try {
                System.out.println(it);
                emitter.send(it, MediaType.TEXT_EVENT_STREAM);
            } catch (IOException e) {
                System.out.println("sse发送消息失败");
                emitter.completeWithError(e);
            }
        });

        stream.doOnError(e -> {
            System.out.println("流式对话发生异常");
            emitter.completeWithError(e);
        });

        stream.doOnComplete(emitter::complete);
        return emitter;
    }

结果如下

image

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。