Tutorial: Migrate captured Event Hubs data to Azure Synapse Analytics using Event Grid and Azure Functions

Azure Event Hubs Capture lets you automatically capture the streaming data in Event Hubs in Azure Blob storage or Azure Data Lake Storage. This tutorial shows you how to migrate captured Event Hubs data from storage to Azure Synapse Analytics by using an Azure function that's triggered by Event Grid.

[Image: Application overview]

This diagram depicts the workflow of the solution you build in this tutorial:

  1. Data sent to an Azure event hub is captured in Azure Blob storage.
  2. When the data capture is complete, an event is generated and sent to Azure Event Grid.
  3. Event Grid forwards this event data to an Azure function app.
  4. The function app uses the blob URL in the event data to retrieve the blob from storage.
  5. The function app migrates the blob data to Azure Synapse Analytics.

In this article, you take the following steps:

  • Deploy the required infrastructure for the tutorial
  • Publish code to a Functions app
  • Create an Event Grid subscription
  • Stream sample data into Event Hubs
  • Verify captured data in Azure Synapse Analytics

Prerequisites

To complete this tutorial, you must have:

  • An Azure subscription. If you don't have an Azure subscription, create a trial account before you begin.
  • Visual Studio 2019 with workloads for: .NET desktop development, Azure development, ASP.NET and web development, Node.js development, and Python development.
  • The EventHubsCaptureEventGridDemo sample project, downloaded to your computer. It includes:
    • WindTurbineDataGenerator - A simple publisher that sends sample wind turbine data to a capture-enabled event hub.
    • FunctionDWDumper - An Azure function that receives an Event Grid notification when an Avro file is captured to the Azure Storage blob. It receives the blob's URI path, reads its contents, and pushes this data to Azure Synapse Analytics (dedicated SQL pool).

Deploy the infrastructure

In this step, you deploy the required infrastructure with a Resource Manager template. When you deploy the template, the following resources are created:

  • Event hub with the Capture feature enabled
  • Storage account for the captured files
  • App Service plan for hosting the function app
  • Function app for processing the event
  • SQL Server for hosting the data warehouse
  • Azure Synapse Analytics (dedicated SQL pool) for storing the migrated data

Use Azure CLI to deploy the infrastructure

  1. Create an Azure resource group by running the following CLI command:
    1. Copy and paste the following command into the CLI window. Change the resource group name and location if you want.

      az group create -l chinaeast2 -n rgDataMigration
      
    2. Press ENTER.

      Here is an example:

      user@Azure:~$ az group create -l chinaeast2 -n rgDataMigration
      {
        "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration",
        "location": "chinaeast2",
        "managedBy": null,
        "name": "rgDataMigration",
        "properties": {
          "provisioningState": "Succeeded"
        },
        "tags": null
      }
      
  2. Deploy all the resources mentioned in the previous section (event hub, storage account, function app, Azure Synapse Analytics) by running the following CLI command:
    1. Copy and paste the command into the CLI window. Alternatively, you can paste it into an editor of your choice, set the values, and then copy the command to the CLI.

      Important

      Specify values for the following entities before running the command:

      • Name of the resource group you created earlier.
      • Name for the event hub namespace.
      • Name for the event hub. You can leave the value as it is (hubdatamigration).
      • Name for the SQL server.
      • Name of the SQL user and password.
      • Name for the database.
      • Name of the storage account.
      • Name for the function app.

      az deployment group create \
          --resource-group rgDataMigration \
          --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \
          --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
      
    2. Press ENTER in the CLI window to run the command. This process may take a while because it creates several resources. In the command output, confirm that there are no failures.
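The parameter list in the deployment command is easy to get wrong when edited by hand. The following is a minimal Python sketch, not part of the sample project, that assembles the same command from a dictionary and quotes each argument for the shell. The parameter names and template URI come from the command above; the helper names (`check_storage_name`, `build_deploy_command`) and all example values are hypothetical. The storage-name check reflects the general Azure rule that storage account names are 3 to 24 lowercase letters or digits.

```python
import re
import shlex

TEMPLATE_URI = (
    "https://raw.githubusercontent.com/Azure/azure-docs-json-samples/"
    "master/event-grid/EventHubsDataMigration.json"
)

# Parameter names as they appear in the deployment command above.
PARAM_ORDER = [
    "eventHubNamespaceName", "eventHubName", "sqlServerName",
    "sqlServerUserName", "sqlServerPassword", "sqlServerDatabaseName",
    "storageName", "functionAppName",
]


def check_storage_name(name):
    """Return True if name is 3-24 characters of lowercase letters or digits."""
    return re.fullmatch(r"[a-z0-9]{3,24}", name) is not None


def build_deploy_command(resource_group, params):
    """Build the 'az deployment group create' command line as one string."""
    missing = [p for p in PARAM_ORDER if not params.get(p)]
    if missing:
        raise ValueError("missing parameter values: " + ", ".join(missing))
    if not check_storage_name(params["storageName"]):
        raise ValueError("storageName must be 3-24 lowercase letters or digits")
    pairs = [f"{p}={params[p]}" for p in PARAM_ORDER]
    argv = [
        "az", "deployment", "group", "create",
        "--resource-group", resource_group,
        "--template-uri", TEMPLATE_URI,
        "--parameters", *pairs,
    ]
    # shlex.quote protects values that contain shell metacharacters.
    return " ".join(shlex.quote(arg) for arg in argv)


# Hypothetical example values; replace them with your own.
example = {
    "eventHubNamespaceName": "demo-ehns",
    "eventHubName": "hubdatamigration",
    "sqlServerName": "demo-sqlserver",
    "sqlServerUserName": "demoadmin",
    "sqlServerPassword": "ChangeMe!123",
    "sqlServerDatabaseName": "demodw",
    "storageName": "demostorage123",
    "functionAppName": "demo-funcapp",
}
print(build_deploy_command("rgDataMigration", example))
```

The sketch only prints the command; paste the output into the CLI window to run it.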

Verify that the resources are created

  1. In the Azure portal, select Resource groups on the left menu.

  2. Filter the list of resource groups by entering the name of your resource group in the search box.

  3. Select your resource group in the list.

    [Image: Select your resource group]

  4. Confirm that you see the following resources in the resource group:

    [Image: Resources in the resource group]

Create a table in Azure Synapse Analytics

Create a table in your data warehouse by running the CreateDataWarehouseTable.sql script. To run the script, you can use Visual Studio or the Query Editor in the portal. The following steps show you how to use the Query Editor:

  1. In the list of resources in the resource group, select your dedicated SQL pool.

  2. On the Dedicated SQL pool page, in the Common Tasks section on the left menu, select Query editor (preview).

    [Image: Azure Synapse Analytics page]

  3. Enter the user name and password for the SQL server, and select OK. If you see a message about allowing your client to access the SQL server, follow these steps:

    1. Select the link: Set server firewall.
    2. On the Firewall settings page, select Add client IP on the toolbar, and then select Save on the toolbar.
    3. Select OK on the success message.
    4. Navigate back to the Dedicated SQL pool page, and select Query editor (preview) on the left menu.
    5. Enter the user name and password, and then select OK.
  4. In the query window, copy and run the following SQL script:

    CREATE TABLE [dbo].[Fact_WindTurbineMetrics] (
        [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, 
        [MeasureTime] datetime NULL, 
        [GeneratedPower] float NULL, 
        [WindSpeed] float NULL, 
        [TurbineSpeed] float NULL
    )
    WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
    

    [Image: Run the SQL query]

  5. Keep this tab or window open so that you can verify at the end of the tutorial that the data was created.
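To make the table's column types concrete, here is a minimal Python sketch that coerces one decoded reading into a tuple in the same column order as Fact_WindTurbineMetrics. It is an illustration only, not the code of the sample function. It assumes the reading is a dictionary whose keys match the column names, and the `?` placeholders in `INSERT_SQL` assume a database driver that uses the qmark parameter style; both are assumptions, as are the helper names.

```python
from datetime import datetime

# Column order copied from the CREATE TABLE script above.
COLUMNS = ("DeviceId", "MeasureTime", "GeneratedPower", "WindSpeed", "TurbineSpeed")

# Parameterized insert; the '?' placeholder style is an assumption about the driver.
INSERT_SQL = (
    "INSERT INTO [dbo].[Fact_WindTurbineMetrics] ("
    + ", ".join(f"[{c}]" for c in COLUMNS)
    + ") VALUES ("
    + ", ".join("?" for _ in COLUMNS)
    + ")"
)


def _as_float(value):
    """Convert to float, keeping None because every column is nullable."""
    return None if value is None else float(value)


def to_row(record):
    """Coerce a reading (dict) into a tuple matching the table's columns."""
    device_id = record.get("DeviceId")
    if device_id is not None:
        device_id = str(device_id)
        if len(device_id) > 50:  # the column is nvarchar(50)
            raise ValueError("DeviceId is longer than 50 characters")
    measure_time = record.get("MeasureTime")
    if isinstance(measure_time, str):
        measure_time = datetime.fromisoformat(measure_time)
    return (
        device_id,
        measure_time,
        _as_float(record.get("GeneratedPower")),
        _as_float(record.get("WindSpeed")),
        _as_float(record.get("TurbineSpeed")),
    )


# Hypothetical reading used only to exercise the helper.
sample = {
    "DeviceId": "Turbine_1",
    "MeasureTime": "2020-12-07T21:49:12",
    "GeneratedPower": "12.5",
    "WindSpeed": 8,
    "TurbineSpeed": None,
}
print(INSERT_SQL)
print(to_row(sample))
```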

Update the function runtime version

  1. Open another tab in the web browser, and navigate to the Azure portal.

  2. In the Azure portal, select Resource groups on the left menu.

  3. Select the resource group in which the function app exists.

  4. Select the function app in the list of resources in the resource group.

  5. Select Configuration under Settings on the left menu.

  6. Switch to the Function runtime settings tab in the right pane.

  7. Update the runtime version to ~3.

    [Image: Update the function runtime version]

  8. Select Save on the toolbar.

  9. On the Save changes confirmation popup, select Continue.

Publish the Azure Functions app

  1. Launch Visual Studio.

  2. Open the EventHubsCaptureEventGridDemo.sln solution that you downloaded from GitHub as part of the prerequisites. You can find it in the /samples/e2e/EventHubsCaptureEventGridDemo folder.

  3. In Solution Explorer, right-click the FunctionEGDWDumper project, and select Publish.

  4. If you see the following screen, select Start.

    [Image: Start button in the Publish section]

  5. In the Publish dialog box, select Azure for Target, and select Next.

  6. Select Azure Function App (Windows), and select Next.

  7. On the Functions instance tab, select your Azure subscription, expand the resource group, select your function app, and then select Finish. You need to sign in to your Azure account if you haven't already done so.

    [Image: Select the function app]

  8. On the Publish page, in the Service Dependencies section, select Configure for Storage.

    [Image: Configure link for the storage service dependency]

  9. On the Configure dependency page, follow these steps:

    1. Select the storage account you created earlier, and then select Next.

      [Image: Select the storage account]

    2. Specify a name for the connection string, select None for the Save connection string option, and then select Next.

      [Image: Specify the connection string name]

    3. Clear the C# code file and Secrets store options, and then select Finish.

      [Image: Review the summary of changes]

  10. When Visual Studio has configured the profile, select Publish.

    [Image: Select Publish]

  11. In the tab that has the Azure Function page open, select Functions on the left menu. Confirm that the EventGridTriggerMigrateData function shows up in the list. If you don't see it, try publishing from Visual Studio again, and then refresh the page in the portal.

    [Image: Confirm that the function was created]

After publishing the function, you're ready to subscribe to the event.

Subscribe to the event

  1. In a new tab or new window of a web browser, navigate to the Azure portal.

  2. In the Azure portal, select Resource groups on the left menu.

  3. Filter the list of resource groups by entering the name of your resource group in the search box.

  4. Select your resource group in the list.

  5. Select the Event Hubs namespace from the list of resources.

  6. On the Event Hubs Namespace page, select Events on the left menu, and then select + Event Subscription on the toolbar.

    [Image: Add event subscription link on the Events page for the Event Hubs namespace]

  7. On the Create Event Subscription page, follow these steps:

    1. Enter a name for the event subscription.

    2. Enter a name for the system topic. A system topic provides an endpoint for the sender to send events. For more information, see System topics.

    3. For Endpoint Type, select Azure Function.

    4. For Endpoint, select the link.

    5. On the Select Azure Function page, follow these steps if the values aren't automatically filled.

      1. Select the Azure subscription that has the Azure function.
      2. Select the resource group for the function.
      3. Select the function app.
      4. Select the deployment slot.
      5. Select the function EventGridTriggerMigrateData.
    6. On the Select Azure Function page, select Confirm Selection.

    7. Then, back on the Create Event Subscription page, select Create.

      [Image: Create an event subscription that uses the function]

  8. Verify that the event subscription is created. Switch to the Event Subscriptions tab on the Events page for the Event Hubs namespace.

    [Image: Confirm the event subscription]

  9. Select the App Service plan (not the App Service) in the list of resources in the resource group.

Run the app to generate data

You've finished setting up your event hub, dedicated SQL pool (formerly SQL Data Warehouse), Azure function app, and event subscription. Before running an application that generates data for the event hub, you need to configure a few values.

  1. In the Azure portal, navigate to your resource group as you did earlier.

  2. Select the Event Hubs namespace.

  3. On the Event Hubs Namespace page, select Shared access policies on the left menu.

  4. Select RootManageSharedAccessKey in the list of policies.

    [Image: Shared access policies page for the Event Hubs namespace]

  5. Select the copy button next to the Connection string-primary key text box.

  6. Go back to your Visual Studio solution.

  7. Right-click the WindTurbineDataGenerator project, and select Set as Startup project.

  8. In the WindTurbineDataGenerator project, open program.cs.

  9. Replace <EVENT HUBS NAMESPACE CONNECTION STRING> with the connection string you copied from the portal.

  10. Replace <EVENT HUB NAME> with the name of the event hub.

    private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.chinacloudapi.cn/...";
    private const string EventHubName = "hubdatamigration";
    
  11. Build the solution. Run the WindTurbineGenerator.exe application.

  12. After a couple of minutes, in the other browser tab where you have the query window open, query the table in your data warehouse for the migrated data.

    select * from [dbo].[Fact_WindTurbineMetrics]    
    

    [Image: Query results]
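The generator's job is to publish a stream of wind turbine readings. The Python sketch below is a stand-in for illustration, not the WindTurbineDataGenerator code: it builds a few synthetic readings as JSON strings and prints them instead of sending them to an event hub. The field names are assumed to match the columns of Fact_WindTurbineMetrics, and the value ranges and the formulas relating power and turbine speed to wind speed are arbitrary.

```python
import json
import random
from datetime import datetime, timedelta


def generate_readings(device_count, start, rng):
    """Yield one synthetic reading per device, one second apart."""
    for i in range(device_count):
        wind_speed = rng.uniform(3.0, 25.0)  # arbitrary illustrative range
        yield {
            "DeviceId": f"Turbine_{i}",
            "MeasureTime": (start + timedelta(seconds=i)).isoformat(),
            "GeneratedPower": round(wind_speed * 12.5, 2),  # arbitrary relation
            "WindSpeed": round(wind_speed, 2),
            "TurbineSpeed": round(wind_speed * 0.8, 2),  # arbitrary relation
        }


# A seeded generator keeps the output repeatable between runs.
rng = random.Random(42)
start = datetime(2020, 12, 7, 21, 49, 12)
events = [json.dumps(r) for r in generate_readings(3, start, rng)]

# A real publisher would send each string to the event hub; here we only print.
for body in events:
    print(body)
```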

Monitor the solution

This section helps you with monitoring or troubleshooting the solution.

View captured data in the storage account

  1. Navigate to the resource group and select the storage account used for capturing event data.

  2. On the Storage account page, select Storage Explorer (preview) on the left menu.

  3. Expand BLOB CONTAINERS, and select windturbinecapture.

  4. In the right pane, open the folder with the same name as your Event Hubs namespace.

  5. Open the folder with the same name as your event hub (hubdatamigration).

  6. Drill down through the folders to find the Avro files. Here's an example:

    [Image: Captured files in storage]
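The folder structure you just walked through can also be read directly from a captured blob's URL. The Python sketch below splits such a URL into its parts, assuming the default Capture naming convention of {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second} inside the container. The URL is the sample fileUrl from the event data shown later in this tutorial; the function name and the returned keys are hypothetical.

```python
from datetime import datetime
from urllib.parse import urlparse


def parse_capture_url(file_url):
    """Split a captured blob URL into container, namespace, hub, partition, and time."""
    parts = urlparse(file_url).path.lstrip("/").split("/")
    # Expect: container, namespace, hub, partition, Y, M, D, h, m, "<s>.avro"
    if len(parts) != 10 or not parts[-1].endswith(".avro"):
        raise ValueError("URL does not follow the assumed Capture layout")
    container, namespace, hub, partition = parts[:4]
    year, month, day, hour, minute = (int(p) for p in parts[4:9])
    second = int(parts[9][: -len(".avro")])
    return {
        "container": container,
        "namespace": namespace,
        "event_hub": hub,
        "partition": partition,
        "captured_at": datetime(year, month, day, hour, minute, second),
    }


SAMPLE_URL = (
    "https://spehubstorage1207.blob.core.windows.net/windturbinecapture/"
    "spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro"
)
info = parse_capture_url(SAMPLE_URL)
for key, value in info.items():
    print(f"{key}: {value}")
```

If you configured a custom Capture file name format, the layout differs and this sketch raises a ValueError instead of guessing.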

Verify that the Event Grid trigger invoked the function

  1. Navigate to the resource group and select the function app.

  2. Select Functions on the left menu.

  3. Select the EventGridTriggerMigrateData function from the list.

  4. On the Function page, select Monitor on the left menu.

  5. Select Configure to configure Application Insights to capture invocation logs.

  6. Create a new Application Insights resource or use an existing resource.

  7. Navigate back to the Monitor page for the function.

  8. Confirm that the client application (WindTurbineDataGenerator) that's sending the events is still running. If not, run the app.

  9. Wait for a few minutes (5 minutes or more) and select the Refresh button to see function invocations.

    [Image: Function invocations]

  10. Select an invocation to see details.

    Event Grid distributes event data to the subscribers. The following example shows event data generated when data streaming through an event hub is captured in a blob. In particular, notice that the fileUrl property in the data object points to the blob in storage. The function app uses this URL to retrieve the blob file with the captured data.

    {
        "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207",
        "subject": "hubdatamigration",
        "eventType": "Microsoft.EventHub.CaptureFileCreated",
        "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba",
        "data": {
            "fileUrl": "https://spehubstorage1207.blob.core.windows.net/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro",
            "fileType": "AzureBlockBlob",
            "partitionId": "0",
            "sizeInBytes": 473444,
            "eventCount": 2800,
            "firstSequenceNumber": 55500,
            "lastSequenceNumber": 58299,
            "firstEnqueueTime": "2020-12-07T21:49:12.556Z",
            "lastEnqueueTime": "2020-12-07T21:50:11.534Z"
        },
        "dataVersion": "1",
        "metadataVersion": "1",
        "eventTime": "2020-12-07T21:50:12.7065524Z"
    }
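To show how a subscriber might read this payload, here is a minimal Python sketch that parses the event, checks its type, and pulls out the fields a migration function would need. It is an illustration under stated assumptions, not the code of EventGridTriggerMigrateData: the function name and returned keys are hypothetical, the event is a trimmed copy of the sample above, and the sequence-number comparison is only a sanity check that happens to hold for this sample.

```python
import json

CAPTURE_EVENT_TYPE = "Microsoft.EventHub.CaptureFileCreated"

# Trimmed copy of the sample event shown above.
SAMPLE_EVENT = """
{
    "subject": "hubdatamigration",
    "eventType": "Microsoft.EventHub.CaptureFileCreated",
    "data": {
        "fileUrl": "https://spehubstorage1207.blob.core.windows.net/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro",
        "partitionId": "0",
        "sizeInBytes": 473444,
        "eventCount": 2800,
        "firstSequenceNumber": 55500,
        "lastSequenceNumber": 58299
    }
}
"""


def summarize_capture_event(raw):
    """Return the fields needed to fetch the blob, or None for other event types."""
    event = json.loads(raw)
    if event.get("eventType") != CAPTURE_EVENT_TYPE:
        return None
    data = event["data"]
    span = data["lastSequenceNumber"] - data["firstSequenceNumber"] + 1
    return {
        "event_hub": event["subject"],
        "file_url": data["fileUrl"],
        "event_count": data["eventCount"],
        # A file with no events has nothing to migrate.
        "is_empty": data["eventCount"] == 0,
        # Sanity check: the sequence range should cover eventCount events.
        "sequence_span_matches": span == data["eventCount"],
    }


summary = summarize_capture_event(SAMPLE_EVENT)
print(summary)
```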
    

Verify that the data is stored in the dedicated SQL pool

In the browser tab where you have the query window open, query the table in your dedicated SQL pool for the migrated data.

[Image: Query results]

Next steps

You can use powerful data visualization tools with your data warehouse to gain actionable insights.

For an example, see the article on how to use Power BI with Azure Synapse Analytics.