Skip to content

以下为LangChain4j流式响应接入Android源码

服务端核心代码

java
@GetMapping(value = "/stream", produces = "text/html;charset=UTF-8")
public Flux<String> stream(@RequestParam(value = "question",
        defaultValue = "敏哥什么时候创建了探索智能应用") String question) {
    return streamAssistant.answer(question);
}

Android端核心代码

java
typeWriterClient = new TypewriterClient("http://192.168.1.17:8080/rag");
startChat(ed.getText().toString());

private void startChat(String question) {
      fullResponse.setLength(0);
      textView.setText("思考中...");

      typeWriterClient.startStreaming(question, new TypewriterClient.StreamCallback() {
          @Override
          public void onConnected() {
              runOnUiThread(() -> {
                  textView.setText("思考中...");
              });
          }

          @Override
          public void onTokenReceived(String text) {
              runOnUiThread(() -> {
                  fullResponse.append(text);
                  textView.setText(fullResponse.toString());
              });
          }

          @Override
          public void onComplete() {
              runOnUiThread(() -> {
                  // 可以在末尾加个光标消失动画等
              });
          }

          @Override
          public void onError(Throwable t) {
              runOnUiThread(() -> {
                  fullResponse.append("\n[发生错误: ").append(t.getMessage()).append("]");
                  textView.setText(fullResponse.toString());
              });
              t.printStackTrace();
          }
      });
  }
java
public class TypewriterClient {
    private final OkHttpClient client;
    private final String baseUrl;
    private final AtomicBoolean isCancelled = new AtomicBoolean(false);

    public TypewriterClient(String baseUrl) {
        this.baseUrl = baseUrl;
        this.client = new OkHttpClient.Builder()
                .readTimeout(0, TimeUnit.MILLISECONDS) // 流式请求:无限读取超时
                .connectTimeout(10, TimeUnit.SECONDS)
                .build();
    }

    public void startStreaming(String question, StreamCallback callback) {
        isCancelled.set(false);

        HttpUrl httpUrl = HttpUrl.parse(baseUrl + "/stream")
                .newBuilder()
                .addQueryParameter("question", question)
                .build();

        Request request = new Request.Builder()
                .url(httpUrl)
                .get()
                .addHeader("Accept", "text/plain;charset=UTF-8, text/html;charset=UTF-8")
                .build();

        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                if (!isCancelled.get()) callback.onError(e);
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                if (!response.isSuccessful()) {
                    callback.onError(new IOException("Server error: " + response.code()));
                    return;
                }

                ResponseBody body = response.body();
                if (body == null) {
                    callback.onError(new IOException("Empty body"));
                    return;
                }

                callback.onConnected();
                BufferedSource source = body.source();
                Charset utf8 = Charset.forName("UTF-8");

                try {
                    String line;
                    while ((line = source.readUtf8Line()) != null && !isCancelled.get()) {
                        if (!line.isEmpty()) {
                            callback.onTokenReceived(line);
                        }
                    }
                    callback.onComplete();

                } catch (IOException e) {
                    if (!isCancelled.get()) callback.onError(e);
                } finally {
                    body.close();
                }
            }
        });
    }

    public void cancel() {
        isCancelled.set(true);
        client.dispatcher().cancelAll();
    }

    public interface StreamCallback {
        void onConnected();
        void onTokenReceived(String text); // 每次收到一个字/词
        void onComplete();
        void onError(Throwable t);
    }
}