问题重现
在调用 ChatGPT API 并使用流式输出时,我们经常会遇到网络问题导致的超时情况。有趣的是,笔者发现在本地调试遇到的超时,会在 10 分钟后自动恢复(为什么是 10 分钟?我们留到后面解释),但是在服务器上等待一会儿却会失败,报出超时异常(错误代码 502)。
笔者认为,本地能恢复的原因可能是自动重试,只是重试的时间有点久(ChatGPT API 没有重试功能,这是项目加入的)。服务器返回「502」是因为内容从后台返回到前端需要经过网关层,而网关层超时校验的时间比自动重试的时间(10 分钟)更短,所以撑不到重试就会报超时异常。
基于以上场景,本文着手解决 ChatGPT API 调用超时问题。
优化诉求
- 不向用户展示超时的报错信息。
- 缩短超时后重试的时间间隔。
解决思路
笔者考虑了两种方案。
一是彻底解决网络问题,但难度有点大。 这属于 OpenAI 服务器问题,即使是部署在国外的服务器也会出现超时的情况。
二是利用自动重试解决问题。 通过调整超时的时间,提升响应速度,方案可行。
实施解决方案
解决过程中,笔者分两步由浅至深地调整了超时时间;如果想直接了解最终方案,请移步「解决方案二」~
- 运行环境:
Python: 3.10.7
openai: 0.27.6
- 调用方法:
openai.api_resources.chat_completion.ChatCompletion.acreate
( 这是异步调用 ChatGPT 的方法。)
- 方法调用链路:
超时参数 ClientTimeout
,一共有 4 个属性 total
、connect
、sock_read
和 sock_connect
。
# 方法 -> 超时相关参数
openai.api_resources.chat_completion.ChatCompletion.acreate -> kwargs
openai.api_resources.abstract.engine_api_resource.EngineAPIResource.acreate -> params
openai.api_requestor.APIRequestor.arequest -> request_timeout
# request_timeout 在这一步变成了 timeout,因此,只需要传参 request_timeout 即可
openai.api_requestor.APIRequestor.arequest_raw -> request_timeout
aiohttp.client.ClientSession.request -> kwargs
aiohttp.client.ClientSession._request -> timeout
tm = TimeoutHandle(self._loop, real_timeout.total) -> ClientTimeout.total
async with ceil_timeout(real_timeout.connect): -> ClientTimeout.connect
# 子分支1
aiohttp.connector.BaseConnector.connect -> timeout
aiohttp.connector.TCPConnector._create_connection -> timeout
aiohttp.connector.TCPConnector._create_direct_connection -> timeout
aiohttp.connector.TCPConnector._wrap_create_connection -> timeout
async with ceil_timeout(timeout.sock_connect): -> ClientTimeout.sock_connect
# 子分支2
aiohttp.client_reqrep.ClientRequest.send -> timeout
aiohttp.client_proto.ResponseHandler.set_response_params -> read_timeout
aiohttp.client_proto.ResponseHandler._reschedule_timeout -> self._read_timeout
if timeout:
self._read_timeout_handle = self._loop.call_later(
timeout, self._on_read_timeout
) -> ClientTimeout.sock_read
解决方案一
openai.api_requestor.APIRequestor.arequest_raw
方法中的 request_timeout
参数可以传递 connect
和 total
参数.
因此可以在调用 openai.api_resources.chat_completion.ChatCompletion.acreate
时,设置 request_time(10, 300)
。
#
async def arequest_raw(
self,
method,
url,
session,
*,
params=None,
supplied_headers: Optional[Dict[str, str]] = None,
files=None,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
) -> aiohttp.ClientResponse:
abs_url, headers, data = self._prepare_request_raw(
url, supplied_headers, method, params, files, request_id
)
if isinstance(request_timeout, tuple):
timeout = aiohttp.ClientTimeout(
connect=request_timeout[0],
total=request_timeout[1],
)else:
timeout = aiohttp.ClientTimeout(
total=request_timeout if request_timeout else TIMEOUT_SECS
)
...
该方案有效,但没有完全生效:它可以控制连接时间和请求的全部时间,但没有彻底解决超时异常,因为「请求连接时间」和「第一个字符读取时间」是两码事。「请求连接时间」基于 total
时间重试(300s),而网关时间并没有设置这么久。
于是,笔者继续提出「解决方案二」。
解决方案二
使用 monkey_patch
方式重写 openai.api_requestor.APIRequestor.arequest_raw
方法,重点在于重写 request_timeout 参数,让其支持原生的 aiohttp.client.ClientTimeout
参数。
1. 新建 api_requestor_mp.py
文件,并写入以下代码。
# 注意 request_timeout 参数已经换了,Optional[Union[float, Tuple[float, float]]] -> Optional[Union[float, tuple]]
async def arequest_raw(
self,
method,
url,
session,
*,
params=None,
supplied_headers: Optional[Dict[str, str]] = None,
files=None,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, tuple]] = None,
) -> aiohttp.ClientResponse:
abs_url, headers, data = self._prepare_request_raw(
url, supplied_headers, method, params, files, request_id
)
# 判断 request_timeout 的类型,按需设置 sock_read 和 sock_connect 属性
if isinstance(request_timeout, tuple):
timeout = aiohttp.ClientTimeout(
connect=request_timeout[0],
total=request_timeout[1],
sock_read=None if len(request_timeout) < 3 else request_timeout[2],
sock_connect=None if len(request_timeout) < 4 else request_timeout[3],
)
else:
timeout = aiohttp.ClientTimeout(
total=request_timeout if request_timeout else TIMEOUT_SECS
)
if files:
# TODO: Use aiohttp.MultipartWriter to create the multipart form data here.
# For now we use the private requests method that is known to have worked so far.
data, content_type = requests.models.RequestEncodingMixin._encode_files( # type: ignore
files, data
)
headers["Content-Type"] = content_type
request_kwargs = {
"method": method,
"url": abs_url,
"headers": headers,
"data": data,
"proxy": _aiohttp_proxies_arg(openai.proxy),
"timeout": timeout,
}
try:
result = await session.request(**request_kwargs)
util.log_info(
"OpenAI API response",
path=abs_url,
response_code=result.status,
processing_ms=result.headers.get("OpenAI-Processing-Ms"),
request_id=result.headers.get("X-Request-Id"),
)
# Don't read the whole stream for debug logging unless necessary.
if openai.log == "debug":
util.log_debug(
"API response body", body=result.content, headers=result.headers
)
return result
except (aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
raise error.Timeout("Request timed out") from e
except aiohttp.ClientError as e:
raise error.APIConnectionError("Error communicating with OpenAI") from e
def monkey_patch():
APIRequestor.arequest_raw = arequest_raw
2. 在初始化 ChatGPT API 的文件头部补充:
from *.*.api_requestor_mp import monkey_patch
do_api_requestor = monkey_patch
设置参数 request_timeout=(10, 300, 15, 10)
后,再调试就没什么问题了。
交付测试,通过。
经验总结
- 直接看代码、看方法调用链路会有点困难,可以通过异常堆栈来找调用链路,这样更方便。
- ChatGPT API 暴露的
request_timeout
参数不够用,需要重写;搜索了一下重写方案,了解到monkey_patch
,非常实用。 - 项目过程中,笔者发现改代码本身不难,难的是知道「改哪里」「怎么改」以及「为什么」。
>> LigaAI 往期精彩阅读 <<
在 IDE 插件开发中接入 JCEF 框架 – LigaAI 团队博客 (ligai.cn)