Compartir a través de

教程:使用事件网格和 Azure Functions 将捕获的事件中心数据迁移到 Azure Synapse Analytics

Azure 事件中心捕获用于自动捕获 Azure Blob 存储或 Azure Data Lake Storage 中事件中心的流式处理数据。 本教程介绍如何使用事件网格触发的 Azure 函数将捕获的事件中心数据从存储迁移到 Azure Synapse Analytics。

Application overview

此图描绘了在本教程中生成的解决方案的工作流:

  1. 在 Azure Blob 存储中捕获发送到 Azure 事件中心的数据。
  2. 完成数据捕获后,将生成一个事件并将其发送到 Azure 事件网格。
  3. Azure 事件网格将此事件数据转发到 Azure 函数应用。
  4. 函数应用使用事件数据中的 Blob URL 从存储中检索 Blob。
  5. 函数应用将 Blob 数据迁移到 Azure Synapse Analytics。

在本文中,将执行以下步骤:

  • 部署本教程所需的基础结构
  • 将代码发布到 Functions 应用
  • 创建事件网格订阅
  • 将示例数据流式传输到事件中心
  • 验证 Azure Synapse Analytics 中捕获的数据

先决条件

若要完成本教程,必须满足以下先决条件:

  • Azure 订阅。 如果没有 Azure 订阅,可在开始前创建一个试用帐户
  • Visual Studio,包含适用于以下用途的工作负载:.NET 桌面开发、Azure 开发、ASP.NET 和 Web 开发、Node.js 开发和 Python 开发。
  • EventHubsCaptureEventGridDemo 示例项目下载到计算机上。
    • WindTurbineDataGenerator - 一个简单的发布服务器,可以将示例性的风力涡轮机数据发送到启用了捕获功能的事件中心
    • FunctionDWDumper - 一个 Azure 函数,可以在 Avro 文件被捕获到 Azure 存储 Blob 时从 Azure 事件网格接收通知。 它接收 Blob 的 URI 路径、读取其内容并将该数据推送到 Azure Synapse Analytics(专用 SQL 池)。

部署基础结构

在此步骤中,使用资源管理器模板部署所需的基础结构。 部署模板时,将创建以下资源:

  • 已启用捕获功能的事件中心。
  • 适用于已捕获文件的存储帐户。
  • 用于托管函数应用的应用服务计划
  • 用于处理事件的函数应用
  • 用于托管数据仓库的 SQL Server
  • 用于存储迁移数据的 Azure Synapse Analytics(专用 SQL 池)

使用 Azure CLI 部署基础结构

  1. 通过运行以下 CLI 命令创建 Azure 资源组:
    1. 将以下命令复制并粘贴到 CLI 窗口。 如有需要,请更改资源组名称和位置。

      az group create -l chinaeast2 -n rgDataMigration
      
    2. Enter

      下面是一个示例:

      user@Azure:~$ az group create -l chinaeast2 -n rgDataMigration
      {
        "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration",
        "location": "chinaeast2",
        "managedBy": null,
        "name": "rgDataMigration",
        "properties": {
          "provisioningState": "Succeeded"
        },
        "tags": null
      }
      
  2. 通过运行以下 CLI 命令来部署上一部分提到的所有资源(事件中心、存储帐户、函数应用、Azure Synapse Analytics):
    1. 将命令复制并粘贴到 CLI 窗口中。 也可能需要将内容复制/粘贴到所选的编辑器中,设置值,然后将该命令复制到 CLI。

      重要

      运行此命令前,指定以下实体的值:

      • 之前创建的资源组的名称。
      • 事件中心命名空间的名称。
      • 事件中心的名称。 可以将值保留原样 (hubdatamigration)。
      • SQL Server 的名称。
      • SQL 用户名称和密码。
      • 数据库的名称。
      • 存储帐户的名称。
      • 函数应用的名称。
      az deployment group create \
          --resource-group rgDataMigration \
          --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \
          --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
      
    2. 在 CLI 窗口中按 ENTER 以运行该命令。 此过程可能需要一段时间,因为正在创建一系列资源。 在命令的结果中,请确保没有任何故障。

验证是否已创建资源

  1. 在 Azure 门户中的左侧菜单上选择“资源组”。

  2. 通过在搜索框中输入资源组的名称来筛选资源组列表。

  3. 在列表中选择你的资源组。

    Screenshot showing the selection of your resource group.

  4. 确认是否在资源组中看到以下资源:

    Screenshot showing resources in the resource group.

在 Azure Synapse Analytics 中创建表

在本部分,我们将在前面创建的专用 SQL 池中创建一个表。

  1. 在资源组的资源列表中,选择“专用 SQL 池”。

  2. 在“专用 SQL 池”页左侧菜单的“常见任务”部分中,选择“查询编辑器(预览版)” 。

    Screenshot showing the selection of Query Editor on a Dedicated SQL pool page in the Azure portal.

  3. 输入 SQL Server 的“用户名”和“密码”,然后选择“确定” 。 如果看到有关允许客户端访问 SQL 服务器的消息,请在服务器<SQL 服务器>上选择“将 IP <你的 IP 地址>加入允许列表”,然后选择“确定”。

  4. 在查询窗口中,复制并运行以下 SQL 脚本:

    CREATE TABLE [dbo].[Fact_WindTurbineMetrics] (
        [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, 
        [MeasureTime] datetime NULL, 
        [GeneratedPower] float NULL, 
        [WindSpeed] float NULL, 
        [TurbineSpeed] float NULL
    )
    WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
    

    Screenshot showing the query editor.

  5. 保持此选项卡或窗口处于打开状态,以便可以验证在本教程结束时是否创建了数据。

发布 Azure Functions 应用

首先,从 Azure 门户获取 Functions 应用的发布配置文件。 然后,使用发布配置文件从 Visual Studio 发布 Azure Functions 项目或应用。

获取发布配置文件

  1. 在“资源组”页上,选择资源列表中的“Azure Functions 应用”。

    Screenshot showing the selection of the function app in the list of resources for a resource group.

  2. 在应用的“函数应用”页上,选择命令栏上的“获取发布配置文件”。

    Screenshot showing the selection of the **Get Publish Profile** button on the command bar of the function app page.

  3. 下载文件并将其保存到 EventHubsCaptureEventGridDemo 文件夹的 FunctionEGDDumper 子文件夹中。

使用发布配置文件发布 Functions 应用

  1. 启动 Visual Studio。

  2. 打开作为先决条件的一部分从 GitHub 下载的 EventHubsCaptureEventGridDemo.sln 解决方案。 可以在 /samples/e2e/EventHubsCaptureEventGridDemo 文件夹中找到它。

  3. 在解决方案资源管理器中,右键单击“FunctionEGDWDumper”项目,然后选择“发布” 。

  4. 在以下屏幕中,选择“开始”或“添加发布配置文件”。

  5. 在“发布”对话框中,对于“目标”,请选择“导入配置文件”,然后选择“下一步”。

    Screenshot showing the selection **Import Profile** on the **Publish** dialog box.

  6. 在“导入配置文件”选项卡上,选择前面保存在 FunctionEGDWDumper 文件夹中的发布设置文件,然后选择“完成”。

  7. 在 Visual Studio 配置好配置文件后,选择“发布”。 确认发布成功。

  8. 在打开了“Azure 函数”页面的 Web 浏览器中,选择左侧菜单中的“函数”。 确认 EventGridTriggerMigrateData 函数显示在列表中。 如果看不到该函数,请尝试再次从 Visual Studio 发布,然后在门户中刷新页面。

    Screenshot showing the confirmation of function creation.

在发布函数后,已准备好订阅事件。

订阅事件

  1. 在 Web 浏览器的新选项卡或新窗口中,导航到 Azure 门户

  2. 在 Azure 门户中的左侧菜单上选择“资源组”。

  3. 通过在搜索框中输入资源组的名称来筛选资源组列表。

  4. 在列表中选择你的资源组。

  5. 从资源列表中选择“事件中心命名空间”。

  6. 在“事件中心命名空间”页的左侧菜单中选择“事件”,然后在工具栏上选择“+ 事件订阅” 。

    Add event subscription link on the Events page for an Event Hubs namespace

  7. 在“创建事件订阅”页上执行以下步骤:

    1. 输入事件订阅的名称。

    2. 输入系统主题的名称。 系统主题为发送方提供发送事件的终结点。 有关详细信息,请参阅系统主题

    3. 对于“终结点类型”,请选择“Azure Function” 。

    4. 对于“终结点”,请选择该链接。

    5. 在“选择 Azure Function”页上,如果以下步骤没有自动填充,请执行这些步骤。

      1. 选择包含 Azure Function 的 Azure 订阅。
      2. 为函数选择资源组。
      3. 选择函数应用。
      4. 选择部署槽位。
      5. 选择 EventGridTriggerMigrateData 函数。
    6. 在“选择 Azure Function”页上,选择”确认选择” 。

    7. 然后返回“创建事件订阅”页,选择“创建” 。

      Create an event subscription using the function

  8. 验证是否已创建事件订阅。 切换到事件中心命名空间的“事件”页上的“事件订阅”选项卡 。

    Confirm event subscription

  9. 在资源组的资源列表中,选择应用服务计划(而不是应用服务)。

运行应用以生成数据

至此,已完成设置事件中心、专用 SQL 池(以前称为 SQL 数据仓库)、Azure 函数应用和事件订阅。 需要先配置几个值,然后再运行应用来生成事件中心数据。

  1. 在 Azure 门户中,像之前那样导航到资源组。

  2. 选择事件中心命名空间。

  3. 在“事件中心命名空间”页中的左侧菜单上选择“共享访问策略” 。

  4. 在策略列表中选择 RootManageSharedAccessKey。

    Shared access policies page for an Event Hubs namespace

  5. 选择“连接字符串 - 主密钥”文本框旁边的“复制”按钮。

  6. 返回到 Visual Studio 解决方案。

  7. 右键单击“WindTurbineDataGenerator”项目,然后选择“设为启动项目” 。

  8. 在 WindTurbineDataGenerator 项目中,打开 program.cs。

  9. <EVENT HUBS NAMESPACE CONNECTION STRING> 替换为从门户复制的连接字符串。

  10. 如果为事件中心使用了不同于 hubdatamigration 的名称,请将 <EVENT HUB NAME> 替换为事件中心的名称。

    private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.chinacloudapi.cn/...";
    private const string EventHubName = "hubdatamigration";
    
  11. 生成解决方案。 运行 WindTurbineGenerator.exe 应用程序。

  12. 几分钟后,在打开了查询窗口的另一个浏览器选项卡中,查询数据仓库中的表以获取已迁移的数据。

    select * from [dbo].[Fact_WindTurbineMetrics]    
    

    Query results

监视解决方案

本部分可帮助你监视解决方案或对解决方案进行故障排除。

查看存储帐户中的捕获数据

  1. 导航到资源组,然后选择用于捕获事件数据的存储帐户。

  2. 在“存储帐户”页的左侧菜单中选择“存储资源管理器(预览)” 。

  3. 展开“BLOB 容器”,并选择“windturbinecapture” 。

  4. 在右侧窗格中打开与事件中心命名空间名称相同的文件夹。

  5. 打开与事件中心名称相同的文件夹 (hubdatamigration)。

  6. 钻取文件夹,你将看到 AVRO 文件。 下面是一个示例:

    Captured file in the storage

验证事件网格触发器是否调用了函数

  1. 导航到资源组,然后选择函数应用。

  2. 在左侧菜单中选择“函数”。

  3. 从列表中选择 EventGridTriggerMigrateData 函数。

  4. 在“函数”页的左侧菜单中选择“监视器” 。

  5. 选择“配置”,配置 Application Insights 来捕获调用日志。

  6. 新建一个 Application Insights 资源或使用现有资源。

  7. 导航回到该函数的“监视器”页。

  8. 确认发送事件的客户端应用程序 (WindTurbineDataGenerator) 仍在运行。 如果没有,请运行应用。

  9. 等待几分钟(5 分钟或更长时间),然后选择“刷新”按钮以查看函数调用。

    Function invocations

  10. 选择一个调用以查看详细信息。

    事件网格将事件数据分发给订阅者。 以下示例显示了在 Blob 中捕获通过事件中心的数据流时生成的事件数据。 特别要注意 data 对象中的 fileUrl 属性指向存储中的 Blob。 函数应用使用此 URL 来检索具有捕获数据的 Blob 文件。

    {
    	"topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207",
    	"subject": "hubdatamigration",
    	"eventType": "Microsoft.EventHub.CaptureFileCreated",
    	"id": "4538f1a5-02d8-4b40-9f20-36301ac976ba",
    	"data": {
    		"fileUrl": "https://spehubstorage1207.blob.core.chinacloudapi.cn/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro",
    		"fileType": "AzureBlockBlob",
    		"partitionId": "0",
    		"sizeInBytes": 473444,
    		"eventCount": 2800,
    		"firstSequenceNumber": 55500,
    		"lastSequenceNumber": 58299,
    		"firstEnqueueTime": "2020-12-07T21:49:12.556Z",
    		"lastEnqueueTime": "2020-12-07T21:50:11.534Z"
    	},
    	"dataVersion": "1",
    	"metadataVersion": "1",
    	"eventTime": "2020-12-07T21:50:12.7065524Z"
    }
    

验证数据是否存储在专用的 SQL 池中

在打开了查询窗口的浏览器选项卡中,查询专用 SQL 池中的表以获取已迁移的数据。

Query results

后续步骤

可以将强大的数据可视化工具与数据仓库配合使用,以便获取可行的见解。

本文介绍如何将 Power BI 与 Azure Synapse Analytics 配合使用