Compartir a través de

教程:使用 Azure 事件网格和 Azure Functions 将捕获的事件中心数据从 Azure 存储迁移到 Azure Synapse Analytics

在本教程中,你将使用 Azure 事件网格和 Azure Functions 将捕获的事件中心数据从 Azure Blob 存储迁移到 Azure Synapse Analytics(特别是专用的 SQL 池)。

应用概览

此图描绘了在本教程中生成的解决方案的工作流:

  1. 在 Azure Blob 存储中捕获发送到 Azure 事件中心的数据。
  2. 完成数据捕获后,将生成一个事件并将其发送到 Azure 事件网格。
  3. Azure 事件网格将此事件数据转发到 Azure 函数应用。
  4. 函数应用使用事件数据中的 Blob URL 从存储中检索 Blob。
  5. 函数应用将 Blob 数据迁移到 Azure Synapse Analytics。

在本文中,将执行以下步骤:

  • 部署本教程所需的基础结构
  • 将代码发布到 Functions 应用
  • 创建事件网格订阅
  • 将示例数据流式传输到事件中心
  • 验证 Azure Synapse Analytics 中捕获的数据

先决条件

若要完成本教程,必须满足以下先决条件:

  • 本文假设你熟悉 Azure 事件网格和 Azure 事件中心(尤其是“捕获”功能)。 如果你并不熟悉 Azure 事件网格,请参阅 Azure 事件网格简介。 有关 Azure 事件中心捕获功能的详细信息,请参阅通过 Azure Blob 存储或 Azure Data Lake Storage 中的 Azure 事件中心来捕获事件
  • Azure 订阅。 如果没有 Azure 订阅,可在开始前创建一个试用帐户
  • Visual Studio,包含适用于以下用途的工作负载:.NET 桌面开发、Azure 开发、ASP.NET 和 Web 开发、Node.js 开发和 Python 开发。
  • EventHubsCaptureEventGridDemo 示例项目下载到计算机上。
    • WindTurbineDataGenerator - 一个简单的发布服务器,可将示例风力涡轮机数据发送到启用了捕获功能的事件中心。
    • FunctionDWDumper - 一个 Azure 函数,可以在将 Avro 文件捕获到 Azure 存储 Blob 时从 Azure 事件网格接收通知。 它接收 Blob 的 URI 路径、读取其内容并将该数据推送到 Azure Synapse Analytics(专用 SQL 池)。

部署基础结构

在此步骤中,使用资源管理器模板部署所需的基础结构。 部署模板时,将创建以下资源:

  • 已启用捕获功能的事件中心。
  • 适用于已捕获文件的存储帐户。
  • 用于托管函数应用的应用服务计划
  • 用于处理事件的函数应用
  • 用于托管数据仓库的 SQL Server
  • 用于存储迁移数据的 Azure Synapse Analytics(专用 SQL 池)

使用 Azure CLI 部署基础结构

  1. 登录 Azure 门户
  1. 通过运行以下 CLI 命令创建 Azure 资源组:
    1. 将以下命令复制并粘贴到 Azure CLI 窗口中。 如有需要,请更改资源组名称和位置。

      az group create -l chinanorth -n rgDataMigration
      
    2. Enter

      下面是一个示例:

      user@Azure:~$ az group create -l chinanorth -n rgDataMigration
      {
        "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration",
        "location": "chinanorth",
        "managedBy": null,
        "name": "rgDataMigration",
        "properties": {
          "provisioningState": "Succeeded"
        },
        "tags": null
      }
      
  2. 通过运行以下 CLI 命令来部署上一部分提到的所有资源(事件中心、存储帐户、函数应用、Azure Synapse Analytics):
    1. 将命令复制并粘贴到 Azure CLI 窗口中。 也可将内容复制/粘贴到所选的编辑器中,设置值,然后将该命令复制到 Azure CLI。 如果由于 Azure 资源名称而出现错误,请删除资源组,修复名称,然后再次重试该命令。

      重要

      运行此命令前,指定以下实体的值:

      • 之前创建的资源组的名称。
      • 事件中心命名空间的名称。
      • 事件中心的名称。 可以将值保留原样 (hubdatamigration)。
      • SQL Server 的名称。
      • SQL 用户名称和密码。
      • 数据库的名称。
      • 存储帐户的名称。
      • 函数应用的名称。
      az deployment group create \
          --resource-group rgDataMigration \
          --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \
          --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
      
    2. 此过程可能需要一段时间,因为你要创建一系列资源。 在命令的结果中,请确保没有任何故障。

验证是否已创建资源

  1. 在 Azure 门户中的左侧菜单上选择“资源组”。

  2. 通过在搜索框中输入资源组的名称来筛选资源组列表。

  3. 在列表中选择你的资源组。

    屏幕截图显示如何选择资源组。

  4. 确认是否在资源组中看到以下资源:

    屏幕截图显示资源组中的资源。

在 Azure Synapse Analytics 中创建表

在本部分,我们将在前面创建的专用 SQL 池中创建一个表。

  1. 在资源组的资源列表中,选择“专用 SQL 池”。

  2. 在“专用 SQL 池”页左侧菜单的“常见任务”部分中,选择“查询编辑器(预览版)” 。

    屏幕截图显示如何在 Azure 门户中的“专用 SQL 池”页上选择查询编辑器。

  3. 输入 SQL Server 的“用户名”和“密码”,然后选择“确定” 。 如果看到有关允许客户端访问 SQL 服务器的消息,请在服务器<SQL 服务器>上选择“将 IP <你的 IP 地址>加入允许列表”,然后选择“确定”。

  4. 在查询窗口中,复制并运行以下 SQL 脚本:

    CREATE TABLE [dbo].[Fact_WindTurbineMetrics] (
        [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
        [MeasureTime] datetime NULL,
        [GeneratedPower] float NULL,
        [WindSpeed] float NULL,
        [TurbineSpeed] float NULL
    )
    WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
    

    屏幕截图显示查询编辑器。

  5. 保持此选项卡或窗口处于打开状态,以便可以验证在本教程结束时是否创建了数据。

发布 Azure Functions 应用

首先,从 Azure 门户获取 Functions 应用的发布配置文件。 然后,使用发布配置文件从 Visual Studio 发布 Azure Functions 项目或应用。

获取发布配置文件

  1. 在“资源组”页上,选择资源列表中的“Azure Functions 应用”。

    屏幕截图显示如何在资源组的资源列表中选择函数应用。

  2. 在应用的“函数应用”页上,选择命令栏上的“获取发布配置文件”。

    屏幕截图显示如何在函数应用页的命令栏上选择“获取发布配置文件”按钮。****

  3. 下载文件并将其保存到 EventHubsCaptureEventGridDemo 文件夹的 FunctionEGDDumper 子文件夹中。

使用发布配置文件发布 Functions 应用

  1. 启动 Visual Studio。

  2. 打开作为先决条件的一部分从 GitHub 下载的 EventHubsCaptureEventGridDemo.sln 解决方案。 可以在 /samples/e2e/EventHubsCaptureEventGridDemo 文件夹中找到它。

  3. 在解决方案资源管理器中,右键单击“FunctionEGDWDumper”项目,然后选择“发布” 。

  4. 在以下屏幕中,选择“开始”或“添加发布配置文件”。

  5. 在“发布”对话框中,对于“目标”,请选择“导入配置文件”,然后选择“下一步”。

    屏幕截图显示如何在“发布”对话框中选择“导入配置文件”。********

  6. 在“导入配置文件”选项卡上,选择前面保存在 FunctionEGDWDumper 文件夹中的发布设置文件,然后选择“完成”。

  7. 在 Visual Studio 配置好配置文件后,选择“发布”。 确认发布成功。

  8. 在打开了“Azure 函数”页面的 Web 浏览器中,在中间窗格中选择“函数”。 确认 EventGridTriggerMigrateData 函数显示在列表中。 如果看不到该函数,请尝试再次从 Visual Studio 发布,然后在门户中刷新页面。

    屏幕截图显示如何确认函数创建操作。

在发布函数后,已准备好订阅事件。

订阅事件

  1. 在 Web 浏览器的新选项卡或新窗口中,登录 Azure 门户

  2. 在 Azure 门户中的左侧菜单上选择“资源组”。

  3. 通过在搜索框中输入资源组的名称来筛选资源组列表。

  4. 在列表中选择你的资源组。

  5. 从资源列表中选择“事件中心命名空间”。

  6. 在“事件中心命名空间”页的左侧菜单中选择“事件”,然后在工具栏上选择“+ 事件订阅” 。

    屏幕截图:Azure 事件中心命名空间的“事件”页面,其中选择了“添加事件订阅”链接。

  7. 在“创建事件订阅”页上执行以下步骤:

    1. 输入事件订阅的名称。

    2. 输入系统主题的名称。 系统主题为发送方提供发送事件的终结点。 有关详细信息,请参阅系统主题

    3. 对于“终结点类型”,请选择“Azure Function” 。

    4. 对于“终结点”,请选择该链接。

    5. 在“选择 Azure Function”页上,如果以下步骤没有自动填充,请执行这些步骤。

      1. 选择包含 Azure Function 的 Azure 订阅。
      2. 为函数选择资源组。
      3. 选择函数应用。
      4. 选择部署槽位。
      5. 选择 EventGridTriggerMigrateData 函数。
    6. 在“选择 Azure Function”页上,选择”确认选择” 。

    7. 然后返回“创建事件订阅”页,选择“创建” 。

      屏幕截图:创建事件订阅页面。

  8. 验证是否已创建事件订阅。 切换到事件中心命名空间的“事件”页上的“事件订阅”选项卡 。

    屏幕截图:“事件”页面上的“事件订阅”选项卡。

运行应用以生成数据

至此,已完成设置事件中心、专用 SQL 池(以前称为 SQL 数据仓库)、Azure 函数应用和事件订阅。 需要先配置几个值,然后再运行应用来生成事件中心数据。

  1. 在 Azure 门户中,像之前那样导航到资源组。

  2. 选择事件中心命名空间。

  3. 在“事件中心命名空间”页中的左侧菜单上选择“共享访问策略” 。

  4. 在策略列表中选择 RootManageSharedAccessKey。

    屏幕截图:Azure 事件中心命名空间的“共享访问策略”页。

  5. 选择“连接字符串 - 主密钥”文本框旁边的“复制”按钮。

  6. 返回到 Visual Studio 解决方案。

  7. 右键单击“WindTurbineDataGenerator”项目,然后选择“设为启动项目” 。

  8. 在 WindTurbineDataGenerator 项目中,打开 program.cs。

  9. <EVENT HUBS NAMESPACE CONNECTION STRING> 替换为从门户复制的连接字符串。

  10. 如果为事件中心使用了不同于 hubdatamigration 的名称,请将 <EVENT HUB NAME> 替换为事件中心的名称。

    private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.chinacloudapi.cn/...";
    private const string EventHubName = "hubdatamigration";
    
  11. 生成解决方案。 运行 WindTurbineGenerator.exe 应用程序。

  12. 几分钟后,在打开了查询窗口的另一个浏览器选项卡中,查询数据仓库中的表以获取已迁移的数据。

    select * from [dbo].[Fact_WindTurbineMetrics]
    

    屏幕截图:查询结果。

重要

我们使用连接字符串向 Azure 事件中心命名空间进行身份验证,使教程保持简单。 建议在生产环境中使用 Microsoft Entra ID 身份验证。 使用应用程序时,可以为应用程序启用托管标识,并在事件中心命名空间上为标识分配适当的角色(Azure 事件中心所有者、Azure 事件中心数据发送方或 Azure 事件中心数据接收方)。 有关详细信息,请参阅使用 Microsoft Entra ID 授予对事件中心的访问权限

监视解决方案

本部分可帮助你监视解决方案或对解决方案进行故障排除。

查看存储帐户中的捕获数据

  1. 导航到资源组,然后选择用于捕获事件数据的存储帐户。

  2. 在“存储帐户”页的左侧菜单中选择“存储浏览器”

  3. 展开“BLOB 容器”,并选择“windturbinecapture” 。

  4. 在右侧窗格中打开与事件中心命名空间名称相同的文件夹。

  5. 打开与事件中心名称相同的文件夹 (hubdatamigration)。

  6. 钻取文件夹,你将看到 AVRO 文件。 下面是一个示例:

    屏幕截图:存储中捕获的文件。

验证事件网格触发器是否调用了函数

  1. 导航到资源组,然后选择函数应用。

  2. 在中间窗格中选择“函数”选项卡。

  3. 从列表中选择 EventGridTriggerMigrateData 函数。

  4. 在“函数”页的左侧菜单中选择“监视器” 。

  5. 选择“配置”,配置 Application Insights 来捕获调用日志。

  6. 新建一个 Application Insights 资源或使用现有资源。

  7. 导航回到该函数的“监视器”页。

  8. 确认发送事件的客户端应用程序 (WindTurbineDataGenerator) 仍在运行。 如果没有,请运行应用。

  9. 等待几分钟(5 分钟或更长时间),然后选择“刷新”按钮以查看函数调用。

    屏幕截图:函数调用。

  10. 选择一个调用以查看详细信息。

    事件网格将事件数据分发给订阅者。 以下示例显示了在 Blob 中捕获通过事件中心的数据流时生成的事件数据。 特别要注意 data 对象中的 fileUrl 属性指向存储中的 Blob。 函数应用使用此 URL 来检索具有捕获数据的 Blob 文件。

    {
      "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207",
      "subject": "hubdatamigration",
      "eventType": "Microsoft.EventHub.CaptureFileCreated",
      "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba",
      "data": {
        "fileUrl": "https://spehubstorage1207.blob.core.chinacloudapi.cn/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro",
        "fileType": "AzureBlockBlob",
        "partitionId": "0",
        "sizeInBytes": 473444,
        "eventCount": 2800,
        "firstSequenceNumber": 55500,
        "lastSequenceNumber": 58299,
        "firstEnqueueTime": "2020-12-07T21:49:12.556Z",
        "lastEnqueueTime": "2020-12-07T21:50:11.534Z"
      },
      "dataVersion": "1",
      "metadataVersion": "1",
      "eventTime": "2020-12-07T21:50:12.7065524Z"
    }
    

验证数据是否存储在专用的 SQL 池中

在打开了查询窗口的浏览器选项卡中,查询专用 SQL 池中的表以获取已迁移的数据。

屏幕截图:最终查询结果。

后续步骤