从存储中的事件网格事件运行批处理终结点
适用范围:Azure CLI ml 扩展 v2(最新版)Python SDK azure-ai-ml v2(最新版)
事件网格是一项全面托管的服务,可用于在许多不同的 Azure 服务和应用程序中轻松管理事件。 它简化了生成事件驱动的应用程序和无服务器应用程序的过程。 本教程介绍如何在存储帐户中创建文件后立即触发批处理终结点的作业来处理这些文件。 在此体系结构中,我们使用逻辑应用来订阅这些事件并触发终结点。
工作流如下所示:
在特定存储帐户中创建新的 Blob 时,会触发“创建的文件”事件。
该事件会发送到事件网格,经过处理后发送给所有订阅者。
订阅了逻辑应用来侦听这些事件。 由于存储帐户可以包含多个数据资产,因此将应用事件筛选以仅对存储帐户内特定文件夹中发生的事件做出反应。 如果需要,可进一步筛选(例如,根据文件扩展名)。
会触发逻辑应用,它转而将执行以下操作:
它将获得一个授权令牌,使用来自服务主体的凭据调用批处理终结点
工作流将使用新创建的文件作为输入触发批处理终结点(默认部署)。
批处理终结点将返回为处理文件而创建的作业的名称。
重要
使用与事件网格连接的逻辑应用调用批处理终结点时,将为在存储帐户中创建的每个 Blob 文件创建一个作业。 请记住,由于批处理终结点在文件级别分发工作,因此不会发生任何并行化。 相反,你将利用在同一计算群集下执行多个作业的批处理终结点功能。 如果需要以自动方式在整个文件夹上运行作业,我们建议切换到从 Azure 数据工厂调用批处理终结点。
先决条件
- 此示例假定你已将模型正确部署为批处理终结点。
- 此示例假定你的批处理部署在名为
batch-cluster
的计算群集中运行。 - 我们正在创建的逻辑应用将使用 REST 与 Azure 机器学习批处理终结点进行通信。 要详细了解如何使用批处理终结点的 REST API,请阅读为批处理终结点创建作业和输入数据。
针对批处理终结点进行身份验证
Azure 逻辑应用可以使用 HTTP 活动调用批处理终结点的 REST API。 批处理终结点支持 Microsoft Entra ID 进行授权,因此对 API 发出的请求需要适当的身份验证处理。
在这种情况下,我们建议使用服务主体进行身份验证以及与批处理终结点交互。
按照向 Microsoft Entra ID 注册应用程序并创建服务主体中的步骤创建服务主体。
按照选项 3:创建新的客户端机密中的说明创建用于身份验证的机密。
记下生成的客户端机密值。 此内容只显示一次。
记下应用程序“概述”窗格中的
client ID
和tenant id
。为创建的服务主体授予对工作区的访问权限,如授予访问权限中所述。 在本例中,服务主体将需要:
- 工作区中读取批处理部署并对其执行操作的权限。
- 在数据存储中读取/写入的权限。
启用数据访问
我们将使用事件网格提供的云 URI 来指示要发送到部署作业的输入数据。 批处理终结点使用计算的标识来装载数据,同时保留作业的标识,以便在装载作业后进行读取。 因此,需要将用户分配的托管标识分配给计算群集,以确保其有权装载基础数据。 请按照以下步骤进行操作以确保数据访问:
创建托管标识资源:
更新计算群集以使用我们创建的托管标识:
注意
此示例假设你创建了一个名为
cpu-cluster
的计算群集,并将其用于终结点中的默认部署。转到 Azure 门户并确保托管标识具有读取数据的正确权限。 若要访问存储服务,必须至少具有存储帐户存储 Blob 数据读取者访问权限。 只有存储帐户所有者可以通过 Azure 门户更改访问级别。
创建逻辑应用
在 Azure 门户中,使用 Azure 帐户登录。
在 Azure 主页上,选择“创建资源”。
在“Azure 市场”菜单上,选择“集成”>“逻辑应用”。
在“创建逻辑应用”窗格中的“基本信息”选项卡上,提供有关你的逻辑应用资源的以下信息。
properties 需要 值 说明 订阅 是 <Azure-subscription-name> Azure 订阅名称。 此示例使用“即用即付”。 资源组 是 LA-TravelTime-RG 你在其中创建逻辑应用资源和相关资源的 Azure 资源组。 此名称在各个区域中必须唯一,并且只能包含字母、数字、连字符 ( -
)、下划线 (_
)、括号((
、)
)和句点 (.
)。名称 是 LA-TravelTime 逻辑应用资源名称,在各个区域中必须唯一,并且只能包含字母、数字、连字符 ( -
)、下划线 (_
)、括号((
、)
)和句点 (.
)。在继续做出选择之前,请转到“计划”部分。 对于“计划类型”,请选择“消耗”,以便仅显示在多租户 Azure 逻辑应用中运行的消耗逻辑应用工作流的设置。
“计划类型”属性还指定要使用的计费模型。
计划类型 说明 标准 此逻辑应用类型是默认选择,在单租户 Azure 逻辑应用中运行,并使用标准计费模型。 消耗 此逻辑应用类型在全局多租户 Azure 逻辑应用中运行,并使用消耗计费模型。 重要
对于启用了专用链接的工作区,需要将逻辑应用的标准计划与“允许专用网络”配置结合使用。
现在继续进行以下选择:
属性 需要 值 说明 区域 是 中国北部 用于存储应用信息的 Azure 数据中心区域。 此示例将示例逻辑应用部署到 Azure 的“中国北部”区域。
注意:如果你的订阅与集成服务环境相关联,则此列表包括这些环境。启用日志分析 是 否 此选项仅在选择“消耗”逻辑应用类型时出现。 只在要启用诊断日志记录时,才更改此选项。 对于本教程,请保留默认选择。 完成操作后,选择“查看 + 创建”。 在 Azure 验证你的逻辑应用资源的相关信息后,请选择“创建”。
在 Azure 部署应用后,选择“转到资源”。
Azure 会打开工作流模板选择窗格,其中显示了简介视频、常用触发器和工作流模板模式。
向下滚动,越过视频和常用触发器部分,找到“模板”部分,然后选择“空白逻辑应用”。
配置工作流参数
此逻辑应用使用参数来存储运行批处理部署所需的特定信息。
在工作流设计器的工具栏下,选择“参数”选项并进行如下配置:
要创建参数,请使用“添加参数”选项:
创建以下参数。
参数 说明 示例值 tenant_id
部署终结点的租户 ID。 00000000-0000-0000-00000000
client_id
用于调用终结点的服务主体的客户端 ID。 00000000-0000-0000-00000000
client_secret
用于调用终结点的服务主体的客户端密码。 ABCDEFGhijkLMNOPQRstUVwz
endpoint_uri
终结点评分 URI。 https://<endpoint_name>.<region>.inference.studio.ml.azure.cn/jobs
重要
endpoint_uri
是你尝试执行的终结点的 URI。 终结点必须配置默认部署。提示
使用针对批处理终结点进行身份验证中配置的值。
添加触发器
我们希望每次在存储帐户的给定文件夹(数据资产)中创建新文件时触发逻辑应用。 逻辑应用使用事件的信息来调用批处理终结点并传递要处理的特定文件。
在工作流设计器中的搜索框下,选择“内置”。
在搜索框中,输入“事件网格”,然后选择名为“资源事件发生时”的触发器。
按如下所示配置触发器:
属性 价值 说明 订阅 订阅名称 包含 Azure 存储帐户的订阅。 资源类型 Microsoft.Storage.StorageAccounts
发出事件的资源类型。 资源名称 你的存储帐户名称 将在其中生成文件的存储帐户的名称。 事件类型项 Microsoft.Storage.BlobCreated
事件类型。 单击“添加新参数”,然后选择“前缀筛选器”。 添加值
/blobServices/default/containers/<container_name>/blobs/<path_to_data_folder>
。重要
前缀筛选器允许事件网格仅在我们指定的特定路径中创建 blob 时通知工作流。 在这种情况下,我们假设文件将由所选存储帐户中
<container_name>
容器内的<path_to_data_folder>
文件夹中的某个外部进程创建。 配置此参数以匹配数据的位置。 否则,将为在存储帐户的任何位置创建的任何文件触发该事件。 有关更多详细信息,请参阅事件网格的事件筛选。触发器将如下所示:
配置操作
单击“+ 新建步骤”。
在工作流设计器的搜索框下,选择“内置”,然后单击“HTTP”:
按如下所示配置操作:
属性 值 说明 方法 POST
HTTP 方法 URI concat('https://login.partner.microsoftonline.cn/', parameters('tenant_id'), '/oauth2/token')
单击“添加动态上下文”,然后单击“表达式”以输入此表达式。 标头 值为 application/x-www-form-urlencoded
的Content-Type
正文 concat('grant_type=client_credentials&client_id=', parameters('client_id'), '&client_secret=', parameters('client_secret'), '&resource=https://studio.ml.azure.cn')
单击“添加动态上下文”,然后单击“表达式”以输入此表达式。 该操作将如下所示:
单击“+ 新建步骤”。
在工作流设计器的搜索框下,选择“内置”,然后单击“HTTP”:
按如下所示配置操作:
属性 值 说明 方法 POST
HTTP 方法 URI endpoint_uri
单击“添加动态上下文”,然后在 parameters
下选择此操作。标头 值为 application/json
的Content-Type
标头 值为 concat('Bearer ', body('Authorize')['access_token'])
的Authorization
单击“添加动态上下文”,然后单击“表达式”以输入此表达式。 在参数的“正文”中,单击“添加动态上下文”,然后单击“表达式”以输入以下表达式:
replace('{ "properties": { "InputData": { "mnistinput": { "JobInputType" : "UriFile", "Uri" : "<JOB_INPUT_URI>" } } } }', '<JOB_INPUT_URI>', triggerBody()?[0]['data']['url'])
提示
以前的有效负载对应于模型部署。 如果使用管道组件部署,请根据管道输入的预期调整格式。 若要详细了解如何在 REST 调用中构建输入,请参阅为批处理终结点创建作业和输入数据 (REST)。
该操作将如下所示:
注意
请注意,最后这个操作将触发批处理作业,但不会等待作业完成。 Azure 逻辑应用不是为长时间运行的应用程序设计的。 如果你需要等待作业完成,建议切换到从 Azure 数据工厂运行批处理终结点。
单击“保存” 。
逻辑应用已准备好执行,每次在指定路径下创建新文件时都会自动触发。 通过检查逻辑应用的“运行历史记录”,你会注意到应用已成功接收事件: