Kafka Connect 是一种工具,用于在 Apache Kafka 和其他系统之间缩放且可靠地流式传输数据。 使用 Kafka Connect 可以定义将大型数据集移入和移出 Kafka 的连接器。 Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。
源和汇连接器语义
源连接器 - 目前,此连接器支持多任务的至少一次传递和单任务的精确一次传递。
汇聚连接器 - 此连接器完全支持精确一次语义。
受支持的数据格式
可以将源和接收器连接器配置为支持以下数据格式:
| Format | Description |
|---|---|
| 纯 JSON | 不带任何附加架构的 JSON 记录结构。 |
| 包含架构的 JSON | 具有显式架构信息的 JSON 记录结构,以确保数据与预期格式匹配。 |
| AVRO | 在 Apache Hadoop 项目中开发的面向行的远程过程调用和数据序列化框架。 它使用 JSON 定义数据类型、协议,并使用压缩的二进制格式序列化数据。 |
键和值设置(包括格式和序列化)可以在 Kafka 中独立配置。 因此,可以分别对键和值使用不同的数据格式。 为了适应不同的数据格式,key.converter和value.converter都有转换器配置。
转换器配置示例
纯 JSON
如果需要在没有架构注册表的情况下使用 JSON 连接数据,请使用 Kafka 支持的 JsonConverter。 以下示例显示了 JsonConverter 添加到配置的键和值属性:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
包含架构的 JSON
设置属性 key.converter.schemas.enable 并 value.converter.schemas.enable 设置为 true,以便将键或值视为包含内部架构和数据的组合 JSON 对象。 如果没有这些属性,键或值将被视为纯 JSON。
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
指向 Kafka 的结果消息如以下示例所示,其中架构和有效负载为 JSON 中的顶级元素:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": false,
"name": "ksql.users"
},
"payload": {
"userid": 123,
"name": "user's name"
}
}
注释
写入 Azure Cosmos DB 的消息由架构和有效负载组成。 请注意消息的大小,以及由有效负载与架构构成的比例。 每次写入 Kafka 的消息中都会重复该架构。 在这种情况下,你可能想要使用 JSON 架构或 AVRO 等序列化格式,其中架构单独存储,而消息仅保留有效负载。
AVRO
Kafka 连接器支持 AVRO 数据格式。 若要使用 AVRO 格式,请配置一个 AvroConverter 以便 Kafka Connect 知道如何处理 AVRO 数据。 Azure Cosmos DB Kafka Connect 已在 Apache 2.0 许可证下使用 Confluent 提供的 AvroConverter 进行测试。 如果愿意,也可以使用其他自定义转换器。
Kafka 独立处理键和值。 在工作者配置中按要求指定key.converter和value.converter属性。 使用 AvroConverter时,请添加一个额外的转换器属性,该属性提供架构注册表的 URL。 以下示例显示了添加到配置的 AvroConverter 键和值属性:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
选择转换格式
下面是有关如何选择转换格式的一些注意事项:
配置 源连接器时:
如果希望 Kafka Connect 在写入 Kafka 的消息中包含纯 JSON,请设置 纯 JSON 配置。
如果希望 Kafka Connect 在写入 Kafka 的消息中包含架构,请设置 JSON with Schema 配置。
如果希望 Kafka Connect 在写入 Kafka 的消息中包含 AVRO 格式,请设置 AVRO 配置。
如果您将 Kafka 主题中的 JSON 数据消费到 Sink 连接器,请确定 JSON 在写入 Kafka 主题时是如何序列化的:
配置
常见配置属性
源和接收器连接器共享以下常见配置属性:
| Name | 类型 | Description | 必需/可选 |
|---|---|---|---|
| connect.cosmos.connection.endpoint | uri | Azure Cosmos DB 端点 URI 字符串 | 必选 |
| connect.cosmos.master.key | 字符串 | 汇聚器连接到的 Azure Cosmos DB 主密钥。 | 必选 |
| connect.cosmos.databasename | 字符串 | 接收器写入的 Azure Cosmos DB 数据库的名称。 | 必选 |
| connect.cosmos.containers.topicmap | 字符串 | Kafka 主题与 Azure Cosmos DB 容器之间的映射。 它已格式化为 CSV 格式 topic#container,topic2#container2 |
必选 |
| connect.cosmos.connection.gateway.enabled | 布尔 | 用于指示是否使用网关模式的标志。 默认情况下为“false”。 | 可选 |
有关汇聚连接器特定配置,请参阅 汇聚连接器文档
有关特定于源连接器的配置,请参阅 源连接器文档
常见配置错误
如果在 Kafka Connect 中错误配置转换器,则可能会导致错误。 这些错误将显示在 Kafka 连接器接收器上,因为你将尝试反序列化已存储在 Kafka 中的消息。 转换器问题通常不会出现在源中,因为序列化是在源中设置的。
有关详细信息,请参阅 常见配置错误 文档。
项目设置
有关初始设置说明,请参阅 开发人员演练和项目设置 。
性能测试
有关为接收器和源连接器运行的性能测试的详细信息,请参阅 性能测试文档。
有关为连接器部署性能测试环境的具体步骤,请参阅 性能环境设置 。