
Async Callbacks

```python
import asyncio
import time
from typing import Any, Dict, List

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI

openai_api_key = "EMPTY"
openai_api_base = "http://localhost:11434/v1"


class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")


class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when the LLM starts running."""
        print("Simulating wait...", time.time())
        await asyncio.sleep(0.5)
        print("LLM starting", time.time())

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running."""
        print("Simulating wait...", time.time())
        await asyncio.sleep(0.5)
        print("LLM finished", time.time())


# To enable streaming, we pass `streaming=True` to the ChatModel constructor.
# Additionally, we pass in a list with our custom handlers.
chat = ChatOpenAI(
    openai_api_key=openai_api_key,
    openai_api_base=openai_api_base,
    streaming=True,
    temperature=0,
    model="qwen2:1.5b",
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)


# Note: top-level `await` needs a notebook/IPython session; a script variant is sketched below.
await chat.agenerate([[HumanMessage(content="江苏的省会是哪")]])
```

Output

```shell
Simulating wait... 1743471063.432649
LLM starting 1743471063.936486
Sync handler being called in a `thread_pool_executor`: token: 江苏省
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token: 南京
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token: 
Simulating wait... 1743471064.2091699
LLM finished 1743471064.710964
LLMResult(generations=[[ChatGeneration(text='江苏省的省会是南京。', generation_info={'finish_reason': 'stop', 'model_name': 'qwen2:1.5b', 'system_fingerprint': 'fp_ollama'}, message=AIMessage(content='江苏省的省会是南京。', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'qwen2:1.5b', 'system_fingerprint': 'fp_ollama'}, id='run-67a07798-4092-4cfd-956e-493f6a06b566-0'))]], llm_output={'token_usage': {}, 'model_name': 'qwen2:1.5b'}, run=[RunInfo(run_id=UUID('67a07798-4092-4cfd-956e-493f6a06b566'))], type='LLMResult')
```
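
The log shows the sync handler's `on_llm_new_token` being dispatched to a `thread_pool_executor`, because a sync handler cannot run directly on the event loop. If you want token handling to stay fully async, `AsyncCallbackHandler` also supports an async `on_llm_new_token`; a minimal sketch (the class name `MyCustomAsyncTokenHandler` is made up for illustration):

```python
from typing import Any

from langchain.callbacks.base import AsyncCallbackHandler


class MyCustomAsyncTokenHandler(AsyncCallbackHandler):
    """Receives streamed tokens as a coroutine on the event loop."""

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Runs on the event loop instead of a thread pool,
        # so it can safely await other async work here.
        print(f"Async handler received token: {token!r}")
```

Passing this handler in `callbacks=[...]` alongside (or instead of) `MyCustomSyncHandler` avoids the thread-pool hop on every token.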

Timing analysis

Simulating wait... 1743471063.432649
LLM starting    1743471063.936486
Simulating wait... 1743471064.2091699
LLM finished    1743471064.710964

In both pairs, the second line appears roughly 0.5 s after the "Simulating wait..." line, matching the `await asyncio.sleep(0.5)` in `MyCustomAsyncHandler`. The sync handler's per-token prints all fall in the ~0.27 s window between "LLM starting" and the second "Simulating wait...", i.e. while the model is actually streaming.
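
Note that `await chat.agenerate(...)` above is a top-level `await`, which only works in a notebook or IPython session. A minimal sketch of running the same example as a standalone script, assuming the `chat` instance defined earlier is in the same file:

```python
import asyncio

from langchain_core.messages import HumanMessage


async def main() -> None:
    # Reuses the `chat` instance (with both callback handlers) defined above.
    result = await chat.agenerate([[HumanMessage(content="江苏的省会是哪")]])
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```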