Appearance
以下为 LangChain4j 流式响应接入 Android 的源码,依次为:服务端流式接口、Android 端调用示例、TypewriterClient 客户端类。
/**
 * Server side: streams the assistant's answer to the given question.
 *
 * @param question the user's question; a sample question is used when the parameter is absent
 * @return the answer as a stream of text fragments produced by {@code streamAssistant}
 */
@GetMapping(value = "/stream", produces = "text/html;charset=UTF-8")
public Flux<String> stream(
        @RequestParam(value = "question", defaultValue = "敏哥什么时候创建了探索智能应用") String question) {
    Flux<String> answerStream = streamAssistant.answer(question);
    return answerStream;
}

// Android side: create the client and start a chat with the text typed by the user.
// NOTE(review): the base URL ends in /rag, so the controller is presumably mapped under /rag — confirm.
typeWriterClient = new TypewriterClient("http://192.168.1.17:8080/rag");
startChat(ed.getText().toString());
/**
 * Sends the question through {@code typeWriterClient} and renders the answer in
 * {@code textView} as it arrives.
 *
 * <p>Every callback hops to the UI thread before touching {@code fullResponse} or
 * {@code textView}.
 *
 * @param question the question to send
 */
private void startChat(String question) {
    // Start from an empty transcript and show the placeholder until text arrives.
    fullResponse.setLength(0);
    textView.setText("思考中...");

    TypewriterClient.StreamCallback uiCallback = new TypewriterClient.StreamCallback() {
        @Override
        public void onConnected() {
            runOnUiThread(() -> textView.setText("思考中..."));
        }

        @Override
        public void onTokenReceived(String text) {
            runOnUiThread(() -> {
                // Accumulate the fragment and re-render the whole answer so far.
                fullResponse.append(text);
                textView.setText(fullResponse.toString());
            });
        }

        @Override
        public void onComplete() {
            runOnUiThread(() -> {
                // Placeholder: e.g. play a "cursor disappears" animation at the end.
            });
        }

        @Override
        public void onError(Throwable t) {
            runOnUiThread(() -> {
                // Show the failure inline, after whatever text was already received.
                String notice = "\n[发生错误: " + t.getMessage() + "]";
                fullResponse.append(notice);
                textView.setText(fullResponse.toString());
            });
            t.printStackTrace();
        }
    };
    typeWriterClient.startStreaming(question, uiCallback);
}

/**
 * OkHttp-based client that requests the {@code /stream} endpoint and reports the
 * response text through a {@link StreamCallback}.
 */
public class TypewriterClient {
/** HTTP client used for every request issued by this instance. */
private final OkHttpClient client;
/** Base URL of the service; {@code "/stream"} is appended when a request is built. */
private final String baseUrl;
/** Set by {@code cancel()} and cleared at the start of each {@code startStreaming} call. */
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

/**
 * Creates a client for the service at the given base URL.
 *
 * @param baseUrl base URL of the service, without the trailing {@code /stream} segment
 */
public TypewriterClient(String baseUrl) {
    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(10, TimeUnit.SECONDS);
    // Streaming request: a read timeout of 0 disables the read timeout entirely.
    builder.readTimeout(0, TimeUnit.MILLISECONDS);
    this.client = builder.build();
    this.baseUrl = baseUrl;
}
/**
 * Requests {@code baseUrl + "/stream"} with the given question and forwards the response
 * text to the callback as it arrives.
 *
 * <p>The request is enqueued on the OkHttp client, so callbacks normally arrive
 * asynchronously. The one exception is an unparsable base URL, which is reported through
 * {@link StreamCallback#onError} before this method returns.
 *
 * <p>The body is read in chunks rather than line by line, so text is delivered as soon as
 * it is received and line breaks inside the answer are preserved.
 *
 * @param question the question sent as the {@code question} query parameter
 * @param callback receiver of connection, text, completion and error events
 */
public void startStreaming(String question, StreamCallback callback) {
    isCancelled.set(false);

    // HttpUrl.parse returns null for a malformed URL; report it instead of failing with an NPE.
    HttpUrl endpoint = HttpUrl.parse(baseUrl + "/stream");
    if (endpoint == null) {
        callback.onError(new IllegalArgumentException("Invalid base URL: " + baseUrl));
        return;
    }
    HttpUrl httpUrl = endpoint.newBuilder()
            .addQueryParameter("question", question)
            .build();
    Request request = new Request.Builder()
            .url(httpUrl)
            .get()
            .addHeader("Accept", "text/plain;charset=UTF-8, text/html;charset=UTF-8")
            .build();

    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // Failures caused by cancel() are expected and not reported.
            if (!isCancelled.get()) {
                callback.onError(e);
            }
        }

        @Override
        public void onResponse(Call call, Response response) throws IOException {
            // try-with-resources closes the body on every path, including the error
            // returns below, so the connection is always released.
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    callback.onError(new IOException("Server error: " + response.code()));
                    return;
                }
                if (body == null) {
                    callback.onError(new IOException("Empty body"));
                    return;
                }
                callback.onConnected();
                try {
                    // Fully qualified so that no additional import is required.
                    // The reader decodes multi-byte characters correctly even when they
                    // are split across network reads.
                    java.io.Reader reader = body.charStream();
                    char[] buffer = new char[1024];
                    int read;
                    while (!isCancelled.get() && (read = reader.read(buffer)) != -1) {
                        if (read > 0) {
                            callback.onTokenReceived(new String(buffer, 0, read));
                        }
                    }
                    callback.onComplete();
                } catch (IOException e) {
                    if (!isCancelled.get()) {
                        callback.onError(e);
                    }
                }
            }
        }
    });
}
/**
 * Cancels streaming started through this client.
 *
 * <p>After this call, request failures and read errors are no longer reported to the
 * callback, because {@code startStreaming} checks the cancelled flag before reporting them.
 */
public void cancel() {
// Raise the flag first so that the failure triggered by the cancellation below is suppressed.
isCancelled.set(true);
// Cancels every call on this client's dispatcher, not only the most recent one.
client.dispatcher().cancelAll();
}
/**
 * Receives the events of one streaming request started with {@code startStreaming}.
 *
 * <p>NOTE(review): callbacks are invoked from the OkHttp callback, presumably on a
 * background thread — implementations that touch the UI should switch threads themselves.
 */
public interface StreamCallback {
/** Called once a successful response with a body has arrived, before any text is delivered. */
void onConnected();
/** Called with each piece of response text as it is read from the stream. */
void onTokenReceived(String text); // intended to fire each time a character/word is received
/** Called when reading the response has finished without an I/O error. */
void onComplete();
/**
 * Called when the request fails, the server answers with an unsuccessful status, the body
 * is missing, or reading fails. Request and read failures are not reported once
 * {@code cancel()} has been called.
 */
void onError(Throwable t);
}
}