Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。 Azure Cosmos DB 接收器连接器允许将数据从 Apache Kafka 主题导出到 Azure Cosmos DB 数据库。 连接器根据订阅的主题,从 Kafka 中轮询数据,并将其写入数据库中的容器。
先决条件
从 Confluent 平台设置 开始,因为它为你提供了一个完整的环境来使用。 如果不想使用 Confluent 平台,则需要自行安装和配置 Apache Kafka、Kafka Connect。 还需要手动安装和配置 Azure Cosmos DB 连接器。
创建 Azure Cosmos DB 帐户,容器 设置指南
Bash shell,在 GitHub Codespaces、Mac、Ubuntu、Windows 上使用 WSL2 进行测试。 此 shell 在 WSL1 中不起作用。
下载 Java 11+
下载 Maven
安装水槽连接器
如果使用建议的 Confluent 平台设置,则安装中包含 Azure Cosmos DB 接收器连接器,可以跳过此步骤。
否则,您可以从最新的发布中下载 JAR 文件,或将此存储库打包以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅以下 说明。 还可以从源代码打包新的 JAR 文件。
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar
创建 Kafka 主题并写入数据
如果使用 Confluent 平台,创建 Kafka 主题的最简单方法是使用提供的控制中心 UX。 否则,可以使用以下语法手动创建 Kafka 主题:
./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
对于此方案,我们将创建一个名为“hotels”的 Kafka 主题,并将非架构嵌入的 JSON 数据写入主题。 若要在控制中心内创建主题,请参阅 Confluent 指南。
接下来,启动 Kafka 控制台生成者,将一些记录写入“hotels”主题。
# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels
在控制台生成者中,输入:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
输入的三条记录以 JSON 格式发布到“hotels”Kafka 主题。
创建汇聚连接器
在 Kafka Connect 中创建 Azure Cosmos DB 接收器连接器。 以下 JSON 正文定义了汇接器(sink connector)的配置。 请确保将您从 Azure Cosmos DB 设置指南中保存的 connect.cosmos.connection.endpoint 和 connect.cosmos.master.key 属性的值替换掉,这些是在先决条件部分中应该保存的内容。
有关各个配置属性的详细信息,请参阅汇流属性。
{
"name": "cosmosdb-sink-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
"tasks.max": "1",
"topics": [
"hotels"
],
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.cn:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "hotels#kafka"
}
}
填写所有值后,将 JSON 文件保存在本地某个位置。 可以使用此文件通过 REST API 创建连接器。
使用控制中心创建连接器
通过控制中心网页轻松创建连接器。 按照本 安装指南 从控制中心创建连接器。 请使用DatagenConnector磁贴,而不是CosmosDBSinkConnector选项。 配置汇聚连接器时,请填写与 JSON 文件中相同的值。
或者,在连接器页中,可以使用 “上传连接器配置文件 ”选项上传之前创建的 JSON 文件。
使用 REST API 创建连接器
使用 Connect REST API 创建接收器连接器:
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
确认写入 Azure Cosmos DB 的数据
登录到 Azure 门户,导航到 Azure Cosmos DB 帐户。 检查“hotels”主题中的三条记录是否已在帐户中创建。
清理行动
若要从控制中心删除连接器,请导航到创建的接收器连接器,然后选择 “删除” 图标。
或者,使用 Connect REST API 来删除:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅 以下步骤。
接收器配置属性
以下设置用于配置 Azure Cosmos DB Kafka 接收器连接器。 这些配置值确定从哪些 Kafka 主题消费数据、将数据写入到哪个 Azure Cosmos DB 容器,以及用于序列化数据的格式。 有关具有默认值的示例配置文件,请参阅 此配置。
| Name | 类型 | Description | 必需/可选 |
|---|---|---|---|
| 主题 | 列表 | 要观看的 Kafka 主题列表。 | 必选 |
| connector.class | 字符串 | Azure Cosmos DB 接收器的类名。 它应设置为 com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector。 |
必选 |
| connect.cosmos.connection.endpoint | uri | Azure Cosmos DB 终结点 URI 字符串。 | 必选 |
| connect.cosmos.master.key | 字符串 | 汇聚器连接到的 Azure Cosmos DB 主密钥。 | 必选 |
| connect.cosmos.databasename | 字符串 | 接收器写入的 Azure Cosmos DB 数据库的名称。 | 必选 |
| connect.cosmos.containers.topicmap | 字符串 | Kafka 主题与 Azure Cosmos DB 容器之间的映射,使用 CSV 格式,如下所示:topic#container,topic2#container2 |
必选 |
| connect.cosmos.connection.gateway.enabled | 布尔 | 用于指示是否使用网关模式的标志。 默认情况下为“false”。 | 可选 |
| connect.cosmos.sink.bulk.enabled | 布尔 | 指示是否启用批量模式的标志。 默认情况下为 true。 | 可选 |
| connect.cosmos.sink.maxRetryCount | int | 暂时性写入失败时的最大重试尝试次数。 默认情况下为 10 次。 | 可选 |
| 密钥转换器 | 字符串 | 写入 Kafka 主题的密钥数据的序列化格式。 | 必选 |
| 数值转换器 | 字符串 | 写入 Kafka 主题的值数据的序列化格式。 | 必选 |
| key.converter.schemas.enable | 字符串 | 如果键数据具有嵌入架构,则设置为“true”。 | 可选 |
| value.converter.schemas.enable | 字符串 | 如果键数据具有嵌入架构,则设置为“true”。 | 可选 |
| tasks.max | int | 连接器接收器任务的最大数目。 默认值为 1 |
可选 |
数据将始终以 JSON 形式写入 Azure Cosmos DB,而无需任何架构。
支持的数据类型
Azure Cosmos DB 接收器连接器将接收器记录转换为支持以下架构类型的 JSON 文档:
| 架构类型 | JSON 数据类型 |
|---|---|
| Array | Array |
| 布尔 | 布尔 |
| Float32 | 编号 |
| Float64 | 编号 |
| Int8 | 编号 |
| Int16 | 编号 |
| Int32 | 编号 |
| Int64 | 编号 |
| 地图 | 对象(JSON) |
| String | String Null |
| 结构 | 对象(JSON) |
Sink 连接器还支持以下 AVRO 逻辑类型:
| 架构类型 | JSON 数据类型 |
|---|---|
| 日期 | 编号 |
| Time | 编号 |
| 时间戳 | 编号 |
注释
Azure Cosmos DB 接收器连接器当前不支持字节反序列化。
单消息转换(SMT)
除了接收器连接器设置之外,还可以指定使用单消息转换(SMT)修改流经 Kafka Connect 平台的消息。 有关详细信息,请参阅 Confluent SMT 文档。
使用 InsertUUID SMT
可以使用 InsertUUID SMT 自动添加项 ID。 使用自定义 InsertUUID SMT,可以在将字段写入 Azure Cosmos DB 之前,为每个消息插入 id 一个随机 UUID 值。
警告
仅当消息不包含 id 字段时,才使用此 SMT。 否则,这些id值将被覆盖,你可能最终在数据库中得到重复项。 使用 UUID 作为消息 ID 可以快速简单,但不是 Azure Cosmos DB 中使用的 理想分区键 。
安装 SMT
在使用 InsertUUID SMT 之前,需要在 Confluent Platform 安装程序中安装此转换。 如果使用此存储库中的 Confluent Platform 安装程序,则转换已包含在安装中,你可以跳过此步骤。
或者,可以打包 InsertUUID 源 以创建新的 JAR 文件。 若要使用 JAR 文件手动安装连接器,请参阅以下 说明。
# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*.jar
配置 SMT
在你的汇流连接器配置中,添加以下属性来设置id。
"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"
有关使用此 SMT 的详细信息,请参阅 InsertUUID 存储库。
使用 SMT 配置 TTL(生存时间)
使用 InsertField 和 Cast 这两个 SMT,您可以配置在 Azure Cosmos DB 中创建的每个项上的 TTL。 在启用项级别的 TTL 之前,请先在容器上启用 TTL。 有关详细信息,请参阅 生存时间 文档。
在 Sink 连接器配置中,添加以下属性以秒为单位设置 TTL。 在此示例中,TTL 设置为 100 秒。 如果消息已包含TTL字段,并且TTL值将被这些SMT覆盖。
"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"
有关使用这些 SMT 的详细信息,请参阅 InsertField 和 Cast 文档。
排查常见问题
下面是使用 Kafka 接收器连接器时可能会遇到的一些常见问题的解决方案。
使用 JsonConverter 读取非 JSON 数据
如果在 Kafka 中对源主题具有非 JSON 数据,并尝试使用 JsonConverter 读取该数据,你将看到以下异常:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7
此错误可能是由源主题中的数据以 Avro 或其他格式(如 CSV 字符串)序列化引起的。
解决方案:如果主题数据采用 AVRO 格式,请更改 Kafka Connect 接收器连接器以使用 AvroConverter 如下所示。
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
网关模式支持
connect.cosmos.connection.gateway.enabled 是 Cosmos DB Kafka Sink Connector 的一个配置选项,它通过利用 Cosmos DB 网关服务来增强数据摄取。 此服务充当 Cosmos DB 的前端,提供负载均衡、请求路由和协议转换等优势。 通过利用网关服务,连接器在将数据写入 Cosmos DB 时实现了改进的吞吐量和可伸缩性。 有关详细信息,请参阅 连接模式。
"connect.cosmos.connection.gateway.enabled": true
批量模式支持
connect.cosmos.sink.bulk.enabled 属性确定是否启用了大容量写入功能,以便将数据从 Kafka 主题写入 Azure Cosmos DB。
当此属性设置为 true (默认情况下),它启用大容量写入模式,允许 Kafka Connect 使用 Azure Cosmos DB 的批量导入 API 来执行有效的批处理写入 CosmosContainer.executeBulkOperations() 方法。 批量写入模式在使用 CosmosContainer.upsertItem() 方法将数据引入 Cosmos DB 时,与非批量模式相比,可显著提高写入性能并降低总体延迟。
默认情况下启用批量模式。 若要禁用该 connect.cosmos.sink.bulk.enabled 属性,需要在 Cosmos DB 接收器连接器的配置中将其 false 设置为该属性。 下面是一个示例配置属性文件:
"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.cn:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false
通过启用该 connect.cosmos.sink.bulk.enabled 属性,可以利用适用于 Azure Cosmos DB 的 Kafka Connect 中的批量写入功能,在将数据从 Kafka 主题复制到 Azure Cosmos DB 时实现改进的写入性能。
"connect.cosmos.sink.bulk.enabled": true
使用 AvroConverter 读取非 Avro 数据
当你尝试使用 Avro 转换器从非 Avro 格式的主题读取数据时,此方案适用。 其中包括由非 Confluent 架构注册表的 Avro 序列化程序(其具有不同线路格式)的其他 Avro 序列化程序写入的数据。
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解决方案:检查源主题的序列化格式。 然后,切换 Kafka Connect 的接收器连接器以使用正确的转换器或将上游格式切换到 Avro。
在没有预期的架构或负载格式的情况下读取 JSON 消息
Kafka Connect 支持包含有效负载和架构的特殊 JSON 消息结构,如下所示。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "name"
}
]
},
"payload": {
"userid": 123,
"name": "Sam"
}
}
如果尝试读取不包含此结构中的数据的 JSON 数据,将收到以下错误:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
为清楚起见,唯一有效的 schemas.enable=true JSON 结构将架构和有效负载字段作为顶级元素,如上所示。 错误消息指出,如果只有纯 JSON 数据,则应将连接器的配置更改为:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
局限性
- 不支持在 Azure Cosmos DB 中自动创建数据库和容器。 数据库和容器必须已存在,并且必须正确配置它们。
后续步骤
可以通过以下文档更详细地了解 Azure Cosmos DB 中的更改馈送:
可以通过以下文档详细了解 V4 Java SDK 中的批量作业: