Kafka Connect for Azure Cosmos DB - 源连接器 v2

适用范围: NoSQL

Kafka Connect for Azure Cosmos DB 是用于从/向 Azure Cosmos DB 读取/写入数据的连接器。 Azure Cosmos DB 源连接器提供从 Azure Cosmos DB 更改源读取数据并将这些数据发布到 Kafka 主题的功能。

先决条件

  • 请从 Confluent 平台设置开始,因为完成此过程可以提供一个可用的完整环境。 如果你不想要使用 Confluent 平台,则需要自行安装并配置 Zookeeper、Apache Kafka 和 Kafka Connect。 还需要手动安装并配置 Azure Cosmos DB 连接器。
  • 按照容器设置指南创建一个 Azure Cosmos DB 帐户
  • Bash shell
  • 下载 Java 11+
  • 下载 Maven

安装源连接器

如果使用的是建议的 Confluent 平台设置,则 Azure Cosmos DB 源连接器已包含在安装中,因此可以跳过此步骤。

否则,可以使用最新版本中的 JAR 文件,并手动安装连接器。 有关详细信息,请参阅这些说明。 还可以从源代码打包新的 JAR 文件:

# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install

# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar

创建 Kafka 主题

使用 Confluent 控制中心创建 Kafka 主题。 对于此方案,我们将创建一个名为“apparels”的 Kafka 主题,并将非架构嵌入式 JSON 数据写入该主题。 若要在控制中心内创建主题,请参阅创建 Kafka 主题文档

创建源连接器

在 Kafka Connect 中创建源连接器

若要在 Kafka Connect 中创建 Azure Cosmos DB 源连接器,请使用以下 JSON 配置。确保替换 azure.cosmos.account.endpointazure.cosmos.account.key 属性的占位符值(在学习先决条件部分所述的 Azure Cosmos DB 设置指南时已保存)。

{
  "name": "cosmosdb-source-connector-v2",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector",
    "tasks.max": "5",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "azure.cosmos.account.endpoint":"{endpoint}",
    "azure.cosmos.account.key":"{masterKey}",
    "azure.cosmos.application.name": "{applicationName}",
    "azure.cosmos.source.database.name":"{database}",
    "azure.cosmos.source.containers.includedList":"{container}",
    "azure.cosmos.source.changeFeed.maxItemCountHint":"500",
    "azure.cosmos.source.containers.topicMap":"{topic}#{container}",
    "azure.cosmos.source.metadata.storage.type":"Cosmos",
    "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}"
  }
}

有关上述每个配置属性的详细信息,请参阅源属性部分。 填写所有值后,将 JSON 文件保存在本地的某个位置。 可以使用此文件通过 REST API 创建连接器。

使用控制中心创建连接器

创建连接器的简单方法是使用 Confluent 控制中心门户。 请按照 Confluent 设置指南从控制中心创建连接器。 设置时,请不要使用 DatagenConnector 选项,而应改用 CosmosDBSourceConnector 磁贴。 配置源连接器时,请填写在 JSON 文件中填充的值。

或者,也可以在“连接器”页中,使用“上传连接器配置文件”选项上传在前一部分生成的 JSON 文件。

“浏览连接器”对话框中“上传连接器配置文件”选项的屏幕截图。

使用 REST API 创建连接器

使用 Kafka“连接”REST API 创建源连接器

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

将文档插入 Azure Cosmos DB

  1. 登录到 Azure 门户并导航到你的 Azure Cosmos DB 帐户。

  2. 打开“数据资源管理器”选项卡并选择“数据库”

  3. 打开前面创建的“kafkaconnect”数据库和“kafka”容器。

  4. 若要创建新的 JSON 文档,请在“API for NoSQL”窗格中展开“kafka”容器,选择“项”,然后在工具栏中选择“新建项”。

  5. 现在,将文档添加到采用以下结构的容器。 将以下示例 JSON 块粘贴到“项”选项卡中,并覆盖当前内容:

    
    {
      "id": "2",
      "productId": "33218897",
      "category": "Women's Outerwear",
      "manufacturer": "Contoso",
      "description": "Black wool pea-coat",
      "price": "49.99",
      "shipping": {
        "weight": 2,
        "dimensions": {
          "width": 8,
          "height": 11,
          "depth": 3
        }
      }
    }
    
    
  6. 选择“保存”。

  7. 查看左侧菜单中的“项”,确认文档已保存。

确认数据已写入 Kafka 主题

  1. 通过 http://localhost:9000 打开 Kafka 主题 UI。
  2. 选择创建的 Kafka“apparels”主题。
  3. 检查前面已插入到 Azure Cosmos DB 的文档是否显示在该 Kafka 主题中。

清理

若要在 Confluent 控制中心删除连接器,请导航到创建的源连接器,然后选择“删除”图标

“源连接器”对话框中“删除”选项的屏幕截图。

或者,使用连接器的 REST API:

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector

若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅这些步骤

源配置属性

以下设置用于配置 Kafka 源连接器。 这些配置值确定了要使用哪个 Azure Cosmos DB 容器、要从哪些 Kafka 主题写入数据,以及用于序列化数据的格式。 有关使用默认值的示例,请参阅此配置文件

配置属性名称 默认值 说明
connector.class Azure Cosmos DB 源的类名。 它应设置为 com.azure.cosmos.kafka.connect.CosmosSourceConnector
azure.cosmos.account.endpoint Cosmos DB 帐户终结点 URI
azure.cosmos.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 帐户密钥(仅当 auth.typeMasterKey 时才需要)
azure.cosmos.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供了首选区域用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.application.name "" 应用程序名称。 它作为 userAgent 后缀添加。
azure.cosmos.throughputControl.enabled false 用于指示是否启用吞吐量控制的标志。
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 吞吐量控制帐户终结点 URI。
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 帐户的 Azure 环境:AzureChina
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 帐户的 tenantId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.type MasterKey 目前支持两种身份验证类型:MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys)、ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 吞吐量控制帐户密钥(仅当 throughputControl.auth.typeMasterKey 时为必需)。
azure.cosmos.throughputControl.auth.aad.clientId "" 服务主体的 clientId/ApplicationId。 ServicePrincipal 身份验证所必需的。
azure.cosmos.throughputControl.auth.aad.clientSecret "" 服务主体的客户端密码/密码。
azure.cosmos.throughputControl.preferredRegionList [] 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供了首选区域用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Kafka 群集,并将 Kafka 群集区域作为首选区域传递。
azure.cosmos.throughputControl.mode.gateway false 指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。
azure.cosmos.throughputControl.group.name "" 吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。
azure.cosmos.throughputControl.targetThroughput -1 吞吐量控制组目标吞吐量。 该值应大于 0。
azure.cosmos.throughputControl.targetThroughputThreshold -1 吞吐量控制组目标吞吐量阈值。 该值的区间范围为 (0,1]。
azure.cosmos.throughputControl.priorityLevel None 吞吐量控制组优先级。 该值可以是 None、High 或 Low。
azure.cosmos.throughputControl.globalControl.database.name "" 用于吞吐量全局控制的数据库。
azure.cosmos.throughputControl.globalControl.container.name "" 用于吞吐量全局控制的容器。
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 此值控制客户端更新自身吞吐量使用情况并根据其他客户端的吞吐量使用情况调整自己吞吐量份额的频率。 默认值为 5 秒,允许的最小值为 5 秒。
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 这将控制检测到客户端脱机进而允许其他客户端获取其吞吐量份额的时间。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。
azure.cosmos.source.database.name Cosmos DB 数据库名称。
azure.cosmos.source.containers.includeAll false 用于指示是否从所有容器读取的标志。
azure.cosmos.source.containers.includedList [] 包含的容器。 如果 azure.cosmos.source.containers.includeAll 为 true,则忽略此配置。
azure.cosmos.source.containers.topicMap [] 映射到 Cosmos 容器的 Kafka 主题的逗号分隔列表。 例如:topic1#con1、topic2#con2。 默认情况下,容器名称用作发布数据的 kafka 主题的名称,可以使用此属性替代默认配置
azure.cosmos.source.changeFeed.startFrom Beginning 设置中的 ChangeFeed Start(Now、Beginning 或特定的时间点 (UTC),例如 2020-02-10T14:15:03)。 默认值为“Beginning”。
azure.cosmos.source.changeFeed.mode LatestVersion ChangeFeed 模式(LatestVersion 或 AllVersionsAndDeletes)。
azure.cosmos.source.changeFeed.maxItemCountHint 1000 单个更改源请求中返回的最大文档数。 但是,如果同一事务更改了多个项,则收到的项数可能高于指定的值。
azure.cosmos.source.metadata.poll.delay.ms 300000 指示检查元数据更改的频率(包括容器拆分/合并、添加/删除/重新创建容器)。 检测到更改后,它会重新配置任务。 默认值为 5 分钟。
azure.cosmos.source.metadata.storage.type Kafka 元数据的存储类型。 支持两种类型:Cosmos、Kafka。
azure.cosmos.source.metadata.storage.name _cosmos.metadata.topic 元数据存储的资源名称。 如果元数据存储类型为 Kafka 主题,则此配置引用 kafka 主题名称;如果元数据主题尚不存在,则创建元数据主题,否则它将使用预先创建的主题。 如果元数据存储类型为 Cosmos,则此配置引用容器名称;对于 MasterKey 身份验证,将使用具有 4000 RU 的 AutoScale 创建此容器(如果尚不存在);对于 ServicePrincipal 身份验证,需要提前创建容器。
azure.cosmos.source.messageKey.enabled true 是否设置 kafka 记录消息键。
azure.cosmos.source.messageKey.field id 要用作消息键的字段。

支持的数据类型

Azure Cosmos DB 源连接器将 JSON 文档转换为架构,支持以下 JSON 数据类型:

JSON 数据类型 架构类型
Array Array
布尔 布尔
数字 Float32
Float64
Int8
Int16
Int32
Int64
null 字符串
对象 (JSON) 结构
字符串 字符串

后续步骤