如何使用从提示流部署的流式处理终结点

在提示流中,可以将流部署到 Azure 机器学习托管联机终结点进行实时推理。

通过发送请求来使用终结点时,默认行为是联机终结点会一直等到整个响应准备就绪才将其发送回客户端。 这可能会导致客户端延迟时间很长,用户体验很糟糕。

为了避免这种情况,可以在使用终结点时使用流式处理。 启用流式处理后,无需等待整个响应准备就绪, 而是可以让服务器将响应以区块的形式边生成边发回。 然后,客户端可以逐步显示响应,从而减少等待时间,提高交互性。

本文将介绍流式处理的范围、流式处理的工作原理以及流式处理终结点的使用方式。

创建启用了流式处理的流

如果要使用流式处理模式,则需要创建一个流,该流有一个节点,该节点生成一个字符串生成器作为流的输出。 字符串生成器是一种可以根据请求一次返回一个字符串的对象。 可以使用以下类型的节点来创建字符串生成器:

  • LLM 节点:此节点使用大型语言模型根据输入生成自然语言响应。

    {# Sample prompt template for LLM node #}
    
    system:
    You are a helpful assistant.
    
    user:
    {{question}}
    
  • Python 节点:此节点允许编写可以生成字符串输出的自定义 Python 代码。 可以使用此节点调用支持流式处理的外部 API 或库。 例如,可以使用以下代码逐字回显输入:

    from promptflow import tool
    
    # Sample code echo input by yield in Python tool node
    
    @tool
    def my_python_tool(paragraph: str) -> str:
        yield "Echo: "
        for word in paragraph.split():
            yield word + " "
    

重要

只有流的最后一个节点的输出才能支持流式处理。

“最后一个节点”意味着该节点的输出不被其他节点使用。

在本指南中,我们将使用“与维基百科聊天”示例流作为示例。 此流会处理用户的问题,在维基百科中搜索相关文章,并使用文章中的信息回答问题。 它使用流式处理模式来显示应答生成的进度。

若要了解如何创建聊天流,请参阅如何在提示流中开发聊天流来创建聊天流。

“与维基百科聊天”流的屏幕截图。

将流部署为联机终结点

若要使用流式处理模式,需要将流部署为联机终结点。 这样就可以实时发送请求并接收来自流的响应。

若要了解如何将流部署为联机终结点,请参阅使用 CLI 将流部署到联机终结点进行实时推理,以将流部署为联机终结点。

注意

使用高于版本 20230816.v10 的运行时环境版本进行部署。

可以在运行时详细信息页面中检查运行时版本并更新运行时。

Azure 机器学习工作室的屏幕截图,显示运行时环境。

了解流式处理过程

当你有联机终结点时,客户端和服务器需要遵循内容协商的特定原则才能利用流式处理模式:

内容协商类似于客户端和服务器之间关于要发送和接收的数据的首选格式的对话。 它确保进行有效沟通并就交换数据的格式达成一致。

若要了解流式处理过程,请考虑以下步骤:

  • 首先,客户端使用 Accept 标头中包含的所需媒体类型构造 HTTP 请求。 媒体类型告知服务器客户端期望哪种数据格式。 就像客户端说:“嘿,我正在为你将发送给我的数据寻找一种特定的格式。 它可能是 JSON、文本,也可能是其他格式。”例如,application/json 表示偏好 JSON 数据,text/event-stream 表示需要流式处理数据,而 */* 则表示客户端接受任何数据格式。

    注意

    如果请求缺少 Accept 标头或具有空的 Accept 标头,则意味着客户端会接受任何媒体类型作为响应。 服务器将其视为 */*

  • 接下来,服务器根据 Accept 标头中指定的媒体类型进行响应。 请务必注意,客户端可能会在 Accept 标头中请求多种媒体类型,服务器必须考虑其功能和格式优先级以确定适当的响应。

    • 首先,服务器检查 Accept 标头中是否显式指定了 text/event-stream
      • 对于启用了流式处理的流,服务器会返回一个 Content-Typetext/event-stream 的响应,表明数据在进行流式处理。
      • 对于未启用流式处理的流,服务器会继续查找标头中指定的其他媒体类型。
    • 如果未指定 text/event-stream,服务器将检查 Accept 标头中是否指定了 application/json*/*
      • 在此类情况下,服务器会返回一个 Content-Typeapplication/json 的响应,并提供 JSON 格式的数据。
    • 如果 Accept 标头指定其他媒体类型,例如 text/html
      • 服务器会返回 424 响应,其中包含提示流运行时错误代码 UserError 和运行时 HTTP 状态 406,表明服务器无法使用客户端请求的数据格式来满足请求。 若要了解详细信息,请参阅处理错误
  • 最后,客户端检查 Content-Type 响应头。 如果它被设置为 text/event-stream,则表示数据正在进行流式处理。

让我们更详细地了解流式处理过程。 流式处理模式下的响应数据遵循服务器发送的事件 (SSE) 的格式。

总体过程如下所示:

0. 客户端向服务器发送消息

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Hello",
    "chat_history": []
}

注意

Accept 标头设置为 text/event-stream 以请求流响应。

1. 服务器以流式处理模式发回响应

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Hello"}

data: {"answer": "!"}

data: {"answer": " How"}

data: {"answer": " can"}

data: {"answer": " I"}

data: {"answer": " assist"}

data: {"answer": " you"}

data: {"answer": " today"}

data: {"answer": " ?"}

data: {"answer": ""}

注意

Content-Type 设置为 text/event-stream; charset=utf-8,表示响应是事件流。

客户端应将响应数据解码为服务器发送的事件并以增量方式显示它们。 发送完所有数据后,服务器会关闭 HTTP 连接。

每个响应事件都是上一个事件的增量。 建议让客户端跟踪内存中的合并数据,并在下一个请求中将其作为聊天历史记录发送回服务器。

2. 客户端向服务器发送另一条聊天消息以及完整的聊天历史记录

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Glad to know you!",
    "chat_history": [
        {
            "inputs": {
                "question": "Hello"
            },
            "outputs": {
                "answer": "Hello! How can I assist you today?"
            }
        }
    ]
}

3. 服务器以流式处理模式发回应答

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Nice"}

data: {"answer": " to"}

data: {"answer": " know"}

data: {"answer": " you"}

data: {"answer": " too"}

data: {"answer": "!"}

data: {"answer": " Is"}

data: {"answer": " there"}

data: {"answer": " anything"}

data: {"answer": " I"}

data: {"answer": " can"}

data: {"answer": " help"}

data: {"answer": " you"}

data: {"answer": " with"}

data: {"answer": "?"}

data: {"answer": ""}

然后聊天以类似的方式继续。

处理错误

客户端应先检查 HTTP 响应代码。 有关联机终结点返回的常见错误代码,请参阅 HTTP 状态代码表

如果响应代码为“424 模型错误”,则表示错误是由模型的代码引起的。 来自提示流模型的错误响应始终遵循以下格式:

{
  "error": {
    "code": "UserError",
    "message": "Media type text/event-stream in Accept header is not acceptable. Supported media type(s) - application/json",
  }
}
  • 它始终是一个 JSON 字典,只定义了一个键:“error”。
  • “error”的值是一个字典,包含“code”、“message”。
  • “code”定义错误类别。 目前,对于错误的用户输入,它可能是“UserError”;对于服务内部的错误,它可能是“SystemError”。
  • “message”是对错误的说明。 它可以显示给最终用户。

如何使用服务器发送的事件

通过 Python 进行使用

在此示例用法中,我们将使用该 SSEClient 类。 此类不是内置的 Python 类,需要单独进行安装。 可以通过 pip 安装它:

pip install sseclient-py

示例用法如下:

import requests
from sseclient import SSEClient
from requests.exceptions import HTTPError

try:
    response = requests.post(url, json=body, headers=headers, stream=stream)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type')
    if "text/event-stream" in content_type:
        client = SSEClient(response)
        for event in client.events():
            # Handle event, i.e. print to stdout
    else:
        # Handle json response

except HTTPError:
    # Handle exceptions

通过 JavaScript 进行使用

可以通过多个库在 JavaScript 中使用服务器发送的事件。 下面是其中一个示例

使用 Python 的示例聊天应用

下面是使用 Python 编写的聊天应用示例

以 gif 动画方式演示一个使用 Python 的示例聊天应用。

高级用法 - 混合流和非流的流输出

有时,你可能希望从流输出中获取流结果和非流结果。 例如,在“与维基百科聊天”流中,你可能不仅希望获得 LLM 的应答,还希望获得流搜索的 URL 的列表。 为此,需要修改流以输出流 LLM 的应答和非流 URL 列表的组合。

在“与维基百科聊天”示例流中,输出连接到 LLM 节点 augmented_chat。 若要将 URL 列表添加到输出,需要添加名称为 url 且值为 ${get_wiki_url.output} 的输出字段。

“与维基百科进行混合聊天”流的屏幕截图。

流的输出将是一个作为基础的非流字段和一个作为增量的流字段。 下面是请求和响应的示例。

高级用法 - 0。 客户端向服务器发送消息

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When was ChatGPT launched?",
    "chat_history": []
}

高级使用 - 1。 服务器以流式处理模式发回应答

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=ChatGPT", "https://en.wikipedia.org/w/index.php?search=GPT-4"]}

data: {"answer": ""}

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " \n\n"}

...

data: {"answer": "PT"}

data: {"answer": ""}

高级使用 - 2。 客户端向服务器发送另一条聊天消息以及完整的聊天历史记录

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When did OpenAI announce GPT-4? How long is it between these two milestones?",
    "chat_history": [
        {
            "inputs": {
                "question": "When was ChatGPT launched?"
            },
            "outputs": {
                "url": [
                    "https://en.wikipedia.org/w/index.php?search=ChatGPT",
                    "https://en.wikipedia.org/w/index.php?search=GPT-4"
                ],
                "answer": "ChatGPT was launched on November 30, 2022. \n\nSOURCES: https://en.wikipedia.org/w/index.php?search=ChatGPT"
            }
        }
    ]
}

高级使用 - 3。 服务器以流式处理模式发回应答

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=Generative pre-trained transformer ", "https://en.wikipedia.org/w/index.php?search=Microsoft "]}

data: {"answer": ""}

data: {"answer": "Open"}

data: {"answer": "AI"}

data: {"answer": " released"}

data: {"answer": " G"}

data: {"answer": "PT"}

data: {"answer": "-"}

data: {"answer": "4"}

data: {"answer": " in"}

data: {"answer": " March"}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "3"}

data: {"answer": "."}

data: {"answer": " Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " The"}

data: {"answer": " time"}

data: {"answer": " between"}

data: {"answer": " these"}

data: {"answer": " two"}

data: {"answer": " milestones"}

data: {"answer": " is"}

data: {"answer": " approximately"}

data: {"answer": " "}

data: {"answer": "3"}

data: {"answer": " months"}

data: {"answer": ".\n\n"}

...

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": ""}

后续步骤