本指南可帮助用户从 V1 升级到 V2 Azure Cosmos DB Kafka 连接器(源和接收器)。 V2 引入了重大的中断性变更、体系结构改进和配置更新。
🔄 关键体系结构差异
了解 Kafka V1 和 Kafka V2 连接器之间的差异非常重要。 虽然接收器连接器在性能和实现细节方面几乎没有区别,但 V2 源连接器使用更改源拉取模型框架。 这允许 V2 源连接器在数据库下处理多个容器,而 V1 源连接器只能处理每个连接器实例的单个容器。 与 V1 源连接器相比,这种差异使 V2 源连接器在内存和吞吐量方面更高效。 V2连接器在多区域缩放优化方面优于V1连接器。
功能 / 特点 | V1 连接器(旧版) | V2 连接器 (新式) |
---|---|---|
更改馈送模式 | 更改源处理器(租赁容器) | 更改源拉取模型(Kafka 偏移主题) |
偏移存储 | Cosmos DB 租赁容器 | Kafka 内部偏移主题 |
传递语义(源) | 至少一次 | 恰好一次 |
传递语义(接收器) | 至少一次 | 恰好一次 |
并行度 | Cosmos SDK 分区 | Kafka Connect 任务/线程模型 |
SDK 版本 | 旧版 SDK | Azure Cosmos Java SDK V4 |
状态/检查点兼容性 | 由 Cosmos 管理(按容器) | 由 Kafka 管理(按主题) |
配置样式 | 特定于 Cosmos,基于租赁 | Kafka 本机,声明性 |
身份验证机制 | 仅基于密钥的身份验证支持 | 基于密钥的 + Entra ID 身份验证支持 |
吞吐量控制支持 | 不支持 | 支持吞吐量控件组 |
⚙️ 配置比较 (V1 与 V2)
🔹 连接配置
V1 配置 | V2 配置 | 注释 |
---|---|---|
connect.cosmos.master.key |
azure.cosmos.account.key |
为了清楚起见,已重命名 |
connect.cosmos.host |
azure.cosmos.account.endpoint |
已重命名以保持一致性 |
V2 连接器中新添加的连接配置
配置名称 | 注释 |
---|---|
azure.cosmos.account.tenantId |
对于服务主体身份验证为必需 |
azure.cosmos.auth.aad.clientSecret |
对于服务主体身份验证为必需 |
azure.cosmos.auth.aad.clientId |
服务主体的客户端标识/应用标识 |
azure.cosmos.auth.aad.clientSecret |
服务主体的客户端密码/密码 |
🔹 源连接器配置
V1 配置 | V2 配置 | 注释 |
---|---|---|
connect.cosmos.source.container |
azure.cosmos.container.name |
统一命名 |
connect.cosmos.database.name |
azure.cosmos.database.name |
不变 |
connect.cosmos.source.database |
移除 | 使用 cosmos.database.name |
connect.cosmos.source.lease.container |
移除 | V2 中未使用的租约 |
connect.cosmos.source.lease.prefix |
移除 | 已删除租约管理 |
connect.cosmos.source.start.from.latest |
azure.cosmos.source.start.from |
使用 Beginning 或 Now |
connect.cosmos.source.task.count |
tasks.max |
标准 Kafka Connect 配置 |
可以在 Kafka 连接器 V2 源连接器文档中找到其他配置属性
🔹 Spark 连接器配置
V1 配置 | V2 配置 | 注释 |
---|---|---|
connect.cosmos.sink.database.name |
azure.cosmos.database.name |
统一 |
connect.cosmos.sink.container.name |
azure.cosmos.container.name |
统一 |
connect.cosmos.sink.upsert.enabled |
azure.cosmos.sink.upsert.enabled |
已保留 |
connect.cosmos.sink.id.strategy |
azure.cosmos.sink.id.strategy |
已保留 |
有关其他配置属性,请参阅 Kafka 连接器 V2 接收器连接器文档
🧪 可观测性和调试
V1 配置 | V2 配置 | 注释 |
---|---|---|
代码中的自定义日志功能 | 标准 SLF4J 日志记录 | 使用 Kafka Connect 日志 |
租赁容器检查 | Kafka 偏移主题检查 | 与 Kafka 工具兼容 |
⚠️ 中断性变更
- 已删除租约容器:元数据不再存储在 Cosmos 容器中。
-
开始位置:V2 必须使用
cosmos.source.start.from
从开始时间或当前时间重启。 - 偏移管理:现在由 Kafka 在内部处理 — 无法从租约容器转移。
-
线程模型:V2 使用 Kafka 的任务线程模型。 请调整
tasks.max
,而非 Cosmos 特定的设置。
✅ 迁移步骤
停止 V1 连接器
- 使用 Kafka Connect 的 REST API 正常停止正在运行的 V1 连接器。
- 如果需要,请备份租约容器中的任何数据。
部署 V2 连接器
- 将 V2 连接器 JAR 放入 Kafka Connect 插件路径。
- 删除旧的 V1 连接器 JAR 以避免冲突。
创建新配置
示例源配置(V2):
{ "name": "cosmos-source", "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosSourceConnector", "tasks.max": "1", "cosmos.account.endpoint": "<endpoint>", "cosmos.account.key": "<key>", "cosmos.database.name": "<database>", "cosmos.container.name": "<container>", "topic": "<kafka-topic>", "cosmos.source.start.from": "Beginning" }
示例接收器配置 (V2):
{ "name": "cosmos-sink", "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosSinkConnector", "tasks.max": "1", "cosmos.account.endpoint": "<endpoint>", "cosmos.account.key": "<key>", "cosmos.database.name": "<database>", "cosmos.container.name": "<container>", "topics": "<kafka-topic>", "cosmos.sink.upsert.enabled": true }
启动 V2 连接器
- 使用 Kafka Connect REST API 提交新配置。
- 监视日志和主题数据流。
验证输出
- 通过指标和 Cosmos DB Insights 确认文档引入或读取进度。
- 验证内部 Kafka 主题中的偏移提交。
📌 其他提示
- 在生产环境中运行 V2 之前在暂存环境中进行测试。
- 如果确切的交付保证至关重要,请从新的 Kafka 主题开始以避免重复。
- 当对 V2 有信心时,清理旧租赁容器。