Kafka Connect for Azure Cosmos DB - 接收器连接器 v2

适用范围: NoSQL

Kafka Connect for Azure Cosmos DB 是用于从/向 Azure Cosmos DB 读取/写入数据的连接器。 使用 Azure Cosmos DB 接收器连接器可将 Apache Kafka 主题中的数据导出到 Azure Cosmos DB 数据库。 该连接器基于主题订阅从 Kafka 中轮询数据,以将其写入到数据库中的容器。

先决条件

  • 请从 Confluent 平台设置开始,因为完成此过程可以提供一个可用的完整环境。 如果你不想要使用 Confluent 平台,则需要自行安装并配置 Zookeeper、Apache Kafka 和 Kafka Connect。 还需要手动安装并配置 Azure Cosmos DB 连接器。
  • 按照容器设置指南创建一个 Azure Cosmos DB 帐户
  • Bash shell
  • 下载 Java 11+
  • 下载 Maven

安装接收器连接器

如果使用的是建议的 Confluent 平台设置,则 Azure Cosmos DB 接收器连接器已包含在安装中,因此可以跳过此步骤。

否则,可以从最新版本下载 JAR 文件,或者打包此存储库以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅这些说明。 还可以从源代码打包新的 JAR 文件。

# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install

# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar

创建 Kafka 主题并写入数据

如果使用的是 Confluent 平台,则创建 Kafka 主题的最简单方法是使用提供的控制中心 UX。 否则,可以使用以下语法手动创建 Kafka 主题:

./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

对于此方案,我们将创建一个名为“hotels”的 Kafka 主题,并将非架构嵌入式 JSON 数据写入该主题。 若要在控制中心创建主题,请参阅 Confluent 指南

接下来,启动 Kafka 控制台生成器以将少量的记录写入“hotels”主题。

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

在控制台生成器中,输入:

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

输入的三条记录将以 JSON 格式发布到“hotels”Kafka 主题。

创建接收器连接器

在 Kafka Connect 中创建 Azure Cosmos DB 接收器连接器。 以下 JSON 正文定义接收器连接器的配置。 确保替换 azure.cosmos.account.endpointazure.cosmos.account.key(在学习先决条件部分所述的 Azure Cosmos DB 设置指南时保存了这些属性)的值。

有关其中每个配置属性的详细信息,请参阅接收器属性

{
  "name": "cosmosdb-sink-connector-v2",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector",
    "tasks.max": "5",
    "topics": "{topic}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "azure.cosmos.account.endpoint":"{endpoint}",
    "azure.cosmos.account.key":"{masterKey}",
    "azure.cosmos.applicationName": "{applicationName}",
    "azure.cosmos.sink.database.name":"{databaseName}",
    "azure.cosmos.sink.containers.topicMap":"{topic}#{container}"
  }
}

填写所有值后,将 JSON 文件保存在本地的某个位置。 可以使用此文件通过 REST API 创建连接器。

使用控制中心创建连接器

在控制中心网页上可以轻松创建连接器。 请按照此安装指南通过控制中心创建连接器。 请不要使用 DatagenConnector 选项,而应改用 CosmosSinkConnector 磁贴。 配置接收器连接器时,请填写在 JSON 文件中填充的值。

或者,也可以在“连接器”页中,使用“上传连接器配置文件”选项上传前面创建的 JSON 文件。

“浏览连接器”对话框中“上传连接器配置文件”选项的屏幕截图。

使用 REST API 创建连接器

使用 Kafka“连接”REST API 创建接收器连接器:

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

确认数据已写入 Azure Cosmos DB

登录到 Azure 门户并导航到你的 Azure Cosmos DB 帐户。 检查该帐户中是否创建了来自“hotels”主题的三条记录。

清理

若要在控制中心删除连接器,请导航到创建的接收器连接器,然后选择“删除”图标

接收器连接器对话框中的“删除”选项的屏幕截图。

或者,可以使用 Kafka“连接”REST API 删除连接器:

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅这些步骤

接收器配置属性

以下设置用于配置 Azure Cosmos DB Kafka 接收器连接器。 这些配置值决定了要使用哪些 Kafka 主题、将数据写入哪个 Azure Cosmos DB 容器,以及用于序列化数据的格式。 有关包含默认值的示例配置文件,请参阅此配置

配置属性名称 默认值 说明
connector.class Azure Cosmos DB 源的类名。 它应设置为 com.azure.cosmos.kafka.connect.CosmosSinkConnector
azure.cosmos.account.endpoint Cosmos DB 帐户终结点 URI
azure.cosmos.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 帐户密钥(仅当 auth.typeMasterKey 时才需要)
azure.cosmos.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供了首选区域用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.application.name "" 应用程序名称。 它作为 userAgent 后缀添加。
azure.cosmos.throughputControl.enabled false 用于指示是否启用吞吐量控制的标志。
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 吞吐量控制帐户终结点 URI。
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 吞吐量控制帐户密钥(仅当 throughputControl.auth.typeMasterKey 时为必需)。
azure.cosmos.throughputControl.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.throughputControl.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,它提供用作提示的首选区域。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.throughputControl.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.throughputControl.group.name "" 吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。
azure.cosmos.throughputControl.targetThroughput -1 吞吐量控制组目标吞吐量。 该值应大于 0。
azure.cosmos.throughputControl.targetThroughputThreshold -1 吞吐量控制组目标吞吐量阈值。 该值的区间范围为 (0,1]。
azure.cosmos.throughputControl.priorityLevel None 吞吐量控制组优先级。 该值可以是 None、High 或 Low。
azure.cosmos.throughputControl.globalControl.database.name "" 要用于吞吐量全局控制的数据库。
azure.cosmos.throughputControl.globalControl.container.name "" 要用于吞吐量全局控制的容器。
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 这会控制客户端更新自身吞吐量使用情况并根据其他客户端的吞吐量使用情况调整自己吞吐量份额的频率。 默认值为 5 秒,允许的最小值为 5 秒。
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 这将控制检测到客户端脱机进而允许其他客户端获取其吞吐量份额的时间。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。
azure.cosmos.sink.database.name "" Cosmos DB 数据库名称。
azure.cosmos.sink.containers.topicMap "" 映射到 Cosmos 容器的 Kafka 主题的逗号分隔列表。 例如:topic1#con1、topic2#con2。
azure.cosmos.sink.errors.tolerance.level None 耗尽所有重试后的错误容错级别。 None 用于出错时失败。 All 用于日志并继续
azure.cosmos.sink.bulk.enabled true 用于指示是否为接收器连接器启用 Cosmos DB 批量模式的标志。 默认情况下,它为 true。
azure.cosmos.sink.bulk.maxConcurrentCosmosPartitions -1 Cosmos DB 最大并发 Cosmos 分区。 如果未指定,则根据容器的物理分区数确定,该数目指示每个批处理应具有来自所有 Cosmos 物理分区的数据。 如果已指定,则指示每个批处理数据的 Cosmos DB 物理分区数。 因此,当重新分区每个批处理中的输入数据以平衡每个批处理需要写入的 Cosmos 分区数时,此配置可用于提高批量处理效率。 这对于大型容器(具有数百个物理分区)非常有用。
azure.cosmos.sink.bulk.initialBatchSize 1 Cosmos DB 初始批量微型批大小--当加入队列的文档数量超过此大小或达到目标有效载荷大小时,此微批次将被刷新到后端。 微批大小会根据限制速率自动调整。 默认情况下,初始微批大小为 1。 如果希望避免前几个请求消耗过多 RU,请减少此数值。
azure.cosmos.sink.write.strategy ItemOverwrite Cosmos DB 写入策略:ItemOverwrite(使用 upsert)、ItemAppend(使用 create,忽略“冲突”的已有项目)、ItemDelete(基于数据帧的 id/pk 删除)、ItemDeleteIfNotModified(如果自收集 id/pk 以来 etag 未发生变化,则根据数据帧的 id/pk 进行删除),ItemOverwriteIfNotModified(如果 etag 为空,则使用 create,否则使用 etag 前提条件更新/替换,如果文档已更新,则忽略前提条件失败的情况),ItemPatch(根据补丁配置对所有文件进行部分更新)
azure.cosmos.sink.maxRetryCount 10 对于接收器连接器的写入失败,Cosmos DB 最大重试尝试次数。 默认情况下,连接器对瞬时写入错误最多重试 10 次。
azure.cosmos.sink.id.strategy ProvidedInValueStrategy 用于在文档中填充 id 的策略。 有效的策略包括:TemplateStrategyFullKeyStrategyKafkaMetadataStrategyProvidedInKeyStrategyProvidedInValueStrategy。 前缀为 id.strategy 的配置属性会传递给策略。 例如,使用 id.strategy=TemplateStrategy 时,属性 id.strategy.template 将传递到模板策略,并用于指定构造 id 时要使用的模板字符串。
azure.cosmos.sink.write.patch.operationType.default Set 默认 Cosmos DB 修补操作类型。 支持的功能包括 none、add、set、replace、remove、increment。 对于 no-op,请选择 none。
azure.cosmos.sink.write.patch.property.configs "" Cosmos DB 修补 json 属性配置。 它可以包含多个与以下模式匹配的定义,这些模式用逗号分隔。 property(jsonProperty).op(operationType) 或 property(jsonProperty).path(patchInCosmosdb).op(operationType) - 第二种模式的不同之处在于,它还允许您定义不同的 Cosmos DB 路径。 注意:它不支持嵌套 json 属性配置。
azure.cosmos.sink.write.patch.filter "" 用于条件修补程序

数据以 JSON 格式写入 Azure Cosmos DB,不含任何架构。

支持的数据类型

Azure Cosmos DB Kafka 接收器连接器将接收器记录转换为支持以下架构类型的 JSON 文档:

架构类型 JSON 数据类型
Array Array
布尔 布尔
Float32 Number
Float64 Number
Int8 Number
Int16 Number
Int32 Number
Int64 Number
映射 对象 (JSON)
字符串 字符串
Null
结构 对象 (JSON)

接收器连接器还支持以下 AVRO 逻辑类型:

架构类型 JSON 数据类型
Date Number
时间 Number
Timestamp Number

注意

Azure Cosmos DB 接收器连接器目前不支持字节反序列化。

单一消息转换 (SMT)

除了接收器连接器设置以外,还可以指定使用单一消息转换 (SMT) 来修改流经 Kafka Connect 平台的消息。 有关详细信息,请参阅 Confluent SMT 文档

使用 InsertUUID SMT

可以使用 InsertUUID SMT 来自动添加项 ID。 使用自定义 InsertUUID SMT,可以在每条消息写入到 Azure Cosmos DB 之前,为其插入包含随机 UUID 值的 id 字段。

警告

仅当消息不包含 id 字段时,才使用此 SMT。 否则将覆盖 id 值,并且数据库中最终可能会出现重复项。 使用 UUID 作为消息 ID 可能既快速又简单,但 UUID 不是要在 Azure Cosmos DB 中使用的理想分区键

安装 SMT

在可以使用 InsertUUID SMT 之前,需要在 Confluent 平台安装程序中安装此转换。 如果使用的是此存储库中的 Confluent 平台安装程序,则此转换已包含在安装中,因此可以跳过此步骤。

或者,可以打包 InsertUUID 源以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅这些说明

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

配置 SMT

在接收器连接器配置中,添加以下属性以设置 id

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

有关使用此 SMT 的详细信息,请参阅 InsertUUID 存储库

使用 SMT 配置生存时间 (TTL)

使用 InsertFieldCast SMT 可对 Azure Cosmos DB 中创建的每个项配置 TTL。 在项级别启用 TTL 之前,请对容器启用 TTL。 有关详细信息,请参阅生存时间文档。

在接收器连接器配置中,添加以下属性以设置 TTL(为秒为单位)。 在以下示例中,TTL 设置为 100 秒。 如果消息已包含 TTL 该字段,则这些 SMT 将覆盖 TTL 值。

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

有关使用这些 SMT 的详细信息,请参阅 InsertField强制转换文档。

排查常见问题

下面是使用 Kafka 接收器连接器时可能会遇到的一些常见问题的解决方法。

使用 JsonConverter 读取非 JSON 数据

如果 Kafka 中的源主题包含非 JSON 数据,而你尝试使用 JsonConverter 读取这些数据,则会出现以下异常:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

以 Avro 或其他格式(如 CSV 字符串)序列化的源主题可能会导致此错误。

解决方法:如果主题数据采用 AVRO 格式,请将 Kafka Connect 接收器连接器更改为使用 AvroConverter,如下所示

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

使用 AvroConverter 读取非 avro 数据

此方案适用于使用 Avro 转换器从主题中读取非 Avro 格式的数据的情况。 这些数据包括除 Confluent 架构注册表的 Avro 序列化程序(具有其自己的传输格式)以外的 Avro 序列化程序写入的数据。

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解决方案:检查源主题的序列化格式。 然后,将连接器切换以使用适当的转换器,或者将上游格式切换为 Avro。

读取不带有预期架构/有效负载结构的 JSON 消息

Kafka Connect 支持包含有效负载和架构的特殊 JSON 消息结构,如下所示。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

如果你尝试读取不包含采用此结构的数据的 JSON 数据,将收到以下错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

确切地说,对于 schemas.enable=true,唯一有效的 JSON 结构包含架构和有效负载字段作为顶级元素,如上所示。 如错误消息所述,如果你只有普通的 JSON 数据,则应将连接器的配置更改为:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

限制

  • 不支持在 Azure Cosmos DB 中自动创建数据库和容器。 数据库和容器必须已存在,并且必须已正确配置。

后续步骤

可以通过以下文档详细了解 V4 Java SDK 中的批量操作: