Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。 Azure Cosmos DB 接收器连接器允许将数据从 Apache Kafka 主题导出到 Azure Cosmos DB 数据库。 连接器根据订阅的主题,从 Kafka 中轮询数据,并将其写入数据库中的容器。
先决条件
- 从 Confluent 平台设置 开始,因为它为你提供了一个完整的环境来使用。 如果不想使用 Confluent 平台,则需要自行安装和配置 Zookeeper、Apache Kafka、Kafka Connect。 还需要手动安装和配置 Azure Cosmos DB 连接器。
- 创建 Azure Cosmos DB 帐户,容器 设置指南
- Bash shell
- 下载 Java 11+
- 下载 Maven
安装水槽连接器
如果您使用的是推荐的 Confluent 平台设置,则安装已包含 Azure Cosmos DB Sink 连接器,您可以跳过此步骤。
否则,您可以从最新的发布中下载 JAR 文件,或将此存储库打包以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅以下 说明。 还可以从源代码打包新的 JAR 文件。
# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar
创建 Kafka 主题并写入数据
如果使用 Confluent 平台,创建 Kafka 主题的最简单方法是使用提供的控制中心 UX。 否则,可以使用以下语法手动创建 Kafka 主题:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
对于此方案,我们将创建一个名为“hotels”的 Kafka 主题,并将非schema 嵌入式 JSON 数据写入主题。 若要在控制中心内创建主题,请参阅 Confluent 指南。
接下来,启动 Kafka 控制台生成者,将一些记录写入“hotels”主题。
# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels
在控制台生成者中,输入:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
输入的三条记录以 JSON 格式发布到“hotels”Kafka 主题。
创建汇聚连接器
在 Kafka Connect 中创建 Azure Cosmos DB 接收器连接器。 以下 JSON 正文定义了汇接器(sink connector)的配置。 请确保替换先决条件中 Azure Cosmos DB 设置指南中保存的值,这些值为azure.cosmos.account.endpoint和azure.cosmos.account.key属性。
有关各个配置属性的详细信息,请参阅汇流属性。
{
"name": "cosmosdb-sink-connector-v2",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector",
"tasks.max": "5",
"topics": "{topic}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"azure.cosmos.account.endpoint":"{endpoint}",
"azure.cosmos.account.key":"{masterKey}",
"azure.cosmos.applicationName": "{applicationName}",
"azure.cosmos.sink.database.name":"{databaseName}",
"azure.cosmos.sink.containers.topicMap":"{topic}#{container}"
}
}
填写所有值后,将 JSON 文件保存在本地某个位置。 可以使用此文件通过 REST API 创建连接器。
使用控制中心创建连接器
通过控制中心网页轻松创建连接器。 按照本 安装指南 从控制中心创建连接器。 请使用DatagenConnector磁贴,而不是CosmosSinkConnector选项。 在配置接收连接器时,请填写与 JSON 文件中所填值一致的值。
或者,在连接器页中,可以使用 “上传连接器配置文件 ”选项上传之前创建的 JSON 文件。
使用 REST API 创建连接器
使用 Kafka Connect REST API 创建接收器连接器:
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
确认写入 Azure Cosmos DB 的数据
登录到 Azure 门户 并导航到 Azure Cosmos DB 帐户。 检查“hotels”主题中的三条记录是否已在帐户中创建。
清理行动
若要从控制中心删除连接器,请导航到创建的接收器连接器,然后选择 “删除 ”图标。
或者,使用 Kafka Connect REST API 删除:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅 以下步骤。
接收器配置属性
以下设置用于配置 Azure Cosmos DB Kafka 接收器连接器。 这些配置值确定要使用的 Kafka 主题、Azure Cosmos DB 容器写入哪些内容以及序列化数据的格式。 有关具有默认值的示例配置文件,请参阅 此配置。
| 配置属性名称 | 违约 | Description |
|---|---|---|
connector.class |
None | Azure Cosmos DB 源的类名。 应将其设置为 com.azure.cosmos.kafka.connect.CosmosSinkConnector |
azure.cosmos.account.endpoint |
None | Cosmos DB 帐户终结点 URI |
azure.cosmos.account.environment |
Azure |
Cosmos DB 帐户的 Azure 环境: AzureChina。 |
azure.cosmos.account.tenantId |
"" |
Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.auth.type |
MasterKey |
目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal |
azure.cosmos.account.key |
"" |
Cosmos DB 帐户密钥(仅在auth.type是MasterKey时需要) |
azure.cosmos.auth.aad.clientId |
"" |
服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.auth.aad.clientSecret |
"" |
服务主体的客户端密钥/密码。 |
azure.cosmos.mode.gateway |
false |
用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。 |
azure.cosmos.preferredRegionList |
[] |
要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。 |
azure.cosmos.application.name |
"" |
应用程序名称。 它被添加为 userAgent 的后缀。 |
azure.cosmos.throughputControl.enabled |
false |
指示是否启用吞吐量控制的标志。 |
azure.cosmos.throughputControl.account.endpoint |
"" |
Cosmos DB 吞吐量控制账户终结点 URI。 |
azure.cosmos.throughputControl.account.environment |
Azure |
Cosmos DB 帐户的 Azure 环境: AzureChina。 |
azure.cosmos.throughputControl.account.tenantId |
"" |
Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.throughputControl.auth.type |
MasterKey |
目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal |
azure.cosmos.throughputControl.account.key |
"" |
Cosmos DB 吞吐量控制帐户密钥(仅在 throughputControl.auth.type 为 MasterKey 时需要)。 |
azure.cosmos.throughputControl.auth.aad.clientId |
"" |
服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.throughputControl.auth.aad.clientSecret |
"" |
服务主体的客户端密钥/密码。 |
azure.cosmos.throughputControl.preferredRegionList |
[] |
要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,它提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。 |
azure.cosmos.throughputControl.mode.gateway |
false |
用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。 |
azure.cosmos.throughputControl.group.name |
"" |
吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。 |
azure.cosmos.throughputControl.targetThroughput |
-1 |
吞吐量控制组目标吞吐量。 该值应大于 0。 |
azure.cosmos.throughputControl.targetThroughputThreshold |
-1 |
吞吐量控制组目标吞吐量阈值。 该值应介于 (0,1) 之间。 |
azure.cosmos.throughputControl.priorityLevel |
None |
吞吐量控制组优先级。 该值可以是 None、High 或 Low。 |
azure.cosmos.throughputControl.globalControl.database.name |
"" |
要用于吞吐量全局控制的数据库。 |
azure.cosmos.throughputControl.globalControl.container.name |
"" |
要用于吞吐量全局控制的容器。 |
azure.cosmos.throughputControl.globalControl.renewIntervalInMS |
-1 |
这会控制客户端更新自身吞吐量使用情况的频率,并根据其他客户端的吞吐量使用情况调整自己的吞吐量共享。 默认值为 5 秒,允许的最小值为 5 秒。 |
azure.cosmos.throughputControl.globalControl.expireIntervalInMS |
-1 |
这可以控制我们如何快速检测客户端离线的速度,从而允许其他客户端接管其分配的吞吐量。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。 |
azure.cosmos.sink.database.name |
"" |
Cosmos DB 数据库名称。 |
azure.cosmos.sink.containers.topicMap |
"" |
映射到 Cosmos 容器的 Kafka 主题的逗号分隔列表。 例如:topic1#con1、topic2#con2。 |
azure.cosmos.sink.errors.tolerance.level |
None |
耗尽所有重试后的错误容错级别。
None 错误时失败。
All 用于记录日志并继续 |
azure.cosmos.sink.bulk.enabled |
true |
用于指示是否为接收器连接器启用 Cosmos DB 批量模式的标志。 默认情况下为 true。 |
azure.cosmos.sink.bulk.maxConcurrentCosmosPartitions |
-1 |
Cosmos DB 最大并发 Cosmos 分区。 如果未指定,则会基于容器的物理分区数量来确定,这表示每个批处理预计会包含来自所有 Cosmos 物理分区的数据。 如果指定,则指示每个批处理数据的 Cosmos DB 物理分区数。 因此,当重新分区每个批处理中的输入数据以平衡每个批处理需要写入的 Cosmos 分区数时,此配置可用于提高批量处理效率。 这对于大型容器(具有数百个物理分区)非常有用。 |
azure.cosmos.sink.bulk.initialBatchSize |
1 |
Cosmos DB 初始批量微批大小 - 排队的文档数超过此大小时要刷新到后端的微批 - 或满足目标有效负载大小。 正在根据限流速率自动调整微批大小。 默认情况下,初始微批大小为 1。 如果希望避免前几个请求消耗过多 RU,请减少这一点。 |
azure.cosmos.sink.write.strategy |
ItemOverwrite |
Cosmos DB 写入策略: ItemOverwrite (使用 upsert)、 ItemAppend (使用 create,忽略预先存在的项目和冲突)、 ItemDelete (基于数据帧的 ID/pk 进行删除)、 ItemDeleteIfNotModified (如果 etag 未变,在收集 id/pk 后基于数据帧的 id/pk 删除), ItemOverwriteIfNotModified (如果 etag 为空则使用 create,否则更新/替换以满足 etag 前置条件;若文档更新导致前置条件失败则忽略), ItemPatch (根据修补程序配置对所有文档进行部分更新) |
azure.cosmos.sink.maxRetryCount |
10 |
对于 Sink 连接器的写入失败,Cosmos DB 的最大重试尝试次数。 默认情况下,连接器在出现暂时性写入错误时最多重试 10 次。 |
azure.cosmos.sink.id.strategy |
ProvidedInValueStrategy |
一种用于利用 id 填充文档的策略。 有效的策略包括:TemplateStrategy、、FullKeyStrategyKafkaMetadataStrategy、ProvidedInKeyStrategy。 ProvidedInValueStrategy 带有id.strategy 前缀的配置属性将被传递给策略。 例如,使用 id.strategy=TemplateStrategy 时,该属性 id.strategy.template 将传递到模板策略,并用于指定要用于构造模板的 id模板字符串。 |
azure.cosmos.sink.write.patch.operationType.default |
Set |
默认 Cosmos DB 补丁操作类型。 支持的功能包括无、添加、设置、替换、删除、增量。 为 no-op选择无。 |
azure.cosmos.sink.write.patch.property.configs |
"" |
Cosmos DB 修补 json 属性配置。 它可以包含多个与以下模式匹配的定义,这些模式用逗号分隔。 property(jsonProperty).op(operationType) 或 property(jsonProperty).path(patchInCosmosdb).op(operationType) - 第二种模式的区别在于,它还允许您定义不同的 Cosmos DB 路径。 注意:它不支持嵌套 json 属性配置。 |
azure.cosmos.sink.write.patch.filter |
"" |
用于条件修补 |
数据以 JSON 形式写入 Azure Cosmos DB,没有任何架构。
支持的数据类型
Azure Cosmos DB Kafka 接收器连接器将接收器记录转换为支持以下架构类型的 JSON 文档:
| 架构类型 | JSON 数据类型 |
|---|---|
| Array | Array |
| 布尔 | 布尔 |
| Float32 | 编号 |
| Float64 | 编号 |
| Int8 | 编号 |
| Int16 | 编号 |
| Int32 | 编号 |
| Int64 | 编号 |
| 地图 | 对象(JSON) |
| String | String Null |
| 结构 | 对象(JSON) |
Sink 连接器还支持以下 AVRO 逻辑类型:
| 架构类型 | JSON 数据类型 |
|---|---|
| 日期 | 编号 |
| Time | 编号 |
| 时间戳 | 编号 |
注释
Azure Cosmos DB 汇聚器连接器目前不支持字节反序列化。
单消息转换(SMT)
除了接收器连接器设置之外,还可以指定使用单消息转换(SMT)修改流经 Kafka Connect 平台的消息。 有关详细信息,请参阅 Confluent SMT 文档。
使用 InsertUUID SMT
可以使用 InsertUUID SMT 自动添加项 ID。 使用自定义 InsertUUID SMT,可以在将字段写入 Azure Cosmos DB 之前,为每个消息插入 id 一个随机 UUID 值。
警告
仅当消息不包含 id 字段时,才使用此 SMT。 否则,这些id值将被覆盖,你可能最终在数据库中得到重复项。 使用 UUID 作为消息 ID 可以快速简单,但不是 Azure Cosmos DB 中使用的 理想分区键 。
安装 SMT
在使用 InsertUUID SMT 之前,需要在 Confluent Platform 安装程序中安装此转换。 如果使用此存储库中的 Confluent Platform 安装程序,则转换已包含在安装中,你可以跳过此步骤。
或者,可以打包 InsertUUID 源 以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅以下 说明。
# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*.jar
配置 SMT
在你的汇流连接器配置中,添加以下属性来设置id。
"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"
有关使用此 SMT 的详细信息,请参阅 InsertUUID 存储库。
使用 SMT 配置 TTL(生存时间)
使用 InsertField 和 Cast 这两个 SMT,您可以配置在 Azure Cosmos DB 中创建的每个项上的 TTL。 在启用项级别的 TTL 之前,请先在容器上启用 TTL。 有关详细信息,请参阅 生存时间 文档。
在 Sink 连接器配置中,添加以下属性以秒为单位设置 TTL。 在此示例中,TTL 设置为 100 秒。 如果消息中已经包含 TTL 字段,这些 SMT 将覆盖 TTL 的值。
"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"
有关使用这些 SMT 的详细信息,请参阅 InsertField 和 Cast 文档。
排查常见问题
下面是使用 Kafka 接收器连接器时可能会遇到的一些常见问题的解决方案。
使用 JsonConverter 读取非 JSON 数据
如果在 Kafka 中对源主题具有非 JSON 数据,并尝试使用 JsonConverter 读取该数据,则会看到以下异常:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)
以 Avro 或其他格式(如 CSV 字符串)序列化的源主题可能会导致此错误。
解决方案:如果主题数据采用 AVRO 格式,请更改 Kafka Connect 接收器连接器以使用 AvroConverter 如下所示。
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
使用 AvroConverter 读取非 Avro 数据
当你尝试使用 Avro 转换器从非 Avro 格式的主题读取数据时,此方案适用。 其中包括由非 Confluent 架构注册表的 Avro 序列化程序(其具有不同线路格式)的其他 Avro 序列化程序写入的数据。
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解决方案:检查源主题的序列化格式。 然后,可以切换连接器以使用正确的转换器,或者将上游格式切换为 Avro。
在没有预期的架构或负载格式的情况下读取 JSON 消息
Kafka Connect 支持包含有效负载和架构的特殊 JSON 消息结构,如下所示。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "name"
}
]
},
"payload": {
"userid": 123,
"name": "Sam"
}
}
如果尝试读取不包含此结构中的数据的 JSON 数据,则会出现以下错误:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
为清楚起见,唯一有效的 schemas.enable=true JSON 结构将架构和有效负载字段作为顶级元素,如上所示。 错误消息指出,如果只有纯 JSON 数据,则应将连接器的配置更改为:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
局限性
- 不支持在 Azure Cosmos DB 中自动创建数据库和容器。 数据库和容器必须已存在,并且必须正确配置它们。
后续步骤
可以通过以下文档详细了解 V4 Java SDK 中的批量作业:
- 对 Azure Cosmos DB 数据执行批量操作
- Kafka Connect 用作 Azure Cosmos DB 的 源连接器