事件中心数据连接Event Hub data connection

Azure 事件中心是大数据流式处理平台和事件引入服务。Azure Event Hubs is a big data streaming platform and event ingestion service. Azure 数据资源管理器通过客户管理的事件中心提供持续引入。Azure Data Explorer offers continuous ingestion from customer-managed Event Hubs.

事件中心引入管道使用几个步骤将事件传输到 Azure 数据资源管理器。The Event Hub ingestion pipeline transfers events to Azure Data Explorer in several steps. 首先,在 Azure 门户中创建事件中心。You first create an Event Hub in the Azure portal. 然后,创建 Azure 数据资源管理器目标表,使用给定的引入属性特定格式的数据引入到该表中。You then create a target table in Azure Data Explorer into which the data in a particular format, will be ingested using the given ingestion properties. 事件中心连接需要知道事件路由The Event Hub connection needs to know events routing. 根据事件系统属性映射,使用选定的属性嵌入数据。Data is embedded with selected properties according to the event system properties mapping. 创建到事件中心的连接,以创建事件中心发送事件Create a connection to Event Hub to create an Event Hub and send events. 可以通过 Azure 门户使用 C#Python 以编程方式管理此过程,也可以使用 Azure 资源管理器模板来这样做。This process can be managed through the Azure portal, programmatically with C# or Python, or with the Azure Resource Manager template.

有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述For general information about data ingestion in Azure Data Explorer, see Azure Data Explorer data ingestion overview.

数据格式Data format

  • 将以 EventData 对象的形式从事件中心读取数据。Data is read from the Event Hub in form of EventData objects.

  • 请参阅支持的格式See supported formats.

    备注

    事件中心不支持 .raw 格式。Event Hub doesn't support the .raw format.

  • 可使用 GZip 压缩算法来压缩数据。Data can be compressed using the GZip compression algorithm. 指定引入属性中的 CompressionSpecify Compression in ingestion properties.

    • 压缩格式(Avro、Parquet、ORC)不支持数据压缩。Data compression isn't supported for compressed formats (Avro, Parquet, ORC).
    • 压缩数据不支持自定义编码和嵌入式系统属性Custom encoding and embedded system properties aren't supported on compressed data.

引入属性Ingestion properties

引入属性会指示引入过程、数据路由到的位置以及数据处理方式。Ingestion properties instruct the ingestion process, where to route the data, and how to process it. 可以使用 EventData.Properties 指定事件引入的引入属性You can specify ingestion properties of the events ingestion using the EventData.Properties. 可以设置以下属性:You can set the following properties:

属性Property 说明Description
Table 现有目标表的名称(区分大小写)。Name (case sensitive) of the existing target table. 替代“Data Connection”窗格上设置的“Table”。Overrides the Table set on the Data Connection pane.
格式Format 数据格式。Data format. 替代“Data Connection”窗格上设置的“Data format”。Overrides the Data format set on the Data Connection pane.
IngestionMappingReferenceIngestionMappingReference 要使用的现有引入映射的名称。Name of the existing ingestion mapping to be used. 替代“Data Connection”窗格上设置的“Column mapping”。Overrides the Column mapping set on the Data Connection pane.
压缩Compression 数据压缩。None(默认值)或 GZip 压缩。Data compression, None (default), or GZip compression.
编码Encoding 数据编码,默认值为 UTF8。Data encoding, the default is UTF8. 可以是 .NET 支持的任何编码Can be any of .NET supported encodings.
标记(预览版)Tags (Preview) 将要与引入的数据(格式设置为 JSON 数组字符串)关联的标记的列表。A list of tags to associate with the ingested data, formatted as a JSON array string. 使用标记时存在性能影响There are performance implications when using tags.

备注

只有创建数据连接后进入队列的事件才会被引入。Only events enqueued after you create the data connection are ingested.

事件路由Events routing

设置到 Azure 数据资源管理器群集的事件中心连接时,请指定目标表属性(表名、数据格式、压缩和映射)。When you set up an Event Hub connection to Azure Data Explorer cluster, you specify target table properties (table name, data format, compression, and mapping). 数据的默认路由也称为 static routingThe default routing for your data is also referred to as static routing. 还可以使用事件属性指定每个事件的目标表属性。You can also specify target table properties for each event, using event properties. 连接将按照 EventData.Properties 中指定的要求动态路由数据,替代此事件的静态属性。The connection will dynamically route the data as specified in the EventData.Properties, overriding the static properties for this event.

在以下示例中设置事件中心详细信息,并将天气指标数据发送到 WeatherMetrics 表。In the following example, set event hub details and send weather metric data to table WeatherMetrics. 数据采用 json 格式。Data is in json format. mapping1WeatherMetrics 表中预定义。mapping1 is pre-defined on the table WeatherMetrics.

var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;

// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }; 
var data = JsonConvert.SerializeObject(metric);

// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");

// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();

事件系统属性映射Event system properties mapping

系统属性在事件排队时存储由事件中心服务设置的属性。System properties store properties that are set by the Event Hubs service, at the time the event is enqueued. Azure 数据资源管理器事件中心连接会将所选属性嵌入置于表中的数据中。The Azure Data Explorer Event Hub connection will embed the selected properties into the data landing in your table.

备注

  • 单记录事件支持系统属性。System properties are supported for single-record events.
  • 压缩数据不支持系统属性。System properties aren't supported on compressed data.
  • 对于 csv 映射,属性将按下表中列出的顺序添加到记录的开头。For csv mapping, properties are added at the beginning of the record in the order listed in the table below. 对于 json 映射,将根据下表中的属性名称添加属性。For json mapping, properties are added according to property names in the following table.

系统属性System properties

事件中心公开以下系统属性:Event Hub exposes the following system properties:

属性Property 数据类型Data Type 说明Description
x-opt-enqueued-timex-opt-enqueued-time datetimedatetime 将事件排队时的 UTC 时间UTC time when the event was enqueued
x-opt-sequence-numberx-opt-sequence-number longlong 事件中心分区流中的事件的逻辑序列号The logical sequence number of the event within the partition stream of the Event Hub
x-opt-offsetx-opt-offset stringstring 事件与事件中心分区流之间的偏移量。The offset of the event from the Event Hub partition stream. 偏移量标识符在事件中心流的分区中独一无二The offset identifier is unique within a partition of the Event Hub stream
x-opt-publisherx-opt-publisher stringstring 发布服务器名称(如果消息已发送到发布服务器终结点)The publisher name, if the message was sent to a publisher endpoint
x-opt-partition-keyx-opt-partition-key stringstring 存储了事件的相应分区的分区键The partition key of the corresponding partition that stored the event

如果在表的“数据源”部分中选择了“事件系统属性”,则必须在表架构和映射中包含这些属性。If you selected Event system properties in the Data Source section of the table, you must include the properties in the table schema and mapping.

架构映射示例Schema mapping examples

表架构映射示例Table schema mapping example

如果数据包含三列(TimespanMetricValue)并且包含的属性是 x-opt-enqueued-timex-opt-offset,请使用以下命令创建或更改表架构:If your data includes three columns (Timespan, Metric, and Value) and the properties you include are x-opt-enqueued-time and x-opt-offset, create or alter the table schema by using this command:

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV 映射示例CSV mapping example

运行以下命令,将数据添加到记录的开头。Run the following commands to add data to the beginning of the record. 记下序号值。Note ordinal values.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON 映射示例JSON mapping example

使用系统属性映射添加数据。Data is added by using the system properties mapping. 运行以下命令:Run these commands:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.metric_value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

事件中心连接Event Hub connection

备注

为了获得最佳性能,请在 Azure 数据资源管理器群集所在的区域中创建所有资源。For best performance, create all resources in the same region as the Azure Data Explorer cluster.

创建事件中心Create an Event Hub

创建事件中心(如果还没有事件中心)。If you don't already have one, Create an event hub. 可以通过 Azure 门户使用 C#Python 以编程方式管理到事件中心的连接,也可以使用 Azure 资源管理器模板来这样做。Connecting to Event Hub can be managed through the Azure portal, programmatically with C# or Python, or with the Azure Resource Manager template.

备注

  • 分区计数不可更改,因此在设置分区计数时应考虑长期规模。The partition count isn't changeable, so you should consider long-term scale when setting partition count.
  • 使用者组对于每个使用者来说必须独一无二。Consumer group must be unique per consumer. 创建专用于 Azure 数据资源管理器连接的使用者组。Create a consumer group dedicated to Azure Data Explorer connection.

发送事件Send events

请参阅可生成数据并将其发送到事件中心的示例应用See the sample app that generates data and sends it to an event hub.

有关如何生成示例数据的示例,请参阅将数据从事件中心引入到 Azure 数据资源管理器For an example of how to generate sample data, see Ingest data from Event Hub into Azure Data Explorer

后续步骤Next steps