从存储中的事件网格事件运行批处理终结点

适用范围:Azure CLI ml 扩展 v2(最新版)Python SDK azure-ai-ml v2(最新版)

事件网格是一项全面托管的服务,可用于在许多不同的 Azure 服务和应用程序中轻松管理事件。 该服务简化了生成事件驱动和无服务器应用程序的方式。 本教程介绍如何在存储帐户中创建文件后立即触发批处理终结点的作业来处理这些文件。 此体系结构中使用逻辑应用工作流来订阅这些事件并触发终结点。

下图显示了此解决方案的体系结构:

概念图显示了此体系结构的组成部分。

以下步骤描述了此解决方案中的大致步骤:

  1. 在特定存储帐户中创建新的 Blob 时,将触发“已创建文件”事件

  2. 该事件会发送到事件网格,经过处理后发送给所有订阅者。

  3. 逻辑应用工作流订阅并侦听这些事件。

    由于存储帐户可以包含多个数据资产,因此将应用事件筛选以仅对存储帐户内特定文件夹中发生的事件做出反应。 如果需要,可进一步筛选(例如,根据文件扩展名)。

  4. 逻辑应用工作流触发并执行以下操作:

    1. 获得一个授权令牌,使用来自服务主体的凭据调用批处理终结点。

    2. 使用新创建的文件作为输入触发批处理终结点(默认部署)。

  5. 批处理终结点将返回为处理文件而创建的作业的名称。

重要

使用与事件网格连接的逻辑应用工作流调用批处理终结点时,将为在存储帐户中创建的每个 Blob 文件创建一个作业。 请记住,由于批处理终结点在文件级别分发工作,因此不会发生任何并行化。 相反,可以使用批处理终结点的功能在相同的计算群集上执行多个作业。 如果需要以自动方式在整个文件夹上运行作业,我们建议切换到从 Azure 数据工厂调用批处理终结点

先决条件

  • 你已将模型正确部署为批处理终结点。 如果需要,可以扩展此体系结构来与管道组件部署配合使用。

  • 你的批处理部署在名为 batch-cluster 的计算群集中运行。

  • 你创建的逻辑应用将使用 REST 与 Azure 机器学习批处理终结点进行通信。

    有关如何使用批处理终结点的 REST API 的详细信息,请阅读为批处理终结点创建作业和输入数据

对批处理终结点进行身份验证

Azure 逻辑应用可以使用 HTTP 操作调用批处理终结点的 REST API。 Batch 终结点支持 Microsoft Entra ID 进行授权,因此对 API 发出的请求需要适当的身份验证处理。

在这种情况下,本教程使用服务主体进行身份验证以及与批处理终结点交互。

  1. 按照向 Microsoft Entra ID 注册应用程序并创建服务主体中的步骤创建服务主体。

  2. 按照选项 3:创建新的客户端密码中的说明创建用于身份验证的机密。

  3. 务必保存生成的客户端密码值,该值仅显示一次

  4. 务必保存应用程序“概述”窗格中的 client IDtenant id

  5. 按照授予访问权限中的说明授予服务主体访问工作区的权限。 在本例中,服务主体需要以下权限:

    • 工作区中读取批处理部署并对其执行操作的权限。
    • 在数据存储中读取/写入的权限。

启用数据访问

为了指示要发送到部署作业的输入数据,本教程使用事件网格提供的云 URI。 批处理终结点使用计算的标识来装载数据,同时保留作业的标识,以便读取装载的作业。 因此,必须将用户分配的托管标识分配给计算群集,以确保群集有权装载基础数据。 若要确保数据访问,请执行以下步骤:

  1. 创建托管标识资源

    IDENTITY=$(az identity create  -n azureml-cpu-cluster-idn  --query id -o tsv)
    
  2. 更新计算群集以使用我们创建的托管标识:

    注意

    此示例假设你创建了一个名为 cpu-cluster 的计算群集,将其用于终结点中的默认部署。

    az ml compute update --name cpu-cluster --identity-type user_assigned --user-assigned-identities $IDENTITY
    
  3. Azure 门户中,确保托管标识具有读取数据的正确权限。

    若要访问存储服务,必须至少具有存储帐户存储 Blob 数据读取者访问权限。 只有存储帐户所有者可以通过 Azure 门户更改访问级别

创建逻辑应用

  1. Azure 门户中的 Azure 主页上,选择“创建资源”

  2. 在“Azure 市场”菜单上,选择“集成”>“逻辑应用”。

    屏幕截图显示了“Azure 市场”菜单,其中选定了“集成”和“逻辑应用”。

  3. 在“创建逻辑应用”窗格中的“基本信息”选项卡上,提供有关你的逻辑应用资源的以下信息。

    properties 需要 说明
    订阅 <Azure-subscription-name> Azure 订阅名称。 此示例使用“即用即付”。
    资源组 LA-TravelTime-RG 你在其中创建逻辑应用资源和相关资源的 Azure 资源组。 此名称在各个区域中必须唯一,并且只能包含字母、数字、连字符 (-)、下划线 (_)、括号(())和句点 (.)。
    名称 LA-TravelTime 逻辑应用资源名称,在各个区域中必须唯一,并且只能包含字母、数字、连字符 (-)、下划线 (_)、括号(())和句点 (.)。
  4. 在继续做出选择之前,请转到“计划”部分。 对于“计划类型”,请选择“消耗”,以便仅显示在多租户 Azure 逻辑应用中运行的消耗逻辑应用工作流的设置

    重要

    对于启用了专用链接的工作区,需要将Azure 逻辑应用的标准计划与“允许专用网络”配置结合使用。

    “计划类型”属性还指定要使用的计费模型。

    计划类型 说明
    标准 此逻辑应用类型是默认选择,在单租户 Azure 逻辑应用中运行,并使用标准定价模型
    消耗 此逻辑应用类型在全局多租户 Azure 逻辑应用中运行,并使用消耗定价模型
  5. 现在继续进行以下选择:

    属性 需要 说明
    区域 中国北部 3 用于存储应用信息的 Azure 数据中心区域。 此示例将示例逻辑应用部署到 Azure 的“中国北部”区域。

    注意:如果你的订阅与集成服务环境相关联,则此列表包括这些环境。
    启用日志分析 此选项仅在选择“消耗”逻辑应用类型时出现。 只在要启用诊断日志记录时,才更改此选项。 对于本教程,请保留默认选择。
  6. 完成操作后,选择“查看 + 创建”。 在 Azure 验证你的逻辑应用资源的相关信息后,请选择“创建”。

  7. 在 Azure 部署应用后,选择“转到资源”。

    Azure 将打开逻辑应用的工作流设计器。

配置工作流参数

此逻辑应用工作流使用参数来存储运行批处理部署所需的特定信息。

  1. 在工作流设计器工具栏上选择“参数”

    屏幕截图显示用于定义工作流所需参数的“参数”窗格。

  2. 在“参数”窗格中,选择“创建参数”,并提供有关要创建的每个参数的以下信息

    提示

    使用针对批处理终结点进行身份验证中配置的值。

    参数名称 说明 示例值
    tenant_id 部署终结点的租户 ID。 00000000-0000-0000-00000000
    client_id 用于调用终结点的服务主体的客户端 ID。 00000000-0000-0000-00000000
    client_secret 用于调用终结点的服务主体的客户端密码。 ABCDEFGhijkLMNOPQRstUVwz
    endpoint_uri 终结点评分 URI。

    重要说明:此 URI 适用于要执行的终结点。 终结点必须配置默认部署。
    https://<endpoint_name>.<region>.inference.studio.ml.azure.cn/jobs

    以下示例演示了示例参数:

    屏幕截图显示如何在设计器中添加一个参数。

    有关详细信息,请参阅创建要用于 Azure 逻辑应用工作流输入的跨环境参数

添加触发器

我们希望每次在存储帐户的特定文件夹(数据资产)中创建新文件时触发逻辑应用工作流。 逻辑应用使用事件的信息来调用批处理终结点并传递要处理的特定文件。

  1. 在工作流设计器中,按照以下常规步骤添加名为“发生资源事件时”的事件网格触发器。

  2. 在连接信息框中,选择要使用的身份验证类型,然后选择“登录”

  3. 在触发器框中,请提供以下信息:

    属性 价值 说明
    资源类型 Microsoft.Storage.StorageAccounts 发出事件的资源类型。
    订阅 订阅名称 存储帐户的订阅。
    资源名称 你的存储帐户名称 在其中生成文件的存储帐户的名称。
    事件类型项 Microsoft.Storage.BlobCreated 事件类型。
  4. 从“高级参数”列表中选择“前缀筛选器”,并提供以下值

    /blobServices/default/containers/<container-name>/blobs/<path-to-data-folder>

    重要

    “前缀筛选器”属性允许事件网格仅在我们指定的特定路径中创建 blob 时通知工作流。 在这种情况下,我们假设文件将由所选存储帐户中 <container-name> 容器内的 <path-to-data-folder> 文件夹中的某个外部进程创建。 配置此参数以匹配数据的位置。 否则,将为在存储帐户的任何位置创建的任何文件触发该事件。 有关详细信息,请参阅事件网格的事件筛选

    以下示例演示了触发器的显示方式:

    逻辑应用的触发器活动的屏幕截图。

配置操作

  1. 在“发生资源事件时”触发器下,按照以下常规步骤添加 HTTP 操作。 将操作重命名为“授权”

  2. 在“授权”操作中提供以下信息

    属性 说明
    方法 POST HTTP 方法
    URI concat('https://login.partner.microsoftonline.cn/', parameters('tenant_id'), '/oauth2/token') 若要输入此表达式,请在“URI”框中选择。 从显示的选项中选择表达式编辑器(公式图标)。
    标头 值为 application/x-www-form-urlencodedContent-Type
    正文 concat('grant_type=client_credentials&client_id=', parameters('client_id'), '&client_secret=', parameters('client_secret'), '&resource=https://studio.ml.azure.cn') 若要输入此表达式,请在“正文”框中选择。 从显示的选项中选择表达式编辑器(公式图标)。

    以下示例演示了一个示例“授权”操作

    屏幕截图显示逻辑应用工作流中的“授权”操作示例。

  3. 在“授权”操作下,添加另一个 HTTP 操作,并将标题重命名为“调用”

  4. 在“调用”操作中提供以下信息

    属性 说明
    方法 POST HTTP 方法
    URI endpoint_uri 在“URI”框中选择,然后在“参数”下,选择“endpoint_uri”
    标头 值为 application/jsonContent-Type
    标头 值为 concat('Bearer ', body('Authorize')['access_token'])Authorization 若要输入此表达式,请在“标头”框中选择。 从显示的选项中选择表达式编辑器(公式图标)。
  5. 在“正文”框中选择,然后从显示的选项中选择表达式编辑器(公式图标)以输入以下表达式

    replace('{
     "properties": {
       "InputData": {
         "mnistinput": {
            "JobInputType" : "UriFile",
            "Uri" : "<JOB_INPUT_URI>"
         }
       }
      }
    }', '<JOB_INPUT_URI>', triggerBody()?[0]['data']['url'])
    

    提示

    以前的有效负载对应于模型部署。 如果使用管道组件部署,请根据管道输入的预期调整格式。 有关如何在 REST 调用中构建输入的详细信息,请参阅为批处理终结点创建作业和输入数据 (REST)

    以下示例演示了一个示例“调用”操作

    屏幕截图显示逻辑应用工作流中的“调用”操作示例。

    注意

    “调用”操作会触发批处理作业,但该操作不会等待作业完成。 默认情况下,Azure 逻辑应用不是为长时间运行的应用程序设置的。 如果你需要等待作业完成,建议切换到从 Azure 数据工厂运行批处理终结点

  6. 完成后,保存工作流。

    逻辑应用工作流已准备好执行,每次在指定路径下创建新文件时都会自动触发。

  7. 若要确认应用已成功收到事件,请查看应用的“运行历史记录”

    屏幕截图显示逻辑应用工作流的运行历史记录。

后续步骤