百度文心一言 java 支持流式输出,Springboot+ sse的demo

百度文心一言 java 支持流式输出,Springboot+ sse的demo

码农世界 2024-05-21 前端 59 次浏览 0个评论

参考:GitHub - mmciel/wenxin-api-java: 百度文心一言Java库,支持问答和对话,支持流式输出和同步输出。提供SpringBoot调用样例。提供拓展能力。

1、依赖

com.baidu.aip

java-sdk

4.16.18

2、配置apikey和secretkey

3、主要使用的接口

4、返回的json格式 

3、WenxinEventSourceListener  事件监听器

和其他的接口不一样 需要 CompletionsResponse.data  封装下 ,不然前端页面需要兼容非json的格式

@Slf4j
public class WenxinEventSourceListener extends EventSourceListener {
    private long tokens;
    private SseEmitter sseEmitter;
    public WenxinEventSourceListener(SseEmitter sseEmitter) {
        this.sseEmitter = sseEmitter;
    }
    @Override
    public void onOpen(EventSource eventSource, Response response) {
        log.info("建立sse连接...");
    }
    @SneakyThrows
    @Override
    @JsonIgnoreProperties(ignoreUnknown = true)
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        ChatResponse bean = JSONUtil.parseObj(data).toBean(ChatResponse.class);
        log.info("返回数据:{}", data);
        if (bean.getIs_end()) {
            log.info("返回数据结束了");
            sseEmitter.send(SseEmitter.event()
                    .id("[TOKENS]")
                    .data("

tokens:" + tokens()) .reconnectTime(3000)); sseEmitter.send(SseEmitter.event() .id("[DONE]") .data("[DONE]") .reconnectTime(3000)); // 传输完成后自动关闭sse sseEmitter.complete(); return; } log.info("OpenAI返回数据:{}", data); tokens += 1; if (data.equals("[DONE]")) { log.info("OpenAI返回数据结束了"); sseEmitter.send(SseEmitter.event() .id("[TOKENS]") .data("

tokens:" + tokens()) .reconnectTime(3000)); sseEmitter.send(SseEmitter.event() .id("[DONE]") .data("[DONE]") .reconnectTime(3000)); // 传输完成后自动关闭sse sseEmitter.complete(); return; } CompletionsResponse completionResponse = new CompletionsResponse(); CompletionsResponse.Data dataResult = new CompletionsResponse.Data(); dataResult.setText(bean.getResult()); completionResponse.setData(dataResult); try { sseEmitter.send(SseEmitter.event() .id(bean.getId()) .data(completionResponse.getData()) .reconnectTime(3000)); } catch (Exception e) { log.error("sse信息推送失败!"); eventSource.cancel(); e.printStackTrace(); } } @Override public void onClosed(EventSource eventSource) { log.info("关闭sse连接..."); } @SneakyThrows @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { if(Objects.isNull(response)){ log.error("sse连接异常:{}", t); eventSource.cancel(); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { // 错误处理 {"error_code":110,"error_msg":"Access token invalid or no longer valid"},异常:{} log.error("sse连接异常data:{},异常:{}", body.string(), t); } else { log.error("sse连接异常data:{},异常:{}", response, t); } eventSource.cancel(); } /** * tokens * @return */ public long tokens() { return tokens; } }

4、WenXinClient  流式主要看下 streamChat 方式,之前从千帆上找到流式例子 返回type是json的,所以之前自己手写的demo总报异常。

 public void streamChat(ChatBody chatBody, EventSourceListener eventSourceListener, ModelE modelE) {
        if (Objects.isNull(eventSourceListener)) {
            throw new WenXinException("参数异常:EventSourceListener不能为空");
        }
        chatBody.setStream(true);
        try {
            EventSource.Factory factory = EventSources.createFactory(this.okHttpClient);
            Request request = new Request.Builder().url(assembleUrl(modelE))
                    .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()),
                            new ObjectMapper().writeValueAsString(chatBody))).build();
            factory.newEventSource(request, eventSourceListener);
        } catch (Exception e) {
            log.error("请求参数解析异常:", e);
            e.printStackTrace();
        }
    }
private String assembleUrl(ModelE modelE) {
        accessToken = WenXinConfig.refreshAccessToken();
        return modelE.getApiHost() + "?access_token=" + accessToken;
    }

5、定义Sse的接口是实现方法

public interface SseService {
    /**
     * 创建SSE
     * @param uid
     * @return
     */
    SseEmitter createSse(String uid);
    /**
     * 关闭SSE
     * @param uid
     */
    void closeSse(String uid);
    /**
     * 客户端发送消息到服务端
     * @param uid
     * @param chatRequest
     */
    ChatResponse sseChat(String uid, ChatRequest chatRequest);
}
public class WenXinSseServiceImpl implements SseService {
    @Value("${chat.accessKeyId}")
    private String accessKeyId;
    @Value("${chat.accessKeySecret}")
    private String accessKeySecret;
    @Value("${chat.agentKey}")
    private String agentKey;
    @Value("${chat.appId}")
    private String appId;
    @Autowired
    WenXinClient wenXinClient;
    @Override
    public SseEmitter createSse(String uid) {
        //默认30秒超时,设置为0L则永不超时
        SseEmitter sseEmitter = new SseEmitter(0l);
        //完成后回调
        sseEmitter.onCompletion(() -> {
            log.info("[{}]结束连接...................", uid);
            LocalCache.CACHE.remove(uid);
        });
        //超时回调
        sseEmitter.onTimeout(() -> {
            log.info("[{}]连接超时...................", uid);
        });
        //异常回调
        sseEmitter.onError(
                throwable -> {
                    try {
                        log.info("[{}]连接异常,{}", uid, throwable.toString());
                        sseEmitter.send(SseEmitter.event()
                                .id(uid)
                                .name("发生异常!")
                                .data(Message.builder().content("发生异常请重试!").build())
                                .reconnectTime(3000));
                        LocalCache.CACHE.put(uid, sseEmitter);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
        );
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            e.printStackTrace();
        }
        LocalCache.CACHE.put(uid, sseEmitter);
        log.info("[{}]创建sse连接成功!", uid);
        return sseEmitter;
    }
    @Override
    public void closeSse(String uid) {
        SseEmitter sse = (SseEmitter) LocalCache.CACHE.get(uid);
        if (sse != null) {
            sse.complete();
            //移除
            LocalCache.CACHE.remove(uid);
        }
    }
    @Override
    public ChatResponse sseChat(String uid, ChatRequest chatRequest) {
        if (StringUtils.isBlank(chatRequest.getMsg())) {
            log.error("参数异常,msg为null", uid);
            throw new BaseException("参数异常,msg不能为空~");
        }
        SseEmitter sseEmitter = (SseEmitter) LocalCache.CACHE.get(uid);
        if (sseEmitter == null) {
            log.info("聊天消息推送失败uid:[{}],没有创建连接,请重试。", uid);
            throw new BaseException("聊天消息推送失败uid:[{}],没有创建连接,请重试。~");
        }
        WenxinEventSourceListener openAIEventSourceListener = new WenxinEventSourceListener(sseEmitter);
        List messages = new ArrayList<>();
        messages.add(MessageItem.builder().role(MessageItem.Role.USER).content(chatRequest.getMsg()).build());
        wenXinClient.streamChat(messages, openAIEventSourceListener, ModelE.ERNIE_Bot);
        LocalCache.CACHE.put("msg" + uid, JSONUtil.toJsonStr(messages), LocalCache.TIMEOUT);
        ChatResponse response = new ChatResponse();
        response.setQuestionTokens(1);
        return response;
    }
}

6、主要的controller接口

/**
     * 创建sse连接
     *
     * @param headers
     * @return
     */
    @CrossOrigin
    @GetMapping("/createSse")
    public SseEmitter createConnect(@RequestHeader Map headers) {
        String uid = getUid(headers);
        return sseService.createSse(uid);
    }
    /**
     * 聊天接口
     *
     * @param chatRequest
     * @param headers
     */
    @CrossOrigin
    @PostMapping("/chat")
    @ResponseBody
    public ChatResponse sseChat(@RequestBody ChatRequest chatRequest, @RequestHeader Map headers, HttpServletResponse response) {
        String uid = getUid(headers);
        return sseService.sseChat(uid, chatRequest);
    }
    /**
     * 关闭连接
     *
     * @param headers
     */
    @CrossOrigin
    @GetMapping("/closeSse")
    public void closeConnect(@RequestHeader Map headers) {
        String uid = getUid(headers);
        sseService.closeSse(uid);
    }

7、主要的页面代码




  
  
  智能问答
   
  
  
  
  
  
  

  
    

智能问答

最后的呈现效果如下:

转载请注明来自码农世界,本文标题:《百度文心一言 java 支持流式输出,Springboot+ sse的demo》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,59人围观)参与讨论

还没有评论,来说两句吧...

Top