从事件中心引入Ingest from Event Hub

Azure 事件中心是大数据流式处理平台和事件引入服务。Azure Event Hubs is a big data streaming platform and event ingestion service. Azure 数据资源管理器通过客户管理的事件中心提供持续引入。Azure Data Explorer offers continuous ingestion from customer-managed Event Hubs.

数据格式Data format

备注

  • 压缩格式(Avro、Parquet、ORC)不支持数据压缩。Data compression is not supported for compressed formats (Avro, Parquet, ORC).
  • 压缩数据不支持自定义编码和嵌入式系统属性Custom encoding and embedded system properties are not supported on compressed data.

引入属性Ingestion properties

引入属性会指示引入过程、数据路由到的位置以及数据处理方式。Ingestion properties instruct the ingestion process, where to route the data, and how to process it. 可以使用 EventData.Properties 指定事件引入的引入属性You can specify ingestion properties of the events ingestion using the EventData.Properties. 可以设置以下属性:You can set the following properties:

属性Property 说明Description
Table 现有目标表的名称(区分大小写)。Name (case sensitive) of the existing target table. 替代“Data Connection”边栏选项卡上设置的“Table”。Overrides the Table set on the Data Connection blade.
格式Format 数据格式。Data format. 替代“Data Connection”边栏选项卡上设置的“Data format”。Overrides the Data format set on the Data Connection blade.
IngestionMappingReferenceIngestionMappingReference 要使用的现有引入映射的名称。Name of the existing ingestion mapping to be used. 替代“Data Connection”边栏选项卡上设置的“Column mapping”。Overrides the Column mapping set on the Data Connection blade.
压缩Compression 数据压缩。None(默认值)或 GZip 压缩。Data compression, None (default), or GZip compression.
编码Encoding 数据编码,默认值为 UTF8。Data encoding, the default is UTF8. 可以是 .NET 支持的任何编码Can be any of .NET supported encodings.
标记(预览版)Tags (Preview) 将要与引入的数据(格式设置为 JSON 数组字符串)关联的标记的列表。A list of tags to associate with the ingested data, formatted as a JSON array string. 使用标记时存在性能影响There are performance implications when using tags.

事件路由Events routing

设置到 Azure 数据资源管理器群集的事件中心连接时,请指定目标表属性(表名、数据格式、压缩和映射)。When you set up an Event Hub connection to Azure Data Explorer cluster, you specify target table properties (table name, data format, compression, and mapping). 数据的默认路由也称为 static routingThe default routing for your data is also referred to as static routing. 还可以使用事件属性指定每个事件的目标表属性。You can also specify target table properties for each event, using event properties. 连接将按照 EventData.Properties 中指定的要求动态路由数据,替代此事件的静态属性。The connection will dynamically route the data as specified in the EventData.Properties, overriding the static properties for this event.

在以下示例中设置事件中心详细信息,并将天气指标数据发送到 WeatherMetrics 表。In the following example, set event hub details and send weather metric data to table WeatherMetrics. 数据采用 json 格式。Data is in json format. mapping1WeatherMetrics 表中预定义。mapping1 is pre-defined on the table WeatherMetrics.

var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;

// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }; 
var data = JsonConvert.SerializeObject(metric);

// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");

// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();

事件系统属性映射Event system properties mapping

系统属性在事件排队时存储由事件中心服务设置的属性。System properties store properties that are set by the Event Hubs service, at the time the event is enqueued. Azure 数据资源管理器事件中心连接会将所选属性嵌入置于表中的数据中。The Azure Data Explorer Event Hub connection will embed the selected properties into the data landing in your table.

备注

  • 单记录事件支持系统属性。System properties are supported for single-record events.
  • 压缩数据不支持系统属性。System properties are not supported on compressed data.
  • 对于 csv 映射,属性将按下表中列出的顺序添加到记录的开头。For csv mapping, properties are added at the beginning of the record in the order listed in the table below. 对于 json 映射,将根据下表中的属性名称添加属性。For json mapping, properties are added according to property names in the following table.

事件中心公开以下系统属性Event Hub exposes the following system properties

属性Property 数据类型Data Type 说明Description
x-opt-enqueued-timex-opt-enqueued-time datetimedatetime 将事件排队时的 UTC 时间UTC time when the event was enqueued
x-opt-sequence-numberx-opt-sequence-number longlong 事件中心分区流中的事件的逻辑序列号The logical sequence number of the event within the partition stream of the Event Hub
x-opt-offsetx-opt-offset stringstring 事件与事件中心分区流之间的偏移量。The offset of the event from the Event Hub partition stream. 偏移量标识符在事件中心流的分区中独一无二The offset identifier is unique within a partition of the Event Hub stream
x-opt-publisherx-opt-publisher stringstring 发布服务器名称(如果消息已发送到发布服务器终结点)The publisher name, if the message was sent to a publisher endpoint
x-opt-partition-keyx-opt-partition-key stringstring 存储了事件的相应分区的分区键The partition key of the corresponding partition that stored the event

如果在表的“数据源”部分中选择了“事件系统属性”,则必须在表架构和映射中包含这些属性。If you selected Event system properties in the Data Source section of the table, you must include the properties in the table schema and mapping.

表架构示例Table schema example

如果数据包含以下内容,则使用表架构命令创建或更改表架构:Create or alter the table schema by using the table schema command, if your data includes:

  • TimespanMetricValuethe columns Timespan, Metric, and Value
  • x-opt-enqueued-timex-opt-offset 属性the properties x-opt-enqueued-time and x-opt-offset
    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:long)

CSV 映射示例CSV mapping example

运行以下命令,将数据添加到记录的开头。Run the following commands to add data to the beginning of the record. 属性将按上表中列出的顺序添加到记录的开头。Properties are added at the beginning of the record, in the order listed in the table above. 序号值对于 CSV 映射很重要,其中的列序号将根据已映射的系统属性而更改。The ordinal values are important for CSV mapping where the column ordinals will change, based on the system properties that are mapped.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON 映射示例JSON-mapping example

使用出现在“数据连接”边栏选项卡的“事件系统属性”列表中的系统属性名称添加数据。Add data by using the system properties names as they appear in the Data connection blade Event system properties list. 运行:Run:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.metric_value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

创建事件中心连接Create event hub connection

备注

为了获得最佳性能,请在 Azure 数据资源管理器群集所在的区域中创建所有资源。For best performance, create all resources in the same region as the Azure Data Explorer cluster.

创建事件中心Create an event hub

创建事件中心(如果还没有事件中心)。If you don't already have one, Create an event hub. 可以在有关如何创建事件中心的操作指南中找到模板。A template can be found in the how-to Create an event hub guide.

备注

  • 分区计数不可更改,因此在设置分区计数时应考虑长期规模。The partition count isn't changeable, so you should consider long-term scale when setting partition count.
  • 使用者组对于每个使用者来说必须独一无二。Consumer group must be unique per consumer. 创建专用于 Azure 数据资源管理器连接的使用者组。Create a consumer group dedicated to Azure Data Explorer connection.

与 Azure 数据资源管理器的数据引入连接Data ingestion connection to Azure Data Explorer

备注

如果选择了“我的数据包括路由信息”,则必须提供必要的路由信息作为事件属性的一部分。If My data includes routing info selected, you must provide the necessary routing information as part of the events properties.

备注

设置连接后,它将从在其创建后排队的事件开始引入数据。Once the connection is set, it ingests data starting from events enqueued after its creation time.

生成数据Generating data

  • 请参阅可生成数据并将其发送到事件中心的示例应用See the sample app that generates data and sends it to an event hub.

事件可以包含一个或多个记录(直到达到事件的大小限制)。An event can contain one or more records, up to its size limit. 在下面的示例中,我们将发送两个事件,每个事件有五个追加的记录:In the following sample we send two events, each has five records appended:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) }; 
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();