主题
异步回调
python
import asyncio
from typing import Any, Dict, List
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI
import time
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:11434/v1"
class MyCustomSyncHandler(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs) -> None:
print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")
class MyCustomAsyncHandler(AsyncCallbackHandler):
"""Async callback handler that can be used to handle callbacks from langchain."""
async def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Run when chain starts running."""
print("等待0.3秒....",time.time())
await asyncio.sleep(0.5)
class_name = serialized["name"]
print("LLM正在启动",time.time())
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Run when chain ends running."""
print("等待0.3秒....",time.time())
await asyncio.sleep(0.5)
print("LLM结束",time.time())
# To enable streaming, we pass in `streaming=True` to the ChatModel constructor
# Additionally, we pass in a list with our custom handler
chat = ChatOpenAI(
openai_api_key = openai_api_key,
openai_api_base = openai_api_base,
streaming=True,
temperature = 0,
model="qwen2:1.5b",
callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],)
await chat.agenerate([[HumanMessage(content="江苏的省会是哪")]])
输出
shell
模拟等待... 1743471063.432649
LLM正在启动 1743471063.936486
Sync handler being called in a `thread_pool_executor`: token: 江苏省
Sync handler being called in a `thread_pool_executor`: token: 的
Sync handler being called in a `thread_pool_executor`: token: 省
Sync handler being called in a `thread_pool_executor`: token: 会
Sync handler being called in a `thread_pool_executor`: token: 是
Sync handler being called in a `thread_pool_executor`: token: 南京
Sync handler being called in a `thread_pool_executor`: token: 。
Sync handler being called in a `thread_pool_executor`: token:
模拟等待... 1743471064.2091699
LLM结束 1743471064.710964
LLMResult(generations=[[ChatGeneration(text='江苏省的省会是南京。', generation_info={'finish_reason': 'stop', 'model_name': 'qwen2:1.5b', 'system_fingerprint': 'fp_ollama'}, message=AIMessage(content='江苏省的省会是南京。', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'qwen2:1.5b', 'system_fingerprint': 'fp_ollama'}, id='run-67a07798-4092-4cfd-956e-493f6a06b566-0'))]], llm_output={'token_usage': {}, 'model_name': 'qwen2:1.5b'}, run=[RunInfo(run_id=UUID('67a07798-4092-4cfd-956e-493f6a06b566'))], type='LLMResult')
时间点分析
模拟等待... 1743471063.432649
LLM正在启动 1743471063.936486
模拟等待... 1743471064.2091699
LLM结束 1743471064.710964