Azure 事件中心数据连接

Azure 事件中心是大数据流式处理平台和事件引入服务。 Azure 数据资源管理器通过客户管理的事件中心提供持续引入。

事件中心引入管道通过几个步骤将事件传输到 Azure 数据资源管理器。 首先,在 Azure 门户中创建事件中心。 然后,创建 Azure 数据资源管理器目标表,使用提供的引入属性特定格式的数据引入到该表中。 事件中心连接需要了解事件路由。 根据事件系统属性,可能会使用选定的属性嵌入数据。 与事件中心建立连接,以创建事件中心发送事件。 可以通过 Azure 门户使用 C#Python 以编程方式管理此过程,也可以使用 Azure 资源管理器模板来这样做。

有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述

Azure 数据资源管理器数据连接身份验证选项

  • 基于托管标识的数据连接(推荐):基于托管标识的数据连接是连接到数据源的最安全方式。 它完全控制从数据源提取数据的能力。 使用托管标识设置数据连接需要执行以下步骤:

    1. 将一个托管标识添加到群集
    2. 向该托管标识授予对数据源的权限。 若要从 Azure 事件中心提取数据,托管标识必须具有 Azure 事件中心数据接收器权限。
    3. 在目标数据库上设置托管标识策略
    4. 创建使用托管标识身份验证的数据连接来提取数据。

    注意

    如果从数据源中删除托管标识权限,则数据连接将不再有效,并且无法从数据源中提取数据。

  • 基于密钥的数据连接:如果未为数据连接指定托管标识身份验证,则连接会自动默认为基于密钥的身份验证。 基于密钥的连接使用资源连接字符串(如 Azure事件中心连接字符串)提取数据。 Azure 数据资源管理器获取指定资源的资源连接字符串,并安全地保存它。 该连接字符串之后用于从数据源提取数据。

    注意

    如果轮换密钥,则数据连接将不再有效,并且无法从数据源中提取数据。 若要解决此问题,请更新或重新创建数据连接。

数据格式

注意

  • 从事件中心引入不支持 RAW 格式。
  • 不支持 Azure 事件中心架构注册表和无架构 Avro。
  • 可使用 gzip 压缩算法来压缩数据。 可使用引入属性动态指定 Compression,或者在静态数据连接设置中指定。
  • 二进制格式(Avro、ApacheAvro、Parquet、ORC 和 W3CLOGFILE)不支持数据压缩。
  • 二进制格式和压缩数据不支持自定义编码和嵌入式系统属性
  • 使用二进制格式(Avro、ApacheAvro、Parquet、ORC 和 W3CLOGFILE)和引入映射时,引入映射定义中的字段顺序必须与表中相应列的顺序一致。

事件中心属性

Azure 数据资源管理器支持以下事件中心属性:

  • 一组封闭的引入属性,它们可以帮助将事件路由到相关的表。
  • 一组封闭的事件系统属性,可以根据给定的映射将它们嵌入在数据中。

注意

不支持引入用于将元数据与事件相关联的事件中心自定义属性。 如果需要引入自定义属性,请在事件数据的正文中发送这些属性。 有关详细信息,请参阅引入自定义属性

引入属性

引入属性会指示引入过程、数据路由到的位置以及数据处理方式。 可以使用 EventData.Properties 指定事件引入的引入属性。 可以设置以下属性:

注意

属性名称区分大小写。

属性 说明
数据库 目标数据库的名称,区分大小写。 默认情况下,数据将引入到与数据连接关联的目标数据库中。 使用此属性重写默认数据库,并将数据发送到其他数据库。 为此,必须先将连接设置为多数据库连接
现有目标表的名称,区分大小写。 替代“Data Connection”窗格上设置的“Table”。
格式 数据格式。 替代“Data Connection”窗格上设置的“Data format”。
IngestionMappingReference 要使用的现有引入映射的名称。 替代“Data Connection”窗格上设置的“Column mapping”。
压缩 数据压缩、None(默认值)或 gzip
编码 数据编码,默认值为 UTF8。 可以是 .NET 支持的任何编码
Tags 将要与引入的数据(格式设置为 JSON 数组字符串)关联的标记的列表。 使用标记时存在性能影响
RawHeaders 指示事件源是 Kafka,并且 Azure 数据资源管理器必须使用字节数组反序列化来读取其他路由属性。 值将被忽略。

注意

除非提供自定义检索开始日期,否则只会引入创建数据连接后排队的事件。 在任何情况下,回溯期都不能超过实际事件中心保留期。

事件路由

在与群集建立数据连接时,可以指定要将引入的数据发送到何处的路由。 默认路由是发送到与目标数据库关联的连接字符串中指定的目标表。 数据的默认路由也称为静态路由。 通过设置上一段中提到的一个或多个事件数据属性,可以指定数据的替代路由和处理选项。

注意

事件中心数据连接将尝试处理从事件中心读取的所有事件,并且它出于任何原因无法处理的事件都将报告为引入失败。 在此处了解如何监视 Azure 数据资源管理器引入。

将事件数据路由到备用数据库

默认已禁用将数据路由到备用数据库。 若要将数据发送到其他数据库,必须先将连接设置为多数据库连接。 可以在 Azure 门户 Azure 门户 中、使用 C#Python 管理 SDK 或者使用 ARM 模板启用此功能。 用于允许数据库路由的用户、组、服务主体或托管标识在群集上必须至少具有参与者角色和写入权限。

若要指定备用数据库,请设置数据库引入属性

警告

如果指定了备用数据库,而未将连接设置为多数据库数据连接,则会导致引入失败。

将事件数据路由到备用表

若要为每个事件指定备用表,请设置“表”、“格式”、“压缩”和映射引入属性。 连接将按照 EventData.Properties 中指定的要求动态路由引入的数据,并重写此事件的静态属性。

以下示例演示如何设置事件中心详细信息,并将天气指标数据发送到备用数据库 (MetricsDB) 和表 (WeatherMetrics)。 数据采用 JSON 格式,mapping1 是在表 WeatherMetrics 中预定义的。

// This sample uses Azure.Messaging.EventHubs which is a .Net Framework library.
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { TimeStamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);

事件中心系统属性映射

系统属性是在事件排队时由事件中心服务设置的字段。 Azuer 数据资源管理器事件中心数据连接可以根据给定的映射,将一组预定义的系统属性嵌入到引入表中的数据。

注意

  • json 格式和表格格式(即 JSONMultiJSONCSVTSVPSVSCsvSOHsvTSVE)支持嵌入系统属性。
  • 使用不受支持的格式(即 TXT 或 ParquetAvro 等压缩格式)时,仍会引入数据,但会忽略属性。
  • 设置事件中心消息压缩时,不支持嵌入系统属性。 在这种情况下,将发出相应的错误,并且不会引入数据。
  • 对于表格数据,仅单记录事件消息支持系统属性。
  • 对于 json 数据,多记录事件消息也支持系统属性。 在这种情况下,系统属性仅添加到事件消息的第一条记录中。
  • 对于 CSV 映射,属性将按创建数据连接时列出的顺序添加到记录的开头。 不要依赖于这些属性的顺序,因为该顺序将来可能会变化。
  • 对于 JSON 映射,将根据系统属性表中的属性名称添加属性。

事件中心服务公开以下系统属性:

属性 数据类型 说明
x-opt-enqueued-time datetime 将事件排队时的 UTC 时间
x-opt-sequence-number long 事件中心分区流中的事件的逻辑序列号
x-opt-offset string 事件与事件中心分区流之间的偏移量。 偏移量标识符在事件中心流的分区中独一无二
x-opt-publisher string 发布服务器名称(如果消息已发送到发布服务器终结点)
x-opt-partition-key string 存储了事件的相应分区的分区键

使用 IoT Central 事件中心时,还可以在有效负载中嵌入 IoT 中心系统属性。 有关完整列表,请参阅 IoT 中心系统属性

如果在表的“数据源”部分中选择了“事件系统属性”,则必须在表架构和映射中包含这些属性。

架构映射示例

表架构映射示例

如果数据包含三列(TimeStampMetricNameValue)并且包含的属性是 x-opt-enqueued-timex-opt-offset,请使用以下命令创建或更改表架构:

    .create-merge table TestTable (TimeStamp: datetime, MetricName: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV 映射示例

运行以下命令,将数据添加到记录的开头。 记下序号值。

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "TimeStamp", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "MetricName", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON 映射示例

使用系统属性映射添加数据。 运行以下命令:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "TimeStamp", "Properties":{"Path":"$.TimeStamp"}},'
    '    { "column" : "MetricName", "Properties":{"Path":"$.MetricName"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.Value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

事件中心捕获 Avro 文件的架构映射

使用事件中心数据的一种方法是通过 Azure Blob 存储或 Azure Data Lake Storage 中的 Azure 事件中心捕获事件。 然后,可以在使用 Azure 数据资源管理器中的事件网格数据连接写入捕获文件时引入这些文件。

捕获文件的架构与发送到事件中心的原始事件的架构不同。 在设计目标表架构时,应考虑到这种差异。 具体而言,事件负载在捕获文件中表示为字节数组,并且事件网格 Azure 数据资源管理器数据连接不会自动解码此数组。 若要详细了解事件中心 Avro 捕获数据的文件架构,请参阅探索 Azure 事件中心中捕获的 Avro 文件

若要正确解码事件负载,请执行以下操作:

  1. 将捕获事件的 Body 字段映射到目标表中 dynamic 类型的列。
  2. 应用一项更新策略,该策略使用 unicode_codepoints_to_string() 函数将字节数组转换为可读字符串。

引入自定义属性

从事件中心引入事件时,数据将取自事件数据对象的 body 节。 但是,事件中心自定义属性是在该对象的 properties 节中定义的,不会引入。 若要引入客户属性,必须将其嵌入到该对象的 body 节中的数据。

以下示例将包含事件中心定义的自定义属性 customProperty 的事件数据对象(左)与引入时所需的嵌入属性(右)进行比较。

{
"body":{
"value": 42
},
"properties":{
"customProperty": "123456789"
}
}
{
"body":{
"value": 42,
"customProperty": "123456789"
}
}

可使用以下方法之一将自定义属性嵌入到事件数据对象的 body 节中的数据:

  • 在事件中心内创建事件数据对象时,将自定义属性嵌入为该对象的 body 节中的数据的一部分。
  • 使用 Azure 流分析来处理来自事件中心的事件并将自定义属性嵌入到事件数据中。 在 Azure 流分析中,可以使用 Azure 数据资源管理器输出连接器本机引入数据,或者将数据路由到另一个事件中心并从中路由到群集。
  • 使用 Azure Functions 添加自定义属性,然后引入数据。

跨区域事件中心数据连接

为获得最佳性能,请在群集所在的同一区域中创建以下所有资源。 如果没有其他替代方法,请考虑使用高级专用事件中心层级。 可以在此处找到事件中心层级对比。

创建事件中心

创建事件中心(如果还没有事件中心)。 可以通过 Azure 门户、使用 C#Python 以编程方式或使用 Azure 资源管理器模板来管理到数据中心的连接。

注意

  • 仅事件中心高级层和专用层提供在创建事件中心后动态添加分区的功能。 在设置分区计数时,要考虑长期缩放。
  • 使用者组对于每个使用者来说必须独一无二。 创建专用于 Azure 数据资源管理器连接的使用者组。

发送事件

请参阅可生成数据并将其发送到事件中心的示例应用

设置异地灾难恢复解决方案

事件中心提供异地灾难恢复解决方案。 Azure 数据资源管理器不支持 Alias 事件中心命名空间。 若要在解决方案中实现异地灾难恢复,请创建两个事件中心数据连接:一个用于主命名空间,另一个用于辅助命名空间。 Azure 数据资源管理器会侦听这两个事件中心连接。

注意

用户负责实现从主命名空间到辅助命名空间的故障转移。