在本教程中,将事件中心捕获的数据从 Azure Blob 存储迁移到 Azure Synapse Analytics。 使用 Azure 事件网格和 Azure Functions 将日期迁移到专用 SQL 池。
此图描绘了在本教程中生成的解决方案的工作流:
- 在 Azure Blob 存储中捕获发送到 Azure 事件中心的数据。
- 完成数据捕获后,将生成一个事件并将其发送到 Azure 事件网格。
- Azure 事件网格将此事件数据转发到 Azure 函数应用。
- 函数应用使用事件数据中的 Blob URL 从存储中检索 Blob。
- 函数应用将 Blob 数据迁移到 Azure Synapse Analytics。
在本文中,将执行以下步骤:
- 部署本教程所需的基础结构
- 将代码发布到 Functions 应用
- 创建事件网格订阅
- 将示例数据流式传输到事件中心
- 验证 Azure Synapse Analytics 中捕获的数据
先决条件
了解 Azure 事件网格和 Azure 事件中心,尤其是捕获功能。 如果你并不熟悉 Azure 事件网格,请参阅 Azure 事件网格简介。 有关 Azure 事件中心捕获功能的详细信息,请参阅通过 Azure Blob 存储或 Azure Data Lake Storage 中的 Azure 事件中心来捕获事件。
Azure 订阅。 如果没有 Azure 订阅,可在开始前创建一个试用帐户。
Visual Studio 具有用于 .NET 桌面开发、Azure 开发、ASP.NET 和 Web 开发、Node.js 开发和 Python 开发的工作负载。
EventHubsCaptureEventGridDemo 示例项目。 该示例包括:
- WindTurbineDataGenerator。 一个简单的发布者,它将示例风力涡轮机数据发送到启用了捕获功能的事件中心。
- FunctionDWDumper。 一个 Azure 函数,当 Avro 文件捕获到 Azure 存储 Blob 时,该函数从 Azure 事件网格接收通知。 它接收 Blob URI,读取其内容,并将此数据推送到 Azure Synapse Analytics。
部署基础结构
在本部分中,使用 资源管理器模板部署所需的基础结构。 部署模板时,将创建以下资源:
- 启用了捕获功能的事件中心
- 捕获文件的存储帐户
- 用于托管函数应用的应用服务计划
- 用于处理事件的函数应用
- 用于托管数据仓库的 SQL Server
- 用于存储迁移数据的 Azure Synapse Analytics(专用 SQL 池)
使用 Azure CLI 部署基础结构
- 登录 Azure 门户。
通过运行以下 CLI 命令创建 Azure 资源组:
- 将以下命令复制并粘贴到 Azure CLI 窗口中。 如有需要,请更改资源组名称和位置。
az group create --name rgDataMigration --location chinanorth
按 Enter。
下面是一个示例:
user@Azure:~$ az group create --name rgDataMigration --location chinanorth { "id": "/subscriptions/aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e/resourceGroups/rgDataMigration", "location": "chinanorth", "managedBy": null, "name": "rgDataMigration", "properties": { "provisioningState": "Succeeded" }, "tags": null, "type": "Microsoft.Resources/resourceGroups" }
通过运行以下 CLI 命令部署上一节中提到的所有资源。 资源包括事件中心、存储帐户、函数应用和 Azure Synapse Analytics。
将命令复制并粘贴到 Azure CLI 窗口中。 如果由于 Azure 资源名称而出现错误,请删除资源组,修复名称,然后重试该命令。
重要
运行命令之前,请指定以下实体的值:
- 之前创建的资源组的名称。
- 事件中心命名空间的名称。
- 事件中心的名称。 可以将值保留原样 (hubdatamigration)。
- SQL Server 的名称。
- SQL 用户名称和密码。
- 数据库的名称。
- 存储帐户的名称。
- 函数应用的名称。
az deployment group create \ --resource-group rgDataMigration \ --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \ --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
此过程可能需要一段时间,因为你要创建一系列资源。 在命令的结果中,确保没有出错。
验证是否已创建资源
在 Azure Synapse Analytics 中创建表
在本部分,我们将在前面创建的专用 SQL 池中创建一个表。
在资源组的资源列表中,选择“专用 SQL 池”。
在“专用 SQL 池”页上的“常见任务”下,选择“查询编辑器”(预览版)。
输入 SQL Server 的用户名和密码,然后选择“ 确定”。 如果看到有关允许客户端访问 SQL 服务器的消息,请在服务器<SQL 服务器>上选择“将 IP <你的 IP 地址>加入允许列表”,然后选择“确定”。
在查询窗口中,复制并运行以下 SQL 脚本:
CREATE TABLE [dbo].[Fact_WindTurbineMetrics] ( [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, [MeasureTime] datetime NULL, [GeneratedPower] float NULL, [WindSpeed] float NULL, [TurbineSpeed] float NULL ) WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
保持此选项卡或窗口处于打开状态,以便可以验证在本教程结束时是否创建了数据。
发布 Azure Functions 应用
首先,从 Azure 门户获取 Functions 应用的发布配置文件。 然后,使用发布配置文件从 Visual Studio 发布 Azure Functions 项目或应用。
获取发布配置文件
在 “资源组 ”页上,选择 Azure Functions 应用。
在应用的 “功能应用” 页上,选择“获取发布配置文件”。
下载文件并将其保存到 EventHubsCaptureEventGridDemo 文件夹的 FunctionEGDDumper 子文件夹中。
使用发布配置文件发布 Functions 应用
启动 Visual Studio。
打开作为先决条件的一部分从 GitHub 下载的 EventHubsCaptureEventGridDemo.sln 解决方案。 可以在
/samples/e2e/EventHubsCaptureEventGridDemo
文件夹中找到它。在解决方案资源管理器中,右键单击“FunctionEGDWDumper”项目,然后选择“发布” 。
在以下屏幕中,选择“开始”或“添加发布配置文件”。
在“发布”对话框中,对于“目标”,请选择“导入配置文件”,然后选择“下一步”。
在“ 导入配置文件 ”选项卡上,选择之前在 FunctionEGDWDumper 文件夹中保存的发布设置文件。 然后选择“完成”。
当 Visual Studio 设置配置文件时,选择发布。 确认发布成功。
在打开了“Azure 函数”页面的 Web 浏览器中,在中间窗格中选择“函数”。 确认 EventGridTriggerMigrateData 函数显示在列表中。 如果看不到该函数,请尝试再次从 Visual Studio 发布,然后在门户中刷新页面。
在发布函数后,已准备好订阅事件。
订阅事件
在 Web 浏览器的新选项卡或新窗口中,登录 Azure 门户。
在 Azure 门户中,搜索并选择“资源组”。
如有必要,请在搜索框中输入资源组的名称以筛选资源组。
在列表中选择你的资源组。
从资源列表中选择“事件中心命名空间”。
在 “事件中心命名空间 ”页上,选择“ 事件”,然后选择工具栏上的“ + 事件订阅 ”。
在“创建事件订阅”页上执行以下步骤:
输入事件订阅的名称。
输入系统主题的名称。 系统主题为发送方提供发送事件的终结点。 有关详细信息,请参阅 系统主题。
对于“终结点类型”,请选择“Azure Function” 。
对于“终结点”,请选择该链接。
在“选择 Azure Function”页上,如果以下步骤没有自动填充,请执行这些步骤。
- 选择包含 Azure Function 的 Azure 订阅。
- 为函数选择资源组。
- 选择函数应用。
- 选择部署槽位。
- 选择 EventGridTriggerMigrateData 函数。
在“选择 Azure Function”页上,选择”确认选择” 。
然后返回“创建事件订阅”页,选择“创建” 。
验证是否已创建事件订阅。 切换到事件中心命名空间的“事件”页上的“事件订阅”选项卡 。
运行应用以生成数据
你已完成事件中心设置、指定 SQL 池(以前为 SQL 数据仓库)、Azure 函数应用和事件订阅。 在运行为事件中心生成数据的应用程序之前,请配置一些值。
在 Azure 门户中,像之前那样导航到资源组。
选择事件中心命名空间。
在 “事件中心命名空间 ”页中,选择 “共享访问策略”。
在策略列表中选择 RootManageSharedAccessKey。
选择 主连接字符串旁边的复制按钮。
返回到 Visual Studio 解决方案。
右键单击“WindTurbineDataGenerator”项目,然后选择“设为启动项目” 。
在 WindTurbineDataGenerator 项目中,打开 program.cs。
将
<EVENT HUBS NAMESPACE CONNECTION STRING>
替换为从门户复制的连接字符串。如果为事件中心使用了其他名称,请将
<EVENT HUB NAME>
替换为事件中心的名称hubdatamigration
。private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.chinacloudapi.cn/..."; private const string EventHubName = "hubdatamigration";
生成解决方案。 运行 WindTurbineGenerator.exe 应用程序。
几分钟后,在打开了查询窗口的另一个浏览器选项卡中,查询数据仓库中的表以获取已迁移的数据。
select * from [dbo].[Fact_WindTurbineMetrics]
重要
在此示例中,使用连接字符串向 Azure 事件中心命名空间进行身份验证使教程保持简单。 建议在生产环境中使用 Microsoft Entra ID 身份验证。 使用应用程序时,为应用程序启用托管标识,并在事件中心命名空间上为标识分配适当的角色(Azure 事件中心所有者、Azure 事件中心数据发送方或 Azure 事件中心数据接收器)。 有关详细信息,请参阅使用 Microsoft Entra ID 授予对事件中心的访问权限。
监视解决方案
本部分可帮助你监视解决方案或对解决方案进行故障排除。
查看存储帐户中的捕获数据
导航到资源组,然后选择用于捕获事件数据的存储帐户。
在 “存储帐户 ”页上,选择 “存储浏览器”。
展开“BLOB 容器”,并选择“windturbinecapture” 。
在右侧窗格中打开与事件中心命名空间名称相同的文件夹。
打开与事件中心名称相同的文件夹 (hubdatamigration)。
钻取文件夹,你将看到 AVRO 文件。 下面是一个示例:
验证事件网格触发器是否调用了函数
导航到资源组,然后选择函数应用。
在中间窗格中选择“函数”选项卡。
从列表中选择 EventGridTriggerMigrateData 函数。
在“函数”页的左侧菜单中选择“监视器” 。
选择“配置”,配置 Application Insights 来捕获调用日志。
新建一个 Application Insights 资源或使用现有资源。
导航回到该函数的“监视器”页。
确认发送事件的客户端应用程序 (WindTurbineDataGenerator) 仍在运行。 如果没有,请运行应用。
等待几分钟(5 分钟或更长时间),然后选择“刷新”按钮以查看函数调用。
若要查看详细信息,请选择调用。
事件网格将事件数据分发给订阅者。 以下示例显示了在 Blob 中捕获通过事件中心的数据流时生成的事件数据。 特别要注意
fileUrl
对象中的data
属性指向存储中的 Blob。 函数应用使用此 URL 来检索具有捕获数据的 Blob 文件。{ "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207", "subject": "hubdatamigration", "eventType": "Microsoft.EventHub.CaptureFileCreated", "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba", "data": { "fileUrl": "https://spehubstorage1207.blob.core.chinacloudapi.cn/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro", "fileType": "AzureBlockBlob", "partitionId": "0", "sizeInBytes": 473444, "eventCount": 2800, "firstSequenceNumber": 55500, "lastSequenceNumber": 58299, "firstEnqueueTime": "2020-12-07T21:49:12.556Z", "lastEnqueueTime": "2020-12-07T21:50:11.534Z" }, "dataVersion": "1", "metadataVersion": "1", "eventTime": "2020-12-07T21:50:12.7065524Z" }
验证数据是否存储在专用的 SQL 池中
在打开了查询窗口的浏览器选项卡中,查询专用 SQL 池中的表以获取已迁移的数据。
相关内容
- 若要详细了解如何设置和运行示例,请参阅事件中心捕获和事件网格示例。
- 在本教程中,你为
CaptureFileCreated
事件创建了一个事件订阅。 有关此事件以及 Azure Blob 存储支持的所有事件的详细信息,请参阅将 Azure 事件中心用作事件网格源。 - 有关事件中心捕获功能的详细信息,请参阅通过 Azure Blob 存储或 Azure Data Lake Storage 中的 Azure 事件中心来捕获事件。