迁移指南:Azure Cosmos DB Kafka 连接器 V1 → V2

本指南可帮助用户从 V1 升级到 V2 Azure Cosmos DB Kafka 连接器(源和接收器)。 V2 引入了重大 中断性变更、体系结构改进和配置更新。

🔄 关键体系结构差异

了解 Kafka V1 和 Kafka V2 连接器之间的差异非常重要。 接收器连接器在性能和实现细节方面几乎没有区别,而 V2 源连接器则采用 变更馈送拉取模型框架。 这允许 V2 源连接器在数据库下处理多个容器,而 V1 源连接器只能处理每个连接器实例的单个容器。 与 V1 源连接器相比,这种差异使 V2 源连接器在内存和吞吐量方面更高效。 V2 连接器比 V1 连接器在多区域具有更好的规模优化。

功能 / 特点 V1 连接器(旧版) V2 连接器 (新式)
变更数据流模式 更改源处理器 (租约容器) 更改源拉取模型 (Kafka 偏移主题)
偏移存储 Cosmos DB 租约容器 Kafka 内部位移主题
交付语义(来源) 至少一次 恰好一次
传递语义(数据接收端) 至少一次 恰好一次
并行度 Cosmos SDK 分区 Kafka Connect 任务/线程模型
SDK 版本 旧版 SDK Azure Cosmos Java SDK V4
状态/检查点兼容性 Cosmos 托管(容器中) Kafka 托管(主题)
配置样式 特定于 Cosmos 的基于租约 Kafka 原生, 声明式
身份验证机制 仅基于密钥的身份验证支持 基于密钥的 + Entra ID 身份验证支持
吞吐量控制支持 不支持 支持吞吐量控制组

⚙️ 配置比较 (V1 与 V2)

🔹 连接配置

V1 配置 V2 配置 注释
connect.cosmos.master.key azure.cosmos.account.key 为了清楚起见,已重命名
connect.cosmos.host azure.cosmos.account.endpoint 为保持一致性而重命名

V2 连接器中新添加的连接配置

配置名称 注释
azure.cosmos.account.tenantId 用于服务主体身份验证所必需的
azure.cosmos.auth.aad.clientSecret 用于服务主体身份验证所必需的
azure.cosmos.auth.aad.clientId 服务主体的 ClientId/ApplicationId
azure.cosmos.auth.aad.clientSecret 服务主体的客户端机密/密码

🔹 源连接器配置

V1 配置 V2 配置 注释
connect.cosmos.source.container azure.cosmos.container.name 统一命名
connect.cosmos.database.name azure.cosmos.database.name 不变
connect.cosmos.source.database 已删除 使用 cosmos.database.name
connect.cosmos.source.lease.container 已删除 V2 中未使用的租约
connect.cosmos.source.lease.prefix 已删除 已删除租约管理
connect.cosmos.source.start.from.latest azure.cosmos.source.start.from 使用 BeginningNow
connect.cosmos.source.task.count tasks.max 标准 Kafka Connect 配置

可以在 Kafka 连接器 V2 源连接器文档中找到其他配置属性


🔹 Sink 连接器配置

V1 配置 V2 配置 注释
connect.cosmos.sink.database.name azure.cosmos.database.name Unified
connect.cosmos.sink.container.name azure.cosmos.container.name Unified
connect.cosmos.sink.upsert.enabled azure.cosmos.sink.upsert.enabled 已保留
connect.cosmos.sink.id.strategy azure.cosmos.sink.id.strategy 已保留

可以在 Kafka Connector V2 下沉连接器文档中找到更多配置属性。


🧪 可观测性和调试

V1 配置 V2 配置 注释
代码中的自定义日志记录 标准 SLF4J 日志记录 使用 Kafka Connect 日志
租赁容器检查 Kafka 偏移主题检查 与 Kafka 工具兼容

⚠️ 中断性变更

  • 已删除租约容器:元数据不再存储在 Cosmos 容器中。
  • 开始位置:V2 必须从开始或当前时间重启,使用 cosmos.source.start.from
  • 偏移管理:现在由 Kafka 在内部处理 — 无法从租约容器转移。
  • 线程模型:V2 使用 Kafka 的任务线程模型。 调整 tasks.max 设置,而不是特定于 Cosmos 的设置。

✅ 迁移步骤

  1. 停止 V1 连接器

    • 使用 Kafka Connect 的 REST API 正常停止正在运行的 V1 连接器。
    • 如有需要,请从租约容器中备份所需的任何数据。
  2. 部署 V2 连接器

    • 将 V2 连接器 JAR 放入 Kafka Connect 插件路径。
    • 删除旧的 V1 连接器 JAR 以避免冲突。
  3. 创建新配置

    • 示例源配置(V2):

      {
        "name": "cosmos-source",
        "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosSourceConnector",
        "tasks.max": "1",
        "cosmos.account.endpoint": "<endpoint>",
        "cosmos.account.key": "<key>",
        "cosmos.database.name": "<database>",
        "cosmos.container.name": "<container>",
        "topic": "<kafka-topic>",
        "cosmos.source.start.from": "Beginning"
      }
      
    • Sink配置示例(V2):

      {
        "name": "cosmos-sink",
        "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosSinkConnector",
        "tasks.max": "1",
        "cosmos.account.endpoint": "<endpoint>",
        "cosmos.account.key": "<key>",
        "cosmos.database.name": "<database>",
        "cosmos.container.name": "<container>",
        "topics": "<kafka-topic>",
        "cosmos.sink.upsert.enabled": true
      }
      
  4. 启动 V2 连接器

    • 使用 Kafka Connect REST API 提交新配置。
    • 监视日志和主题数据流。
  5. 验证输出

    • 通过指标和 Cosmos DB Insights 确认文档引入或读取进度。
    • 验证内部 Kafka 主题中的偏移量提交。

📌 其他提示

  • 在生产环境中运行 V2 之前,在预发布环境中进行测试
  • 如果确切的交付保证至关重要,请从新的 Kafka 主题开始以避免重复。
  • 一旦对 V2 有信心,清理不再需要的旧租约容器。

📚 引用