FastAPI 实现 OpenAI 接口流式输出
SSE 方案
注意事项:EventSourceResponse 的响应不能经过压缩中间件(例如 GZip),
否则流式输出会被缓冲而失效。
import openai
from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse
# OpenAI 流式输出
@router.get('/openai/stream_output', summary='OpenAI Stream Output')
async def openai_stream_output(request: Request):
    """Stream an OpenAI chat completion to the client via Server-Sent Events.

    Returns an ``EventSourceResponse`` whose async generator yields each
    text delta as it arrives from the OpenAI streaming API.

    NOTE: the response must NOT pass through a compressing middleware
    (e.g. GZip) — compression buffers the stream and breaks SSE.
    """

    async def generator():
        client = openai.AsyncOpenAI(
            api_key=settings.OPENAI_API_KEY,
        )
        completion = await client.chat.completions.create(
            model='gpt-3.5-turbo',
            messages=[
                {
                    "role": "system",
                    "content": "你是一名擅长写小学日记的作者"
                },
                {
                    "role": "user",
                    "content": "请帮我写100字的日记给我"
                }
            ],
            temperature=1,
            max_tokens=256,
            presence_penalty=0.4,
            stream=True,
        )
        async for event in completion:
            # Stop generating as soon as the client goes away.
            if await request.is_disconnected():
                break
            # The final chunk (and role-only chunks) carry no content;
            # guard against empty `choices` and a None delta, which
            # would otherwise crash the SSE encoder.
            if not event.choices:
                continue
            delta = event.choices[0].delta.content
            if delta is None:
                continue
            log.debug(f"Received event: {delta}")
            yield delta

    return EventSourceResponse(generator(), send_timeout=30)  # SSE streaming
WebSocket方案
from typing import NoReturn
import openai
from fastapi import APIRouter, Request
from starlette.websockets import WebSocket
# OpenAI 流式输出(Websocket)
@router.websocket('/openai/stream_output_ws')
async def openai_stream_output_ws(websocket: WebSocket) -> None:
    """Websocket endpoint streaming OpenAI chat-completion deltas as JSON.

    Each delta is sent as ``{"type": "text", "data": <content>}``; the
    socket is closed server-side once the stream is exhausted.
    """
    # starlette is already a dependency of this file (see imports above);
    # imported locally so this handler stands alone.
    from starlette.websockets import WebSocketDisconnect

    await websocket.accept()
    client = openai.AsyncOpenAI(
        api_key=settings.OPENAI_API_KEY,
    )
    completion = await client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[
            {
                "role": "system",
                "content": "你是一名擅长写小学日记的作者"
            },
            {
                "role": "user",
                "content": "请帮我写100字的日记给我"
            }
        ],
        temperature=1,
        max_tokens=256,
        presence_penalty=0.4,
        stream=True,
    )
    try:
        async for event in completion:
            # The final chunk (and role-only chunks) carry no content;
            # skip them instead of sending {"data": null} to the client.
            if not event.choices:
                continue
            delta = event.choices[0].delta.content
            if delta is None:
                continue
            log.debug(f"Received event: {delta}")
            await websocket.send_json({
                "type": "text",
                "data": delta
            })
    except WebSocketDisconnect:
        # Client closed the socket mid-stream; nothing left to send.
        return
    await websocket.close()
阅读剩余
版权声明:
作者:Nuanxinqing
链接:https://6b7.org/567.html
文章版权归作者所有,未经允许请勿转载。
THE END