迁移指南:Azure Cosmos DB Kafka 连接器 V1 → V2

本指南可帮助用户从 V1 升级到 V2 Azure Cosmos DB Kafka 连接器(源和接收器)。 V2 引入了重大的中断性变更、体系结构改进和配置更新

🔄 关键体系结构差异

了解 Kafka V1 和 Kafka V2 连接器之间的差异非常重要。 虽然接收器连接器在性能和实现细节方面几乎没有区别,但 V2 源连接器使用更改源拉取模型框架。 这允许 V2 源连接器在数据库下处理多个容器,而 V1 源连接器只能处理每个连接器实例的单个容器。 与 V1 源连接器相比,这种差异使 V2 源连接器在内存和吞吐量方面更高效。 V2连接器在多区域缩放优化方面优于V1连接器。

功能 / 特点 V1 连接器(旧版) V2 连接器 (新式)
更改馈送模式 更改源处理器(租赁容器) 更改源拉取模型(Kafka 偏移主题)
偏移存储 Cosmos DB 租赁容器 Kafka 内部偏移主题
传递语义(源) 至少一次 恰好一次
传递语义(接收器) 至少一次 恰好一次
并行度 Cosmos SDK 分区 Kafka Connect 任务/线程模型
SDK 版本 旧版 SDK Azure Cosmos Java SDK V4
状态/检查点兼容性 由 Cosmos 管理(按容器) 由 Kafka 管理(按主题)
配置样式 特定于 Cosmos,基于租赁 Kafka 本机,声明性
身份验证机制 仅基于密钥的身份验证支持 基于密钥的 + Entra ID 身份验证支持
吞吐量控制支持 不支持 支持吞吐量控件组

⚙️ 配置比较 (V1 与 V2)

🔹 连接配置

V1 配置 V2 配置 注释
connect.cosmos.master.key azure.cosmos.account.key 为了清楚起见,已重命名
connect.cosmos.host azure.cosmos.account.endpoint 已重命名以保持一致性

V2 连接器中新添加的连接配置

配置名称 注释
azure.cosmos.account.tenantId 对于服务主体身份验证为必需
azure.cosmos.auth.aad.clientSecret 对于服务主体身份验证为必需
azure.cosmos.auth.aad.clientId 服务主体的客户端标识/应用标识
azure.cosmos.auth.aad.clientSecret 服务主体的客户端密码/密码

🔹 源连接器配置

V1 配置 V2 配置 注释
connect.cosmos.source.container azure.cosmos.container.name 统一命名
connect.cosmos.database.name azure.cosmos.database.name 不变
connect.cosmos.source.database 移除 使用 cosmos.database.name
connect.cosmos.source.lease.container 移除 V2 中未使用的租约
connect.cosmos.source.lease.prefix 移除 已删除租约管理
connect.cosmos.source.start.from.latest azure.cosmos.source.start.from 使用 BeginningNow
connect.cosmos.source.task.count tasks.max 标准 Kafka Connect 配置

可以在 Kafka 连接器 V2 源连接器文档中找到其他配置属性


🔹 Spark 连接器配置

V1 配置 V2 配置 注释
connect.cosmos.sink.database.name azure.cosmos.database.name 统一
connect.cosmos.sink.container.name azure.cosmos.container.name 统一
connect.cosmos.sink.upsert.enabled azure.cosmos.sink.upsert.enabled 已保留
connect.cosmos.sink.id.strategy azure.cosmos.sink.id.strategy 已保留

有关其他配置属性,请参阅 Kafka 连接器 V2 接收器连接器文档


🧪 可观测性和调试

V1 配置 V2 配置 注释
代码中的自定义日志功能 标准 SLF4J 日志记录 使用 Kafka Connect 日志
租赁容器检查 Kafka 偏移主题检查 与 Kafka 工具兼容

⚠️ 中断性变更

  • 已删除租约容器:元数据不再存储在 Cosmos 容器中。
  • 开始位置:V2 必须使用 cosmos.source.start.from 从开始时间或当前时间重启。
  • 偏移管理:现在由 Kafka 在内部处理 — 无法从租约容器转移。
  • 线程模型:V2 使用 Kafka 的任务线程模型。 请调整 tasks.max,而非 Cosmos 特定的设置。

✅ 迁移步骤

  1. 停止 V1 连接器

    • 使用 Kafka Connect 的 REST API 正常停止正在运行的 V1 连接器。
    • 如果需要,请备份租约容器中的任何数据。
  2. 部署 V2 连接器

    • 将 V2 连接器 JAR 放入 Kafka Connect 插件路径。
    • 删除旧的 V1 连接器 JAR 以避免冲突。
  3. 创建新配置

    • 示例源配置(V2):

      {
        "name": "cosmos-source",
        "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosSourceConnector",
        "tasks.max": "1",
        "cosmos.account.endpoint": "<endpoint>",
        "cosmos.account.key": "<key>",
        "cosmos.database.name": "<database>",
        "cosmos.container.name": "<container>",
        "topic": "<kafka-topic>",
        "cosmos.source.start.from": "Beginning"
      }
      
    • 示例接收器配置 (V2):

      {
        "name": "cosmos-sink",
        "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosSinkConnector",
        "tasks.max": "1",
        "cosmos.account.endpoint": "<endpoint>",
        "cosmos.account.key": "<key>",
        "cosmos.database.name": "<database>",
        "cosmos.container.name": "<container>",
        "topics": "<kafka-topic>",
        "cosmos.sink.upsert.enabled": true
      }
      
  4. 启动 V2 连接器

    • 使用 Kafka Connect REST API 提交新配置。
    • 监视日志和主题数据流。
  5. 验证输出

    • 通过指标和 Cosmos DB Insights 确认文档引入或读取进度。
    • 验证内部 Kafka 主题中的偏移提交。

📌 其他提示

  • 在生产环境中运行 V2 之前在暂存环境中进行测试
  • 如果确切的交付保证至关重要,请从新的 Kafka 主题开始以避免重复。
  • 当对 V2 有信心时,清理旧租赁容器。

📚 引用