适用于 Azure Cosmos DB 的 Kafka Connect - V2

适用范围: NoSQL

Kafka Connect 是一项工具,用于在 Apache Kafka 和其他系统之间以可缩放且可靠的方式流式传输数据。 使用 Kafka Connect 可以定义将大型数据集移入和移出 Kafka 的连接器。 Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读写数据。

源连接器和接收器连接器语义

  • 源连接器 - 当前此连接器仅支持一次。

  • 接收器连接器 - 此连接器完全支持“恰好一次”语义。

支持的 Kafka 版本

3.6.0+

受支持的数据格式

源连接器和接收器连接器可以配置为支持以下数据格式:

格式 说明
纯 JSON 没有任何附加架构的 JSON 记录结构。
包含架构的 JSON 包含显式架构信息的 JSON 记录结构,以确保数据与预期格式匹配。
AVRO 在 Apache 的 Hadoop 项目中开发的面向行的远程过程调用和数据序列化框架。 它使用 JSON 来定义数据类型、协议,并采用紧凑的二进制格式来序列化数据。

可以在 Kafka 中单独配置键和值设置,包括格式和序列化。 因此,可以分别对键和值使用不同的数据格式。 为了适应不同的数据格式,key.convertervalue.converter 都提供了转换器配置。

转换器配置示例

纯 JSON

如果需要使用不带架构注册表的 JSON 来连接数据,请使用 Kafka 支持的 JsonConverter。 以下示例显示了添加到配置的 JsonConverter 键和值属性:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

包含架构的 JSON

将属性 key.converter.schemas.enablevalue.converter.schemas.enable 设置为 true,以便将键或值视为同时包含内部架构和数据的复合 JSON 对象。 如果没有这些属性,键或值将被视为纯 JSON。

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

发送到 Kafka 的结果消息如以下示例所示,其中架构和有效负载是 JSON 中的顶层元素:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "userid": 123,
    "name": "user's name"
  }
}

注意

写入 Azure Cosmos DB 的消息由架构和有效负载组成。 请注意消息的大小,以及它所包含的有效负载与架构的比例。 写入到 Kafka 的每条消息中都会重复此架构。 在此类场景中,可能需要使用 JSON 架构或 AVRO 等序列化格式,其中架构单独进行存储,消息仅保存有效负载。

AVRO

Kafka 连接器支持 AVRO 数据格式。 若要使用 AVRO 格式,请配置 AvroConverter,以便连接器了解如何处理 AVRO 数据。 Azure Cosmos DB Kafka Connect 已在 Apache 2.0 许可下使用 Confluent 提供的 AvroConverter 进行了测试。 你也可以根据需要使用不同的自定义转换器。

Kafka 单独处理键和值。 在辅助角色配置中根据需要指定 key.convertervalue.converter 属性。 使用 AvroConverter 时,添加一个额外的转换器属性,该属性提供架构注册表的 URL。 以下示例显示了添加到配置的 AvroConverter 键和值属性:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

选择转换格式

下面是有关如何选择转换格式的一些注意事项:

  • 在配置源连接器时:

    • 如果希望 Kafka Connect 在其写入 Kafka 的消息中包含纯 JSON,请设置纯 JSON 配置。

    • 如果希望 Kafka Connect 在其写入 Kafka 的消息中包含架构,请设置包含架构的 JSON 配置。

    • 如果希望 Kafka Connect 在其写入 Kafka 的消息中包含 AVRO 格式,请设置 AVRO 配置。

  • 如果要将 Kafka 主题中的 JSON 数据用于接收器连接器,请了解在将 JSON 写入 Kafka 主题时如何对该 JSON 进行序列化:

    • 如果它是使用 JSON 序列化程序编写的,请将 Kafka Connect 设置为使用 JSON 转换器 (org.apache.kafka.connect.json.JsonConverter)

      • 如果 JSON 数据是以普通字符串形式编写的,请确定该数据是否包含嵌套的架构或有效负载。 如果是,请设置包含架构的 JSON 配置。
      • 但是,如果使用的 JSON 数据没有架构或有效负载构造,则必须通过按照纯 JSON 配置设置 schemas.enable=false 来告知连接器不要查找架构
    • 如果它是使用 AVRO 序列化程序编写的,请按照 (io.confluent.connect.avro.AvroConverter) 配置将 Kafka Connect 设置为使用 AVRO 转换器

配置

常见配置属性

源连接器和接收器连接器共享以下常见配置属性:

配置属性名称 默认值 说明
azure.cosmos.account.endpoint Cosmos DB 帐户终结点 URI
azure.cosmos.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 帐户密钥(仅当 auth.typeMasterKey 时才需要)
azure.cosmos.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供了首选区域用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.application.name "" 应用程序名称。 它作为 userAgent 后缀添加。
azure.cosmos.throughputControl.enabled false 用于指示是否启用吞吐量控制的标志。
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 吞吐量控制帐户终结点 URI。
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 吞吐量控制帐户密钥(仅当 throughputControl.auth.typeMasterKey 时为必需)。
azure.cosmos.throughputControl.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.throughputControl.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供了首选区域用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.throughputControl.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.throughputControl.group.name "" 吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。
azure.cosmos.throughputControl.targetThroughput -1 吞吐量控制组目标吞吐量。 该值应大于 0。
azure.cosmos.throughputControl.targetThroughputThreshold -1 吞吐量控制组目标吞吐量阈值。 该值的区间范围为 (0,1]。
azure.cosmos.throughputControl.priorityLevel None 吞吐量控制组优先级。 该值可以是 None、High 或 Low。
azure.cosmos.throughputControl.globalControl.database.name "" 用于吞吐量全局控制的数据库。
azure.cosmos.throughputControl.globalControl.container.name "" 用于吞吐量全局控制的容器。
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 此值控制客户端更新自身吞吐量使用情况并根据其他客户端的吞吐量使用情况调整自己吞吐量份额的频率。 默认值为 5 秒,允许的最小值为 5 秒。
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 这将控制检测到客户端脱机进而允许其他客户端获取其吞吐量份额的时间。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。

有关接收器连接器特定的配置,请参阅接收器连接器文档

有关源连接器特定的配置,请参阅源连接器文档

常见配置错误

如果 Kafka Connect 中的转换器配置有误,则可能会导致错误。 这些错误会出现在连接器上,因为你尝试反序列化已存储在 Kafka 中的消息。 源连接器中通常不会发生转换器问题,因为序列化在源连接器中进行设置。

有关详细信息,请参阅常见配置错误文档。

项目设置

有关初始设置说明,请参阅开发人员设置

资源

后续步骤