适用于 Azure Cosmos DB 的 Kafka Connect - V2

Kafka Connect 是一种工具,用于在 Apache Kafka 和其他系统之间缩放且可靠地流式传输数据。 使用 Kafka Connect 可以定义将大型数据集移入和移出 Kafka 的连接器。 Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。

源和汇连接器语义

  • 源连接器 - 当前此连接器仅支持一次。

  • 汇聚连接器 - 此连接器完全支持精确一次语义。

支持的 kafka 版本

3.6.0+

受支持的数据格式

可以将源和接收器连接器配置为支持以下数据格式:

Format Description
纯 JSON 不带任何附加架构的 JSON 记录结构。
包含架构的 JSON 具有显式架构信息的 JSON 记录结构,以确保数据与预期格式匹配。
AVRO 在 Apache Hadoop 项目中开发的面向行的远程过程调用和数据序列化框架。 它使用 JSON 定义数据类型、协议,并使用压缩的二进制格式序列化数据。

键和值设置(包括格式和序列化)可以在 Kafka 中独立配置。 因此,可以分别对键和值使用不同的数据格式。 为了支持不同的数据格式,存在针对key.convertervalue.converter的转换器配置。

转换器配置示例

纯 JSON

如果需要在没有架构注册表的情况下使用 JSON 连接数据,请使用 Kafka 支持的 JsonConverter。 以下示例显示了 JsonConverter 添加到配置的键和值属性:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

包含架构的 JSON

设置属性 key.converter.schemas.enablevalue.converter.schemas.enable 设置为 true,以便将键或值视为包含内部架构和数据的组合 JSON 对象。 如果没有这些属性,键或值将被视为纯 JSON。

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

指向 Kafka 的结果消息如以下示例所示,其中架构和有效负载为 JSON 中的顶级元素:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "userid": 123,
    "name": "user's name"
  }
}

注释

写入 Azure Cosmos DB 的消息由架构和有效负载组成。 请注意消息的大小,以及由有效负载与架构构成的比例。 每次写入 Kafka 的消息中都会重复该架构。 在这种情况下,你可能想要使用 JSON 架构或 AVRO 等序列化格式,其中架构单独存储,而消息仅保留有效负载。

AVRO

Kafka 连接器支持 AVRO 数据格式。 若要使用 AVRO 格式,请配置一个 AvroConverter 连接器,以便连接器知道如何处理 AVRO 数据。 Azure Cosmos DB Kafka Connect 通过 Confluent 提供的 AvroConverter (Apache 2.0 许可证)进行测试。 如果愿意,也可以使用其他自定义转换器。

Kafka 独立处理键和值。 在工作者配置中按要求指定key.convertervalue.converter属性。 使用 AvroConverter时,请添加一个额外的转换器属性,该属性提供架构注册表的 URL。 以下示例显示了添加到配置的 AvroConverter 键和值属性:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

选择转换格式

下面是有关如何选择转换格式的一些注意事项:

  • 配置 源连接器时:

    • 如果希望 Kafka Connect 在写入 Kafka 的消息中包含纯 JSON,请设置 纯 JSON 配置。

    • 如果希望 Kafka Connect 在写入 Kafka 的消息中包含架构,请设置 JSON with Schema 配置。

    • 如果希望 Kafka Connect 在写入 Kafka 的消息中包含 AVRO 格式,请设置 AVRO 配置。

  • 如果您将 Kafka 主题中的 JSON 数据消费到 Sink 连接器,请确定 JSON 在写入 Kafka 主题时是如何序列化的:

    • 如果是使用 JSON 序列化程序编写的,请将 Kafka Connect 设置为使用 JSON 转换器 (org.apache.kafka.connect.json.JsonConverter)

      • 如果 JSON 数据是作为纯字符串写入的,请确定数据是否包括嵌套架构或有效负载。 如果是, 请使用架构 配置设置 JSON。
      • 但是,如果使用的是 JSON 数据,并且它没有架构或有效负载构造,则必须根据纯 JSON 配置设置schemas.enable=false告知连接器不要查找架构。
    • 如果是使用 AVRO 序列化程序编写的,请根据 AVRO 配置将 Kafka Connect 设置为使用 (io.confluent.connect.avro.AvroConverter) 转换器

配置

常见配置属性

源和接收器连接器共享以下常见配置属性:

配置属性名称 违约 Description
azure.cosmos.account.endpoint None Cosmos DB 帐户终结点 URI
azure.cosmos.account.environment Azure Cosmos DB 帐户的 Azure 环境: AzureChina
azure.cosmos.account.tenantId "" Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。
azure.cosmos.auth.type MasterKey 目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 帐户密钥(仅在auth.typeMasterKey时需要)
azure.cosmos.auth.aad.clientId "" 服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。
azure.cosmos.auth.aad.clientSecret "" 服务主体的客户端密钥/密码。
azure.cosmos.mode.gateway false 用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。
azure.cosmos.application.name "" 应用程序名称。 它被添加为 userAgent 的后缀。
azure.cosmos.throughputControl.enabled false 指示是否启用吞吐量控制的标志。
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 吞吐量控制账户终结点 URI。
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 帐户的 Azure 环境: AzureChina
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。
azure.cosmos.throughputControl.auth.type MasterKey 目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 吞吐量控制帐户密钥(仅在 throughputControl.auth.typeMasterKey 时需要)。
azure.cosmos.throughputControl.auth.aad.clientId "" 服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。
azure.cosmos.throughputControl.auth.aad.clientSecret "" 服务主体的客户端密钥/密码。
azure.cosmos.throughputControl.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。
azure.cosmos.throughputControl.mode.gateway false 用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.throughputControl.group.name "" 吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。
azure.cosmos.throughputControl.targetThroughput -1 吞吐量控制组目标吞吐量。 该值应大于 0。
azure.cosmos.throughputControl.targetThroughputThreshold -1 吞吐量控制组目标吞吐量阈值。 该值应介于 (0,1) 之间。
azure.cosmos.throughputControl.priorityLevel None 吞吐量控制组优先级。 该值可以是 None、High 或 Low。
azure.cosmos.throughputControl.globalControl.database.name "" 用于吞吐量全局控制的数据库。
azure.cosmos.throughputControl.globalControl.container.name "" 用于吞吐量全局控制的容器。
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 这会控制客户端更新自身吞吐量使用情况的频率,并根据其他客户端的吞吐量使用情况调整自己的吞吐量共享。 默认值为 5 秒,允许的最小值为 5 秒。
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 这可以控制我们如何快速检测客户端离线的速度,从而允许其他客户端接管其分配的吞吐量。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。

有关汇聚连接器特定配置,请参阅 汇聚连接器文档

有关特定于源连接器的配置,请参阅 源连接器文档

常见配置错误

如果在 Kafka Connect 中错误配置转换器,则可能会导致错误。 这些错误显示在连接器上,因为你尝试反序列化已存储在 Kafka 中的消息。 转换器问题通常不会出现在源中,因为序列化是在源中设置的。

有关详细信息,请参阅 常见配置错误 文档。

项目设置

有关初始设置说明,请参阅 开发人员设置

资源

后续步骤