Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。 Azure Cosmos DB 源连接器提供从 Azure Cosmos DB 更改源读取数据并将此数据发布到 Kafka 主题的功能。
先决条件
- 从 Confluent 平台设置 开始,因为它为你提供了一个完整的环境来使用。 如果不想使用 Confluent 平台,则需要自行安装和配置 Zookeeper、Apache Kafka、Kafka Connect。 还需要手动安装和配置 Azure Cosmos DB 连接器。
- 创建 Azure Cosmos DB 帐户,容器 设置指南
- Bash shell
- 下载 Java 11+
- 下载 Maven
安装源连接器
如果使用建议的 Confluent 平台设置,则安装中包含 Azure Cosmos DB 源连接器,可以跳过此步骤。
否则,您可以使用最新Release中的JAR文件并手动安装连接器。 若要了解详细信息,请参阅这些 说明。 还可以从源代码打包新的 JAR 文件:
# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar
创建 Kafka 主题
使用 Confluent 控制中心创建 Kafka 主题。 对于此方案,我们将创建一个名为“服装”的 Kafka 主题,并将非schema 嵌入式 JSON 数据写入主题。 若要在控制中心内创建主题,请参阅 创建 Kafka 主题文档。
创建源连接器
在 Kafka Connect 中创建源连接器
若要在 Kafka Connect 中创建 Azure Cosmos DB 源连接器,请使用以下 JSON 配置。请确保在先决条件中替换从 Azure Cosmos DB 设置指南保存的属性的占位符值azure.cosmos.account.endpointazure.cosmos.account.key。
{
"name": "cosmosdb-source-connector-v2",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector",
"tasks.max": "5",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"azure.cosmos.account.endpoint":"{endpoint}",
"azure.cosmos.account.key":"{masterKey}",
"azure.cosmos.application.name": "{applicationName}",
"azure.cosmos.source.database.name":"{database}",
"azure.cosmos.source.containers.includedList":"{container}",
"azure.cosmos.source.changeFeed.maxItemCountHint":"500",
"azure.cosmos.source.containers.topicMap":"{topic}#{container}",
"azure.cosmos.source.metadata.storage.type":"Cosmos",
"azure.cosmos.source.metadata.storage.name":"{metadataContainerName}"
}
}
有关上述每个配置属性的详细信息,请参阅 源属性 部分。 填写所有值后,将 JSON 文件保存在本地某个位置。 可以使用此文件通过 REST API 创建连接器。
使用控制中心创建连接器
创建连接器的简单选项来自 Confluent 控制中心门户。 按照 Confluent 设置指南 从控制中心创建连接器。 设置时,不要使用 DatagenConnector 选项,而是使用 CosmosDBSourceConnector 磁贴。 配置源连接器时,填写 JSON 文件中填充的值。
或者,在连接器页面上,可以使用上传连接器配置文件选项上传之前部分生成的 JSON 文件。
使用 REST API 创建连接器
使用 Kafka Connect REST API 创建源连接器
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
将文档插入 Azure Cosmos DB
登录到 Azure 门户 并导航到 Azure Cosmos DB 帐户。
打开“数据浏览”选项卡,然后选择“数据库”
打开前面创建的“kafkaconnect”数据库和“kafka”容器。
若要创建新的 JSON 文档,请在 API for NoSQL 窗格中展开“kafka”容器,选择 “项”,然后在工具栏中选择 “新建项 ”。
现在,使用以下结构将文档添加到容器。 将以下示例 JSON 块粘贴到“项”选项卡中,覆盖当前内容:
{ "id": "2", "productId": "33218897", "category": "Women's Outerwear", "manufacturer": "Contoso", "description": "Black wool pea-coat", "price": "49.99", "shipping": { "weight": 2, "dimensions": { "width": 8, "height": 11, "depth": 3 } } }选择“保存”。
通过查看左侧菜单上的“项目”来确认文档是否已保存。
确认写入 Kafka 主题的数据
- 打开 Kafka 主题用户界面
http://localhost:9000。 - 选择创建的 Kafka“服装”主题。
- 验证之前插入到 Azure Cosmos DB 中的文档是否显示在 Kafka 主题中。
清理行动
若要从 Confluent 控制中心删除连接器,请导航到创建的源连接器,然后选择 “删除 ”图标。
或者,使用连接器的 REST API:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector
若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅 以下步骤。
源配置属性
以下设置用于配置 Kafka 源连接器。 这些配置值确定使用哪个 Azure Cosmos DB 容器、从中写入 Kafka 主题的数据以及序列化数据的格式。 有关具有默认值的示例,请参阅此 配置文件。
| 配置属性名称 | 违约 | Description |
|---|---|---|
connector.class |
None | Azure Cosmos DB 源的类名。 应将其设置为 com.azure.cosmos.kafka.connect.CosmosSourceConnector |
azure.cosmos.account.endpoint |
None | Cosmos DB 帐户终结点 URI |
azure.cosmos.account.environment |
Azure |
Cosmos DB 帐户的 Azure 环境: AzureChina。 |
azure.cosmos.account.tenantId |
"" |
Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.auth.type |
MasterKey |
目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal |
azure.cosmos.account.key |
"" |
Cosmos DB 帐户密钥(仅在auth.type是MasterKey时需要) |
azure.cosmos.auth.aad.clientId |
"" |
服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.auth.aad.clientSecret |
"" |
服务主体的客户端密钥/密码。 |
azure.cosmos.mode.gateway |
false |
用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。 |
azure.cosmos.preferredRegionList |
[] |
要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。 |
azure.cosmos.application.name |
"" |
应用程序名称。 它被添加为 userAgent 的后缀。 |
azure.cosmos.throughputControl.enabled |
false |
指示是否启用吞吐量控制的标志。 |
azure.cosmos.throughputControl.account.endpoint |
"" |
Cosmos DB 吞吐量控制账户终结点 URI。 |
azure.cosmos.throughputControl.account.environment |
Azure |
Cosmos DB 帐户的 Azure 环境: AzureChina。 |
azure.cosmos.throughputControl.account.tenantId |
"" |
Cosmos DB 帐户的租户 ID。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.throughputControl.auth.type |
MasterKey |
目前支持两种身份验证类型: MasterKey(PrimaryReadWriteKeys、SecondReadWriteKeys、PrimaryReadOnlyKeys、SecondReadWriteKeys)、 ServicePrincipal |
azure.cosmos.throughputControl.account.key |
"" |
Cosmos DB 吞吐量控制帐户密钥(仅在 throughputControl.auth.type 为 MasterKey 时需要)。 |
azure.cosmos.throughputControl.auth.aad.clientId |
"" |
服务主体的 clientId/ApplicationId(客户端标识/应用程序标识)。 进行 ServicePrincipal 验证是必须的。 |
azure.cosmos.throughputControl.auth.aad.clientSecret |
"" |
服务主体的客户端密钥/密码。 |
azure.cosmos.throughputControl.preferredRegionList |
[] |
要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值,提供用作提示的首选区域。 应将并置的 Kafka 群集与 Cosmos DB 帐户一起使用,并将 Kafka 群集的区域设置为首选区域。 |
azure.cosmos.throughputControl.mode.gateway |
false |
用于指示是否使用网关模式的标志。 默认情况下为 false,表示 SDK 使用直接模式。 |
azure.cosmos.throughputControl.group.name |
"" |
吞吐量控制组名称。 由于客户允许为容器创建多个组,因此名称应是唯一的。 |
azure.cosmos.throughputControl.targetThroughput |
-1 |
吞吐量控制组目标吞吐量。 该值应大于 0。 |
azure.cosmos.throughputControl.targetThroughputThreshold |
-1 |
吞吐量控制组目标吞吐量阈值。 该值应介于 (0,1) 之间。 |
azure.cosmos.throughputControl.priorityLevel |
None |
吞吐量控制组优先级。 该值可以是 None、High 或 Low。 |
azure.cosmos.throughputControl.globalControl.database.name |
"" |
用于吞吐量全局控制的数据库。 |
azure.cosmos.throughputControl.globalControl.container.name |
"" |
用于吞吐量全局控制的容器。 |
azure.cosmos.throughputControl.globalControl.renewIntervalInMS |
-1 |
这会控制客户端更新自身吞吐量使用情况的频率,并根据其他客户端的吞吐量使用情况调整自己的吞吐量共享。 默认值为 5 秒,允许的最小值为 5 秒。 |
azure.cosmos.throughputControl.globalControl.expireIntervalInMS |
-1 |
这可以控制我们如何快速检测客户端离线的速度,从而允许其他客户端接管其分配的吞吐量。 默认值为 11 秒,允许的最小值为 2 * renewIntervalInMS + 1。 |
azure.cosmos.source.database.name |
None | Cosmos DB 数据库名称。 |
azure.cosmos.source.containers.includeAll |
false |
用于指示是否从所有容器读取的标志。 |
azure.cosmos.source.containers.includedList |
[] |
随附容器。 如果 azure.cosmos.source.containers.includeAll 为 true,则忽略此配置。 |
azure.cosmos.source.containers.topicMap |
[] |
映射到 Cosmos 容器的 Kafka 主题的逗号分隔列表。 例如:topic1#con1、topic2#con2。 默认情况下,容器名称用作要发布数据的 Kafka 主题的名称,可以使用此属性覆盖默认配置。 |
azure.cosmos.source.changeFeed.startFrom |
Beginning |
ChangeFeed 起始的设置(现在、开始或一个确定的时间点(UTC),例如 2020-02-10T14:15:03)。 默认值为“开始”。 |
azure.cosmos.source.changeFeed.mode |
LatestVersion |
ChangeFeed 模式(LatestVersion 或 AllVersionsAndDeletes)。 |
azure.cosmos.source.changeFeed.maxItemCountHint |
1000 |
单个更改源请求中返回的最大文档数。 但是,如果同一事务更改了多个项,则收到的项数可能高于指定的值。 |
azure.cosmos.source.metadata.poll.delay.ms |
300000 |
指示检查元数据更改的频率(包括容器拆分/合并、添加/删除/重新创建容器)。 检测到更改后,它会重新配置任务。 默认值为 5 分钟。 |
azure.cosmos.source.metadata.storage.type |
Kafka |
元数据的存储类型。 支持两种类型:Cosmos、Kafka。 |
azure.cosmos.source.metadata.storage.name |
_cosmos.metadata.topic |
元数据存储的资源名称。 如果元数据存储类型为 Kafka 主题,则此配置引用 kafka 主题名称,如果元数据主题尚不存在,则创建元数据主题,否则它使用预先创建的主题。 如果元数据存储类型为 Cosmos,则该配置引用容器名称。如果使用 MasterKey 身份验证方式,并且容器尚未存在,则它会以 4000 RU 的配置创建 AutoScale;对于 ServicePrincipal 身份验证方式,容器需要提前创建。 |
azure.cosmos.source.messageKey.enabled |
true |
是否设置 kafka 记录消息键。 |
azure.cosmos.source.messageKey.field |
id |
要用作消息键的字段。 |
支持的数据类型
Azure Cosmos DB 源连接器将 JSON 文档转换为架构,并支持以下 JSON 数据类型:
| JSON 数据类型 | 架构类型 |
|---|---|
| Array | Array |
| 布尔 | 布尔 |
| 编号 | Float32 Float64 Int8 Int16 Int32 Int64 |
| Null | String |
| 对象(JSON) | 结构 |
| String | String |
后续步骤
- 用于 Azure Cosmos DB 的 Kafka Connect 汇聚接收器