Ingest data from Event Hub into Azure Data Explorer

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer offers ingestion (data loading) from Event Hubs, a big data streaming platform and event ingestion service. Event Hubs can process millions of events per second in near real time. In this article, you create an event hub, connect to it from Azure Data Explorer, and see data flow through the system.

Prerequisites

Sign in to the Azure portal

Sign in to the Azure portal.

Create an event hub

In this article, you generate sample data and send it to an event hub. The first step is to create an event hub. You do this by using an Azure Resource Manager template in the Azure portal.

  1. To create an event hub, use the following button to start the deployment. Right-click and select Open in new window, so you can follow the rest of the steps in this article.

    Deploy to Azure

    The Deploy to Azure button takes you to the Azure portal to fill out a deployment form.

  2. Select the subscription where you want to create the event hub, and create a resource group named test-hub-rg.

  3. Fill out the form with the following information.

    Use defaults for any settings not listed in the following table.

    | Setting | Suggested value | Field description |
    |---|---|---|
    | Subscription | Your subscription | Select the Azure subscription that you want to use for your event hub. |
    | Resource group | test-hub-rg | Create a new resource group. |
    | Location | China East2 | Select China East2 for this article. For a production system, select the region that best meets your needs. Create the event hub namespace in the same location as the Kusto cluster for best performance (most important for event hub namespaces with high throughput). |
    | Namespace name | A unique namespace name | Choose a unique name that identifies your namespace. For example, mytestnamespace. The domain name servicebus.chinacloudapi.cn is appended to the name you provide. The name can contain only letters, numbers, and hyphens. The name must start with a letter, and it must end with a letter or number. The value must be between 6 and 50 characters long. |
    | Event hub name | test-hub | The event hub sits under the namespace, which provides a unique scoping container. The event hub name must be unique within the namespace. |
    | Consumer group name | test-group | Consumer groups enable multiple consuming applications to each have a separate view of the event stream. |
  4. Select Purchase, which acknowledges that you're creating resources in your subscription.

  5. Select Notifications on the toolbar to monitor the provisioning process. It might take several minutes for the deployment to succeed, but you can move on to the next step now.
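
The naming rules listed for the namespace name can be checked before you submit the deployment form. The following is a minimal sketch, assuming that "letters" means ASCII letters; the function name `is_valid_namespace_name` is made up for illustration, and the check does not verify that the name is unique.

```python
import re

# Rules taken from the "Namespace name" row of the deployment table:
# letters, numbers, and hyphens only; starts with a letter; ends with a
# letter or number; 6 to 50 characters long.
# Assumption: "letters" means ASCII letters.
_NAMESPACE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9-]{4,48}[A-Za-z0-9]")


def is_valid_namespace_name(name: str) -> bool:
    """Return True if `name` satisfies the naming rules listed in the table."""
    return _NAMESPACE_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ("mytestnamespace", "my-hub", "1badstart", "short", "ends-with-"):
        print(candidate, is_valid_namespace_name(candidate))
```

The pattern encodes the length limit in the middle quantifier: one leading letter, 4 to 48 middle characters, and one trailing letter or number add up to 6 to 50 characters.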

Create a target table in Azure Data Explorer

Now you create a table in Azure Data Explorer, to which Event Hubs will send data. You create the table in the cluster and database provisioned in Prerequisites.

  1. In the Azure portal, navigate to your cluster, then select Query.

  2. Copy the following command into the window and select Run to create the table (TestTable) that will receive the ingested data.

    .create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
    

  3. Copy the following command into the window and select Run to map the incoming JSON data to the column names and data types of the table (TestTable).

    .create table TestTable ingestion json mapping 'TestMapping' '[{"column":"TimeStamp", "Properties": {"Path": "$.timeStamp"}},{"column":"Name", "Properties": {"Path":"$.name"}} ,{"column":"Metric", "Properties": {"Path":"$.metric"}}, {"column":"Source", "Properties": {"Path":"$.source"}}]'
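
If the table has many columns, writing the mapping by hand is error-prone. The sketch below generates a command of the same shape as the TestMapping command from a column-to-path dictionary. The helper name `build_json_mapping_command` is hypothetical, and the sketch assumes that table, mapping, column, and path names contain no single quotes, so it does no escaping.

```python
import json


def build_json_mapping_command(table: str, mapping_name: str, paths: dict) -> str:
    """Build a `.create table ... ingestion json mapping` command string.

    `paths` maps each table column to the JSON path of its source field.
    Assumption: no name contains a single quote, so nothing is escaped.
    """
    entries = [
        {"column": column, "Properties": {"Path": path}}
        for column, path in paths.items()
    ]
    return (
        f".create table {table} ingestion json mapping "
        f"'{mapping_name}' '{json.dumps(entries)}'"
    )


if __name__ == "__main__":
    command = build_json_mapping_command(
        "TestTable",
        "TestMapping",
        {
            "TimeStamp": "$.timeStamp",
            "Name": "$.name",
            "Metric": "$.metric",
            "Source": "$.source",
        },
    )
    print(command)
```

Paste the printed command into the query window and select Run, as in step 3.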
    

Connect to the event hub

Now you connect to the event hub from Azure Data Explorer. When this connection is in place, data that flows into the event hub streams to the test table you created earlier in this article.

  1. Select Notifications on the toolbar to verify that the event hub deployment was successful.

  2. Under the cluster you created, select Databases, then TestDatabase.

  3. Select Data ingestion and Add data connection. Then fill out the form with the following information. Select Create when you are finished.

    Data Source:

    | Setting | Suggested value | Field description |
    |---|---|---|
    | Data connection name | test-hub-connection | The name of the connection you want to create in Azure Data Explorer. |
    | Event hub namespace | A unique namespace name | The name you chose earlier that identifies your namespace. |
    | Event hub | test-hub | The event hub you created. |
    | Consumer group | test-group | The consumer group defined in the event hub you created. |
    | Event system properties | Select relevant properties | The Event Hub system properties. If there are multiple records per event message, the system properties are added to the first record. When adding system properties, create or update the table schema and mapping to include the selected properties. |
    | Compression | None | The compression type of the Event Hub message payload. Supported compression types: None, GZip. |

    Target table:

    There are two options for routing the ingested data: static and dynamic. For this article, you use static routing, where you specify the table name, data format, and mapping. Therefore, leave My data includes routing info unselected.

    | Setting | Suggested value | Field description |
    |---|---|---|
    | Table | TestTable | The table you created in TestDatabase. |
    | Data format | JSON | Supported formats are Avro, CSV, JSON, MULTILINE JSON, PSV, SOHSV, SCSV, TSV, TSVE, TXT, ORC, and PARQUET. |
    | Column mapping | TestMapping | The mapping you created in TestDatabase, which maps incoming JSON data to the column names and data types of TestTable. Required for JSON or MULTILINE JSON, and optional for other formats. |

    Note

    • Select My data includes routing info to use dynamic routing, where your data includes the necessary routing information as seen in the sample app comments. If both static and dynamic properties are set, the dynamic properties override the static ones.
    • Only events enqueued after you create the data connection are ingested.
    • You can also set the compression type via dynamic properties, as seen in the sample app.
    • Avro, ORC, and PARQUET formats, as well as event system properties, aren't supported on GZip-compressed payloads.
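
The constraints stated in the tables and note above can be collected into a small pre-flight check. This is only an illustrative sketch, not part of any Azure SDK: the function names and the routing keys `Table` and `Format` are hypothetical, and the rules are limited to the ones this article states.

```python
# Values copied from the "Data format" and "Compression" rows above.
SUPPORTED_FORMATS = {
    "AVRO", "CSV", "JSON", "MULTILINE JSON", "PSV", "SOHSV",
    "SCSV", "TSV", "TSVE", "TXT", "ORC", "PARQUET",
}
MAPPING_REQUIRED = {"JSON", "MULTILINE JSON"}
NOT_SUPPORTED_WITH_GZIP = {"AVRO", "ORC", "PARQUET"}


def check_data_connection(data_format, compression="None", mapping=None,
                          system_properties=()):
    """Return a list of problems found in the chosen settings (empty if none)."""
    problems = []
    fmt = data_format.upper()
    if fmt not in SUPPORTED_FORMATS:
        problems.append(f"unsupported data format: {data_format}")
    if fmt in MAPPING_REQUIRED and not mapping:
        problems.append(f"{data_format} requires a column mapping")
    if compression not in ("None", "GZip"):
        problems.append(f"unsupported compression type: {compression}")
    if compression == "GZip":
        if fmt in NOT_SUPPORTED_WITH_GZIP:
            problems.append(f"{data_format} is not supported with GZip")
        if system_properties:
            problems.append("event system properties are not supported with GZip")
    return problems


def effective_routing(static, dynamic):
    """Dynamic properties override static ones when both are set."""
    return {**static, **dynamic}


if __name__ == "__main__":
    print(check_data_connection("JSON", mapping="TestMapping"))
    print(check_data_connection("Parquet", compression="GZip"))
    # Hypothetical keys, used only to show the override order.
    print(effective_routing({"Table": "TestTable", "Format": "JSON"},
                            {"Table": "OtherTable"}))
```

The dictionary merge in `effective_routing` puts the dynamic values last, so they win whenever a key is set in both places.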

Event system properties mapping

Note

  • System properties are supported for single-record events.
  • For CSV mapping, properties are added at the beginning of the record. For JSON mapping, properties are added according to the name that appears in the drop-down list.

If you selected Event system properties in the Data Source section of the table, you must include the following properties in the table schema and mapping.

Table schema example

If your data includes three columns (Timespan, Metric, and Value) and the properties you include are x-opt-enqueued-time and x-opt-offset, create or alter the table schema by using this command:

    .create-merge table TestTable (Timespan: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV mapping example

Because system properties are added at the beginning of the record, the mapping assigns them ordinals 0 and 1, and the data columns start at ordinal 2. Run the following command:

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'
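
To see why the data columns start at ordinal 2, the following sketch models a record after the two system properties are placed in front of it and then applies the ordinals from CsvMapping1. It is a simplified model rather than the service's implementation; the helper names and the sample values are made up for illustration.

```python
import csv
import io

# Ordinals copied from CsvMapping1 above.
CSV_MAPPING = {
    "EventHubEnqueuedTime": 0,
    "EventHubOffset": 1,
    "Timespan": 2,
    "Metric": 3,
    "Value": 4,
}


def prepend_system_properties(record: str, enqueued_time: str, offset: str) -> str:
    """Model the two system properties being added in front of a CSV record."""
    return f"{enqueued_time},{offset},{record}"


def apply_mapping(line: str, mapping: dict) -> dict:
    """Pick each column's value from the CSV line by its ordinal."""
    fields = next(csv.reader(io.StringIO(line)))
    return {column: fields[ordinal] for column, ordinal in mapping.items()}


if __name__ == "__main__":
    # Hypothetical record with the three data columns: Timespan, Metric, Value.
    record = "2019-05-01T10:00:00Z,cpu,42"
    line = prepend_system_properties(record, "2019-05-01T10:00:05Z", "1024")
    for column, value in apply_mapping(line, CSV_MAPPING).items():
        print(column, "=", value)
```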

JSON mapping example

Data is added by using the system property names as they appear in the Event system properties list on the Data connection blade. Run this command:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.metric_value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Copy the connection string

When you run the sample app listed in Prerequisites, you need the connection string for the event hub namespace.

  1. Under the event hub namespace you created, select Shared access policies, then RootManageSharedAccessKey.

  2. Copy Connection string - primary key. You paste it in the next section.
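
Before pasting the connection string into the sample app, you can confirm that you copied all of it. The sketch below assumes the usual layout of semicolon-separated `key=value` pairs with `Endpoint`, `SharedAccessKeyName`, and `SharedAccessKey` entries; the helper name is hypothetical and the key in the example is a placeholder, not a real secret.

```python
def parse_connection_string(connection_string: str) -> dict:
    """Split a connection string into its key/value parts.

    Assumption: parts are separated by ';' and each part is `key=value`.
    Only the first '=' separates key from value, because keys encoded in
    base64 can themselves end with '='.
    """
    parts = {}
    for segment in connection_string.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        key, separator, value = segment.partition("=")
        if not separator:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    return parts


if __name__ == "__main__":
    example = (
        "Endpoint=sb://mytestnamespace.servicebus.chinacloudapi.cn/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=PLACEHOLDERKEY="
    )
    parsed = parse_connection_string(example)
    missing = {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"} - parsed.keys()
    print("missing parts:", sorted(missing))
```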

Generate sample data

Use the sample app you downloaded to generate data.

  1. Open the sample app solution in Visual Studio.

  2. In the program.cs file, update the connectionString constant to the connection string you copied from the event hub namespace.

    const string eventHubName = "test-hub";
    // Copy the connection string ("Connection string-primary key") from your Event Hub namespace.
    const string connectionString = @"<YourConnectionString>";
    
  3. Build and run the app. The app sends messages to the event hub, and it prints out status every ten seconds.

  4. After the app has sent a few messages, move on to the next step: reviewing the flow of data into your event hub and test table.
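
For reference, a message that TestMapping can ingest is a JSON object with the fields timeStamp, name, metric, and source. The sketch below builds such payloads without sending them; sending requires an Event Hubs client, which is not shown here. The field values are invented for illustration and are not claimed to match what the sample app produces.

```python
import json
from datetime import datetime, timedelta, timezone


def make_sample_events(count, start=None):
    """Return `count` JSON strings whose fields match the TestMapping paths."""
    start = start or datetime.now(timezone.utc)
    events = []
    for i in range(count):
        record = {
            "timeStamp": (start + timedelta(seconds=i)).isoformat(),  # $.timeStamp
            "name": f"name {i}",                                      # $.name
            "metric": i,                                              # $.metric (int column)
            "source": "EventHubMessage",                              # $.source
        }
        events.append(json.dumps(record))
    return events


if __name__ == "__main__":
    for payload in make_sample_events(3):
        print(payload)
```

The field names are case-sensitive in the mapping paths, so `timeStamp` must keep its lowercase initial letter.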

Review the data flow

With the app generating data, you can now see the flow of that data from the event hub to the table in your cluster.

  1. In the Azure portal, under your event hub, you see the spike in activity while the app is running.

  2. To check how many messages have made it to the database so far, run the following query in your test database.

    TestTable
    | count
    
  3. To see the content of the messages, run the following query:

    TestTable
    

    The result set shows the ingested messages in the TimeStamp, Name, Metric, and Source columns of TestTable.

    Note

    • Azure Data Explorer has an aggregation (batching) policy for data ingestion, designed to optimize the ingestion process. By default, the policy is configured to 5 minutes or 500 MB of data, so you may experience latency. See batching policy for aggregation options.
    • Event Hub ingestion includes an Event Hub response time of 10 seconds or 1 MB.
    • To remove this lag in response time, configure your table to support streaming. See streaming policy.
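
The effect of the batching policy on latency can be illustrated with a toy model: a batch opens when its first event arrives and is sealed when it is 5 minutes old or holds 500 MB, whichever comes first. This is a simplified sketch for intuition only, not the service's actual algorithm; the function name is hypothetical and 1 MB is assumed to be 1024 × 1024 bytes.

```python
MAX_BATCH_SECONDS = 5 * 60           # 5 minutes, the default stated above
MAX_BATCH_BYTES = 500 * 1024 * 1024  # 500 MB (assumes 1 MB = 1024 * 1024 bytes)


def seal_times(events, max_seconds=MAX_BATCH_SECONDS, max_bytes=MAX_BATCH_BYTES):
    """Return the times (in seconds) at which batches are sealed.

    `events` is an iterable of (arrival_second, size_bytes) pairs sorted
    by arrival time.
    """
    sealed = []
    batch_start = None
    batch_bytes = 0
    for arrival, size in events:
        # Time limit: the open batch expired before this event arrived.
        if batch_start is not None and arrival - batch_start >= max_seconds:
            sealed.append(batch_start + max_seconds)
            batch_start, batch_bytes = None, 0
        if batch_start is None:
            batch_start = arrival  # this event opens a new batch
        batch_bytes += size
        # Size limit: seal as soon as the batch is large enough.
        if batch_bytes >= max_bytes:
            sealed.append(arrival)
            batch_start, batch_bytes = None, 0
    if batch_start is not None:
        sealed.append(batch_start + max_seconds)  # last batch seals on the timer
    return sealed


if __name__ == "__main__":
    # Three small events: the first two share a batch, the third opens a new one.
    print(seal_times([(0, 100), (10, 100), (400, 100)]))
```

In this model, a low-volume stream such as the sample app's never reaches the size limit, so rows become visible only when the 5-minute timer expires.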

Clean up resources

If you don't plan to use your event hub again, clean up test-hub-rg to avoid incurring costs.

  1. In the Azure portal, select Resource groups on the far left, and then select the resource group you created.

    If the left menu is collapsed, select the Expand button to expand it.

  2. Under test-hub-rg, select Delete resource group.

  3. In the new window, type the name of the resource group to delete (test-hub-rg), and then select Delete.

Next steps