Kafka Connect for Azure Cosmos DB 是一种连接器,用于从 Azure Cosmos DB 读取和写入数据。 Azure Cosmos DB 源连接器提供从 Azure Cosmos DB 更改源读取数据并将此数据发布到 Kafka 主题的功能。
先决条件
从 Confluent 平台设置 开始,因为它为你提供了一个完整的环境来使用。 如果不想使用 Confluent 平台,则需要自行安装和配置 Zookeeper、Apache Kafka、Kafka Connect。 还需要手动安装和配置 Azure Cosmos DB 连接器。
创建 Azure Cosmos DB 帐户,容器 设置指南
Bash shell,在 GitHub Codespaces、Mac、Ubuntu、Windows 上使用 WSL2 进行测试。 此 shell 在 WSL1 中不起作用。
下载 Java 11+
下载 Maven
安装源连接器
如果使用建议的 Confluent 平台设置,则安装中包含 Azure Cosmos DB 源连接器,可以跳过此步骤。
否则,您可以使用最新Release中的JAR文件并手动安装连接器。 若要了解详细信息,请参阅这些 说明。 还可以从源代码打包新的 JAR 文件:
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*dependencies.jar
创建 Kafka 主题
使用 Confluent 控制中心创建 Kafka 主题。 对于此方案,我们将创建一个名为“服装”的 Kafka 主题,并将非架构嵌入式 JSON 数据写入主题。 若要在控制中心内创建主题,请参阅 创建 Kafka 主题文档。
创建源连接器
在 Kafka Connect 中创建源连接器
若要在 Kafka Connect 中创建 Azure Cosmos DB 源连接器,请使用以下 JSON 配置。请确保替换先决条件中的 Azure Cosmos DB 设置指南中应保存的属性占位符值connect.cosmos.connection.endpointconnect.cosmos.master.key。
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.cn:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "apparels#kafka",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
有关上述每个配置属性的详细信息,请参阅 源属性 部分。 填写所有值后,将 JSON 文件保存在本地某个位置。 可以使用此文件通过 REST API 创建连接器。
使用控制中心创建连接器
创建连接器的简单选项来自 Confluent 控制中心门户。 按照 Confluent 设置指南 从控制中心创建连接器。 设置时,不要使用 DatagenConnector 选项,而是使用 CosmosDBSourceConnector 磁贴。 配置源连接器时,填写在 JSON 文件中填写的值。
或者,在连接器页面上,可以使用上传连接器配置文件选项上传之前部分生成的 JSON 文件。
使用 REST API 创建连接器
使用连接 REST API 创建源连接器
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
将文档插入 Azure Cosmos DB
登录到 Azure 门户 并导航到 Azure Cosmos DB 帐户。
打开“数据浏览”选项卡,然后选择“数据库”
打开前面创建的“kafkaconnect”数据库和“kafka”容器。
若要创建新的 JSON 文档,请在 API for NoSQL 窗格中展开“kafka”容器,选择 “项”,然后在工具栏中选择 “新建项 ”。
现在,使用以下结构将文档添加到容器。 将以下示例 JSON 块粘贴到“项”选项卡中,覆盖当前内容:
{ "id": "2", "productId": "33218897", "category": "Women's Outerwear", "manufacturer": "Contoso", "description": "Black wool pea-coat", "price": "49.99", "shipping": { "weight": 2, "dimensions": { "width": 8, "height": 11, "depth": 3 } } }选择“保存”。
通过查看左侧菜单上的“项目”确认文档已保存。
确认写入 Kafka 主题的数据
- 打开 Kafka 主题用户界面
http://localhost:9000。 - 选择创建的 Kafka“服装”主题。
- 验证之前插入到 Azure Cosmos DB 中的文档是否显示在 Kafka 主题中。
清理行动
若要从 Confluent 控制中心删除连接器,请导航到创建的源连接器,然后选择 “删除” 图标。
或者,使用连接器的 REST API:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector
若要使用 Azure CLI 删除创建的 Azure Cosmos DB 服务及其资源组,请参阅 以下步骤。
源配置属性
以下设置用于配置 Kafka 源连接器。 这些配置值确定使用哪个 Azure Cosmos DB 容器、从中写入 Kafka 主题的数据以及序列化数据的格式。 有关具有默认值的示例,请参阅此 配置文件。
| Name | 类型 | Description | Required/optional |
|---|---|---|---|
| connector.class | String | Azure Cosmos DB 源的类名。 应将其设置为 com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector |
必选 |
| connect.cosmos.databasename | String | 要从中读取的数据库的名称。 | 必选 |
| connect.cosmos.master.key | String | Azure Cosmos DB 主密钥。 | 必选 |
| connect.cosmos.connection.endpoint | URI | 帐户终结点。 | 必选 |
| connect.cosmos.containers.topicmap | String | 逗号分隔的主题到容器映射。 例如,topic1#coll1、topic2#coll2 | 必选 |
| connect.cosmos.connection.gateway.enabled | 布尔 | 用于指示是否使用网关模式的标志。 默认情况下为“false”。 | 可选 |
| connect.cosmos.messagekey.enabled | 布尔 | 此值表示是否应设置 Kafka 消息键。 默认值为 true |
必选 |
| 连接.宇宙.消息键.字段 | String | 使用文档中的字段值作为消息键。 默认值为 id。 |
必选 |
| connect.cosmos.offset.useLatest | 布尔 | 将true设置为使用最新的源偏移量。 将false设置为使用最早记录的偏移量。 默认值为 false。 |
必选 |
| connect.cosmos.task.poll.interval | int (整数) | 轮询更改源容器以检测更改的间隔。 | 必选 |
| 密钥转换器 | String | 写入 Kafka 主题的密钥数据的序列化格式。 | 必选 |
| 数值转换器 | String | 写入 Kafka 主题的值数据的序列化格式。 | 必选 |
| key.converter.schemas.enable | String | 如果键数据具有嵌入架构,则设置为 true。 |
可选 |
| value.converter.schemas.enable | String | 如果键数据具有嵌入架构,则设置为 true。 |
可选 |
| tasks.max | int (整数) | 连接器源任务的最大数目。 默认值为 1。 |
可选 |
支持的数据类型
Azure Cosmos DB 源连接器将 JSON 文档转换为架构,并支持以下 JSON 数据类型:
| JSON 数据类型 | 架构类型 |
|---|---|
| Array | Array |
| 布尔 | 布尔 |
| 编号 | Float32 Float64 Int8 Int16 Int32 Int64 |
| Null | String |
| 对象(JSON) | 结构 |
| String | String |
后续步骤
- 用于 Azure Cosmos DB 的 Kafka Connect 汇聚接收器