教程:使用事件网格和 Azure Functions 将捕获的事件中心数据迁移到 Azure Synapse Analytics
Azure 事件中心捕获用于自动捕获 Azure Blob 存储或 Azure Data Lake Storage 中事件中心的流式处理数据。 本教程介绍如何使用事件网格触发的 Azure 函数将捕获的事件中心数据从存储迁移到 Azure Synapse Analytics。
此图描绘了在本教程中生成的解决方案的工作流:
- 在 Azure Blob 存储中捕获发送到 Azure 事件中心的数据。
- 完成数据捕获后,将生成一个事件并将其发送到 Azure 事件网格。
- Azure 事件网格将此事件数据转发到 Azure 函数应用。
- 函数应用使用事件数据中的 Blob URL 从存储中检索 Blob。
- 函数应用将 Blob 数据迁移到 Azure Synapse Analytics。
在本文中,将执行以下步骤:
- 部署本教程所需的基础结构
- 将代码发布到 Functions 应用
- 创建事件网格订阅
- 将示例数据流式传输到事件中心
- 验证 Azure Synapse Analytics 中捕获的数据
若要完成本教程,必须满足以下先决条件:
- Azure 订阅。 如果没有 Azure 订阅,可在开始前创建一个试用帐户。
- Visual Studio,包含适用于以下用途的工作负载:.NET 桌面开发、Azure 开发、ASP.NET 和 Web 开发、Node.js 开发和 Python 开发。
- 将 EventHubsCaptureEventGridDemo 示例项目下载到计算机上。
- WindTurbineDataGenerator - 一个简单的发布服务器,可以将示例性的风力涡轮机数据发送到启用了捕获功能的事件中心
- FunctionDWDumper - 一个 Azure 函数,可以在 Avro 文件被捕获到 Azure 存储 Blob 时从 Azure 事件网格接收通知。 它接收 Blob 的 URI 路径、读取其内容并将该数据推送到 Azure Synapse Analytics(专用 SQL 池)。
在此步骤中,使用资源管理器模板部署所需的基础结构。 部署模板时,将创建以下资源:
- 已启用捕获功能的事件中心。
- 适用于已捕获文件的存储帐户。
- 用于托管函数应用的应用服务计划
- 用于处理事件的函数应用
- 用于托管数据仓库的 SQL Server
- 用于存储迁移数据的 Azure Synapse Analytics(专用 SQL 池)
- 通过运行以下 CLI 命令创建 Azure 资源组:
将以下命令复制并粘贴到 CLI 窗口。 如有需要,请更改资源组名称和位置。
az group create -l chinaeast2 -n rgDataMigration
按 Enter。
下面是一个示例:
user@Azure:~$ az group create -l chinaeast2 -n rgDataMigration { "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration", "location": "chinaeast2", "managedBy": null, "name": "rgDataMigration", "properties": { "provisioningState": "Succeeded" }, "tags": null }
- 通过运行以下 CLI 命令来部署上一部分提到的所有资源(事件中心、存储帐户、函数应用、Azure Synapse Analytics):
将命令复制并粘贴到 CLI 窗口中。 也可能需要将内容复制/粘贴到所选的编辑器中,设置值,然后将该命令复制到 CLI。
重要
运行此命令前,指定以下实体的值:
- 之前创建的资源组的名称。
- 事件中心命名空间的名称。
- 事件中心的名称。 可以将值保留原样 (hubdatamigration)。
- SQL Server 的名称。
- SQL 用户名称和密码。
- 数据库的名称。
- 存储帐户的名称。
- 函数应用的名称。
az deployment group create \ --resource-group rgDataMigration \ --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \ --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
在 CLI 窗口中按 ENTER 以运行该命令。 此过程可能需要一段时间,因为正在创建一系列资源。 在命令的结果中,请确保没有任何故障。
在本部分,我们将在前面创建的专用 SQL 池中创建一个表。
在资源组的资源列表中,选择“专用 SQL 池”。
在“专用 SQL 池”页左侧菜单的“常见任务”部分中,选择“查询编辑器(预览版)” 。
输入 SQL Server 的“用户名”和“密码”,然后选择“确定” 。 如果看到有关允许客户端访问 SQL 服务器的消息,请在服务器<SQL 服务器>上选择“将 IP <你的 IP 地址>加入允许列表”,然后选择“确定”。
在查询窗口中,复制并运行以下 SQL 脚本:
CREATE TABLE [dbo].[Fact_WindTurbineMetrics] ( [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, [MeasureTime] datetime NULL, [GeneratedPower] float NULL, [WindSpeed] float NULL, [TurbineSpeed] float NULL ) WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
保持此选项卡或窗口处于打开状态,以便可以验证在本教程结束时是否创建了数据。
首先,从 Azure 门户获取 Functions 应用的发布配置文件。 然后,使用发布配置文件从 Visual Studio 发布 Azure Functions 项目或应用。
在“资源组”页上,选择资源列表中的“Azure Functions 应用”。
在应用的“函数应用”页上,选择命令栏上的“获取发布配置文件”。
下载文件并将其保存到 EventHubsCaptureEventGridDemo 文件夹的 FunctionEGDDumper 子文件夹中。
启动 Visual Studio。
打开作为先决条件的一部分从 GitHub 下载的 EventHubsCaptureEventGridDemo.sln 解决方案。 可以在
/samples/e2e/EventHubsCaptureEventGridDemo
文件夹中找到它。在解决方案资源管理器中,右键单击“FunctionEGDWDumper”项目,然后选择“发布” 。
在以下屏幕中,选择“开始”或“添加发布配置文件”。
在“发布”对话框中,对于“目标”,请选择“导入配置文件”,然后选择“下一步”。
在“导入配置文件”选项卡上,选择前面保存在 FunctionEGDWDumper 文件夹中的发布设置文件,然后选择“完成”。
在 Visual Studio 配置好配置文件后,选择“发布”。 确认发布成功。
在打开了“Azure 函数”页面的 Web 浏览器中,选择左侧菜单中的“函数”。 确认 EventGridTriggerMigrateData 函数显示在列表中。 如果看不到该函数,请尝试再次从 Visual Studio 发布,然后在门户中刷新页面。
在发布函数后,已准备好订阅事件。
在 Web 浏览器的新选项卡或新窗口中,导航到 Azure 门户。
在 Azure 门户中的左侧菜单上选择“资源组”。
通过在搜索框中输入资源组的名称来筛选资源组列表。
在列表中选择你的资源组。
从资源列表中选择“事件中心命名空间”。
在“事件中心命名空间”页的左侧菜单中选择“事件”,然后在工具栏上选择“+ 事件订阅” 。
在“创建事件订阅”页上执行以下步骤:
输入事件订阅的名称。
输入系统主题的名称。 系统主题为发送方提供发送事件的终结点。 有关详细信息,请参阅系统主题
对于“终结点类型”,请选择“Azure Function” 。
对于“终结点”,请选择该链接。
在“选择 Azure Function”页上,如果以下步骤没有自动填充,请执行这些步骤。
- 选择包含 Azure Function 的 Azure 订阅。
- 为函数选择资源组。
- 选择函数应用。
- 选择部署槽位。
- 选择 EventGridTriggerMigrateData 函数。
在“选择 Azure Function”页上,选择”确认选择” 。
然后返回“创建事件订阅”页,选择“创建” 。
验证是否已创建事件订阅。 切换到事件中心命名空间的“事件”页上的“事件订阅”选项卡 。
在资源组的资源列表中,选择应用服务计划(而不是应用服务)。
至此,已完成设置事件中心、专用 SQL 池(以前称为 SQL 数据仓库)、Azure 函数应用和事件订阅。 需要先配置几个值,然后再运行应用来生成事件中心数据。
在 Azure 门户中,像之前那样导航到资源组。
选择事件中心命名空间。
在“事件中心命名空间”页中的左侧菜单上选择“共享访问策略” 。
在策略列表中选择 RootManageSharedAccessKey。
选择“连接字符串 - 主密钥”文本框旁边的“复制”按钮。
返回到 Visual Studio 解决方案。
右键单击“WindTurbineDataGenerator”项目,然后选择“设为启动项目” 。
在 WindTurbineDataGenerator 项目中,打开 program.cs。
将
<EVENT HUBS NAMESPACE CONNECTION STRING>
替换为从门户复制的连接字符串。如果为事件中心使用了不同于
hubdatamigration
的名称,请将<EVENT HUB NAME>
替换为事件中心的名称。private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.chinacloudapi.cn/..."; private const string EventHubName = "hubdatamigration";
生成解决方案。 运行 WindTurbineGenerator.exe 应用程序。
几分钟后,在打开了查询窗口的另一个浏览器选项卡中,查询数据仓库中的表以获取已迁移的数据。
select * from [dbo].[Fact_WindTurbineMetrics]
本部分可帮助你监视解决方案或对解决方案进行故障排除。
导航到资源组,然后选择用于捕获事件数据的存储帐户。
在“存储帐户”页的左侧菜单中选择“存储资源管理器(预览)” 。
展开“BLOB 容器”,并选择“windturbinecapture” 。
在右侧窗格中打开与事件中心命名空间名称相同的文件夹。
打开与事件中心名称相同的文件夹 (hubdatamigration)。
钻取文件夹,你将看到 AVRO 文件。 下面是一个示例:
导航到资源组,然后选择函数应用。
在左侧菜单中选择“函数”。
从列表中选择 EventGridTriggerMigrateData 函数。
在“函数”页的左侧菜单中选择“监视器” 。
选择“配置”,配置 Application Insights 来捕获调用日志。
新建一个 Application Insights 资源或使用现有资源。
导航回到该函数的“监视器”页。
确认发送事件的客户端应用程序 (WindTurbineDataGenerator) 仍在运行。 如果没有,请运行应用。
等待几分钟(5 分钟或更长时间),然后选择“刷新”按钮以查看函数调用。
选择一个调用以查看详细信息。
事件网格将事件数据分发给订阅者。 以下示例显示了在 Blob 中捕获通过事件中心的数据流时生成的事件数据。 特别要注意
data
对象中的fileUrl
属性指向存储中的 Blob。 函数应用使用此 URL 来检索具有捕获数据的 Blob 文件。{ "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207", "subject": "hubdatamigration", "eventType": "Microsoft.EventHub.CaptureFileCreated", "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba", "data": { "fileUrl": "https://spehubstorage1207.blob.core.chinacloudapi.cn/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro", "fileType": "AzureBlockBlob", "partitionId": "0", "sizeInBytes": 473444, "eventCount": 2800, "firstSequenceNumber": 55500, "lastSequenceNumber": 58299, "firstEnqueueTime": "2020-12-07T21:49:12.556Z", "lastEnqueueTime": "2020-12-07T21:50:11.534Z" }, "dataVersion": "1", "metadataVersion": "1", "eventTime": "2020-12-07T21:50:12.7065524Z" }
在打开了查询窗口的浏览器选项卡中,查询专用 SQL 池中的表以获取已迁移的数据。
可以将强大的数据可视化工具与数据仓库配合使用,以便获取可行的见解。