使用 Kafka Connect 将数据从 Apache Kafka 引入到 Azure Cosmos DB Cassandra APIIngest data from Apache Kafka into Azure Cosmos DB Cassandra API using Kafka Connect

适用于: Cassandra API

现有 Cassandra 应用程序可以轻松使用 Azure Cosmos DB Cassandra API,因为它提供了 CQLv4 驱动程序兼容性Existing Cassandra applications can easily work with the Azure Cosmos DB Cassandra API because of its CQLv4 driver compatibility. 你可以利用此功能与流式处理平台(例如 Apache Kafka)进行集成,将数据引入 Azure Cosmos DB。You leverage this capability to integrate with streaming platforms such as Apache Kafka and bring data into Azure Cosmos DB.

只有被其他应用程序使用或引入到其他系统中时,Apache Kafka(主题)中的数据才有用。Data in Apache Kafka (topics) is only useful when consumed by other applications or ingested into other systems. 可以使用你选择的语言和客户端 SDK通过 Kafka 生成者/使用者 API 来构建解决方案。It's possible to build a solution using the Kafka Producer/Consumer APIs using a language and client SDK of your choice. Kafka Connect 提供了一个替代解决方案。Kafka Connect provides an alternative solution. 它是一个平台,用于在 Apache Kafka 和其他系统之间以可缩放且可靠的方式流式传输数据。It's a platform to stream data between Apache Kafka and other systems in a scalable and reliable manner. 由于 Kafka Connect 支持包括 Cassandra 在内的现成连接器,因此你无需编写自定义代码便可将 Kafka 与 Azure Cosmos DB Cassandra API 进行集成。Since Kafka Connect supports off the shelf connectors which includes Cassandra, you don't need to write custom code to integrate Kafka with Azure Cosmos DB Cassandra API.

在本文中,我们将使用开源 DataStax Apache Kafka 连接器,它在 Kafka Connect 框架上运行,可将 Kafka 主题中的记录引入到一个或多个 Cassandra 表的行中。In this article, we will be using the open-source DataStax Apache Kafka connector, that works on top of Kafka Connect framework to ingest records from a Kafka topic into rows of one or more Cassandra tables. 此示例提供了一个使用 Docker Compose 的可重用安装程序。The example provides a reusable setup using Docker Compose. 这非常方便,因为它使你能够使用单个命令在本地启动所有必需的组件。This is quite convenient since it enables you to bootstrap all the required components locally with a single command. 这些组件包括 Kafka、Zookeeper、Kafka Connect 工作器和示例数据生成器应用程序。These components include Kafka, Zookeeper, Kafka Connect worker, and the sample data generator application.

下面是组件及其服务定义的明细,你可以参考 GitHub 存储库中的完整 docker-compose 文件。Here is a breakdown of the components and their service definitions - you can refer to the complete docker-compose file in the GitHub repo.

  • Kafka 和 Zookeeper 使用 debezium 映像。Kafka and Zookeeper use debezium images.
  • 将要作为 Docker 容器运行的 DataStax Apache Kafka 连接器在生成时基于现有的 Docker 映像 - debezium/connect-baseTo run as a Docker container, the DataStax Apache Kafka Connector is baked on top of an existing Docker image - debezium/connect-base. 此映像包括 Kafka 及其 Kafka Connect 库的安装,因此添加自定义连接器非常方便。This image includes an installation of Kafka and its Kafka Connect libraries, thus making it really convenient to add custom connectors. 你可以参考 DockerfileYou can refer to the Dockerfile.
  • data-generator 服务将随机生成的 (JSON) 数据植入到 weather-data Kafka 主题。The data-generator service seeds randomly generated (JSON) data into the weather-data Kafka topic. 你可以参考 GitHub 存储库中的代码和 DockerfileYou can refer to the code and Dockerfile in the GitHub repo


创建密钥空间、表,并启动集成管道Create Keyspace, tables and start the integration pipeline

使用 Azure 门户,创建演示应用程序所需的 Cassandra 密钥空间和表。Using the Azure portal, create the Cassandra Keyspace and the tables required for the demo application.


使用与下面的内容相同的密钥空间和表名称Use the same Keyspace and table names as below

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

克隆 GitHub 存储库:Clone the GitHub repo:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

启动所有服务:Start all the services:

docker-compose --project-name kafka-cosmos-cassandra up --build


下载并启动容器可能需要一段时间:这只是一个一次性过程。It might take a while to download and start the containers: this is just a one time process.

确认是否所有容器都已启动:To confirm whether all the containers have started:

docker-compose -p kafka-cosmos-cassandra ps

数据生成器应用程序将开始将数据发送到 Kafka 中的 weather-data 主题。The data generator application will start pumping data into the weather-data topic in Kafka. 你还可以执行快速的健全性检查以进行确认。You can also do quick sanity check to confirm. 快速浏览运行 Kafka Connect 工作器的 Docker 容器:Peek into the Docker container running the Kafka connect worker:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

进入容器 shell 后,只需启动平常的 Kafka 控制台使用者进程,就会看到天气数据(采用 JSON 格式)流入。Once you drop into the container shell, just start the usual Kafka console consumer process and you should see weather data (in JSON format) flowing in.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Cassandra 接收器连接器设置Cassandra Sink connector setup

将下面的 JSON 内容复制到一个文件中(你可以将其命名为 cassandra-sink-config.json)。Copy the JSON contents below to a file (you can name it cassandra-sink-config.json). 你需要根据你的设置对其进行更新,本部分的其余内容将提供有关此主题的指导。You will need to update it as per your setup and the rest of this section will provide guidance around this topic.

    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.cn",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. China East>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000

以下是属性的汇总:Here is a summary of the attributes:

基本连接Basic connectivity

  • contactPoints:输入 Cosmos DB Cassandra 的联系点contactPoints: enter the contact point for Cosmos DB Cassandra
  • loadBalancing.localDc:输入 Cosmos DB 帐户的区域,例如“中国东部”loadBalancing.localDc: enter the region for Cosmos DB account e.g. China East
  • auth.username:输入用户名auth.username: enter the username
  • auth.password:输入密码auth.password: enter the password
  • port:输入端口值(此值为 10350,而不是 9042port: enter the port value (this is 10350, not 9042. 请将其保持原样)leave it as is)

SSL 配置SSL configuration

Azure Cosmos DB 强制实施基于 SSL 的安全连接,Kafka Connect 连接器也支持 SSL。Azure Cosmos DB enforces secure connectivity over SSL and Kafka Connect connector supports SSL as well.

  • ssl.keystore.path:容器中 JDK 密钥存储的路径 - /etc/alternatives/jre/lib/security/cacerts/ssl.keystore.path: path to the JDK keystore in the container - /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password:JDK 密钥存储(默认)密码ssl.keystore.password: JDK keystore (default) password
  • ssl.hostnameValidation:我们启用了节点主机名验证ssl.hostnameValidation: We turn own node hostname validation
  • ssl.providerJDK 用作 SSL 提供程序ssl.provider: JDK is used as the SSL provider

泛型参数Generic parameters

  • key.converter:我们使用字符串转换器 org.apache.kafka.connect.storage.StringConverterkey.converter: We use the string converter org.apache.kafka.connect.storage.StringConverter
  • value.converter:由于 Kafka 主题中的数据是 JSON,因此我们使用 org.apache.kafka.connect.json.JsonConvertervalue.converter: since the data in Kafka topics is JSON, we make use of org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable:由于我们的 JSON 有效负载没有与之关联的架构(用于演示应用),因此我们需要通过将此属性设置为 false 来指示 Kafka Connect 不查找架构。value.converter.schemas.enable: Since our JSON payload doesn't have a schema associated with it (for the purposes of the demo app), we need to instruct Kafka Connect to not look for a schema by setting this attribute to false. 如果不这样做,则会导致失败。Not doing so will result in failures.

安装连接器Install the connector

使用 Kafka Connect REST 终结点安装连接器:Install the connector using the Kafka Connect REST endpoint:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

检查状态:To check the status:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

如果一切顺利,连接器应当会开始发挥作用。If all goes well, the connector should start weaving its magic. 它应当向 Azure Cosmos DB 进行身份验证,并开始将 Kafka 主题 (weather-data) 中的数据引入到 Cassandra 表(weather.data_by_stateweather.data_by_station)中。It should authenticate to Azure Cosmos DB and start ingesting data from the Kafka topic (weather-data) into Cassandra tables - weather.data_by_state and weather.data_by_station

现在,你可以通过已安装在本地计算机上的 CQLSH 来查询表中的数据。You can now query data in the tables via CQLSH that installed on local computer.

查询 Azure Cosmos DB 中的数据Query data from Azure Cosmos DB

检查 data_by_statedata_by_station 表。Check the data_by_state and data_by_station tables. 下面是一些可帮助你入门的示例查询:Here is some sample queries to get you started:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

清理资源Clean up resources

执行完应用和 Azure Cosmos DB 帐户的操作以后,可以删除所创建的 Azure 资源,以免产生更多费用。When you're done with your app and Azure Cosmos DB account, you can delete the Azure resources you created so you don't incur more charges. 若要删除资源,请执行以下操作:To delete the resources:

  1. 在 Azure 门户的“搜索”栏中,搜索并选择“资源组” 。In the Azure portal Search bar, search for and select Resource groups.

  2. 从列表中选择为本快速入门创建的资源组。From the list, select the resource group you created for this quickstart.


  3. 在资源组“概览”页上,选择“删除资源组” 。On the resource group Overview page, select Delete resource group.


  4. 在下一窗口中输入要删除的资源组的名称,然后选择“删除” 。In the next window, enter the name of the resource group to delete, and then select Delete.

后续步骤Next steps