本指南可帮助用户从 V1 升级到 V2 Azure Cosmos DB Kafka 连接器(源和接收器)。 V2 引入了重大 中断性变更、体系结构改进和配置更新。
🔄 关键体系结构差异
了解 Kafka V1 和 Kafka V2 连接器之间的差异非常重要。 接收器连接器在性能和实现细节方面几乎没有区别,而 V2 源连接器则采用 变更馈送拉取模型框架。 这允许 V2 源连接器在数据库下处理多个容器,而 V1 源连接器只能处理每个连接器实例的单个容器。 与 V1 源连接器相比,这种差异使 V2 源连接器在内存和吞吐量方面更高效。 V2 连接器比 V1 连接器在多区域具有更好的规模优化。
| 功能 / 特点 | V1 连接器(旧版) | V2 连接器 (新式) |
|---|---|---|
| 变更数据流模式 | 更改源处理器 (租约容器) | 更改源拉取模型 (Kafka 偏移主题) |
| 偏移存储 | Cosmos DB 租约容器 | Kafka 内部位移主题 |
| 交付语义(来源) | 至少一次 | 恰好一次 |
| 传递语义(数据接收端) | 至少一次 | 恰好一次 |
| 并行度 | Cosmos SDK 分区 | Kafka Connect 任务/线程模型 |
| SDK 版本 | 旧版 SDK | Azure Cosmos Java SDK V4 |
| 状态/检查点兼容性 | Cosmos 托管(容器中) | Kafka 托管(主题) |
| 配置样式 | 特定于 Cosmos 的基于租约 | Kafka 原生, 声明式 |
| 身份验证机制 | 仅基于密钥的身份验证支持 | 基于密钥的 + Entra ID 身份验证支持 |
| 吞吐量控制支持 | 不支持 | 支持吞吐量控制组 |
⚙️ 配置比较 (V1 与 V2)
🔹 连接配置
| V1 配置 | V2 配置 | 注释 |
|---|---|---|
connect.cosmos.master.key |
azure.cosmos.account.key |
为了清楚起见,已重命名 |
connect.cosmos.host |
azure.cosmos.account.endpoint |
为保持一致性而重命名 |
V2 连接器中新添加的连接配置
| 配置名称 | 注释 |
|---|---|
azure.cosmos.account.tenantId |
用于服务主体身份验证所必需的 |
azure.cosmos.auth.aad.clientSecret |
用于服务主体身份验证所必需的 |
azure.cosmos.auth.aad.clientId |
服务主体的 ClientId/ApplicationId |
azure.cosmos.auth.aad.clientSecret |
服务主体的客户端机密/密码 |
🔹 源连接器配置
| V1 配置 | V2 配置 | 注释 |
|---|---|---|
connect.cosmos.source.container |
azure.cosmos.container.name |
统一命名 |
connect.cosmos.database.name |
azure.cosmos.database.name |
不变 |
connect.cosmos.source.database |
已删除 | 使用 cosmos.database.name |
connect.cosmos.source.lease.container |
已删除 | V2 中未使用的租约 |
connect.cosmos.source.lease.prefix |
已删除 | 已删除租约管理 |
connect.cosmos.source.start.from.latest |
azure.cosmos.source.start.from |
使用 Beginning 或 Now |
connect.cosmos.source.task.count |
tasks.max |
标准 Kafka Connect 配置 |
可以在 Kafka 连接器 V2 源连接器文档中找到其他配置属性
🔹 Sink 连接器配置
| V1 配置 | V2 配置 | 注释 |
|---|---|---|
connect.cosmos.sink.database.name |
azure.cosmos.database.name |
Unified |
connect.cosmos.sink.container.name |
azure.cosmos.container.name |
Unified |
connect.cosmos.sink.upsert.enabled |
azure.cosmos.sink.upsert.enabled |
已保留 |
connect.cosmos.sink.id.strategy |
azure.cosmos.sink.id.strategy |
已保留 |
可以在 Kafka Connector V2 下沉连接器文档中找到更多配置属性。
🧪 可观测性和调试
| V1 配置 | V2 配置 | 注释 |
|---|---|---|
| 代码中的自定义日志记录 | 标准 SLF4J 日志记录 | 使用 Kafka Connect 日志 |
| 租赁容器检查 | Kafka 偏移主题检查 | 与 Kafka 工具兼容 |
⚠️ 中断性变更
- 已删除租约容器:元数据不再存储在 Cosmos 容器中。
-
开始位置:V2 必须从开始或当前时间重启,使用
cosmos.source.start.from。 - 偏移管理:现在由 Kafka 在内部处理 — 无法从租约容器转移。
-
线程模型:V2 使用 Kafka 的任务线程模型。 调整
tasks.max设置,而不是特定于 Cosmos 的设置。
✅ 迁移步骤
停止 V1 连接器
- 使用 Kafka Connect 的 REST API 正常停止正在运行的 V1 连接器。
- 如有需要,请从租约容器中备份所需的任何数据。
部署 V2 连接器
- 将 V2 连接器 JAR 放入 Kafka Connect 插件路径。
- 删除旧的 V1 连接器 JAR 以避免冲突。
创建新配置
示例源配置(V2):
{ "name": "cosmos-source", "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosSourceConnector", "tasks.max": "1", "cosmos.account.endpoint": "<endpoint>", "cosmos.account.key": "<key>", "cosmos.database.name": "<database>", "cosmos.container.name": "<container>", "topic": "<kafka-topic>", "cosmos.source.start.from": "Beginning" }Sink配置示例(V2):
{ "name": "cosmos-sink", "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosSinkConnector", "tasks.max": "1", "cosmos.account.endpoint": "<endpoint>", "cosmos.account.key": "<key>", "cosmos.database.name": "<database>", "cosmos.container.name": "<container>", "topics": "<kafka-topic>", "cosmos.sink.upsert.enabled": true }
启动 V2 连接器
- 使用 Kafka Connect REST API 提交新配置。
- 监视日志和主题数据流。
验证输出
- 通过指标和 Cosmos DB Insights 确认文档引入或读取进度。
- 验证内部 Kafka 主题中的偏移量提交。
📌 其他提示
- 在生产环境中运行 V2 之前,在预发布环境中进行测试。
- 如果确切的交付保证至关重要,请从新的 Kafka 主题开始以避免重复。
- 一旦对 V2 有信心,清理不再需要的旧租约容器。