适用对象:
卡珊德拉
现有的 Cassandra 应用程序可以轻松地与 Azure Cosmos DB for Apache Cassandra 配合使用,因为它具有 CQLv4 驱动程序兼容性。 可以利用此功能与流式处理平台(例如 Apache Kafka)进行集成,将数据引入 Azure Cosmos DB。
只有被其他应用程序使用或引入到其他系统中时,Apache Kafka(主题)中的数据才有用。 可以使用你选择的语言和客户端 SDK通过 Kafka 生成者/使用者 API 来构建解决方案。 Kafka Connect 提供了一个替代解决方案。 它是一个平台,用于在 Apache Kafka 和其他系统之间以可缩放且可靠的方式流式传输数据。 由于 Kafka Connect 支持现成的连接器(包括 Cassandra),因此无需编写自定义代码来将 Kafka 与 Azure Cosmos DB for Apache Cassandra 集成。
本文使用开源的 DataStax Apache Kafka 连接器,其运行于 Kafka Connect 框架之上,将 Kafka 主题中的记录导入到 Cassandra 表的行中。 此示例提供了一个使用 Docker Compose 的可重用安装程序。 此示例允许使用单个命令在本地启动所有必需的组件。 这些组件包括 Kafka、Zookeeper、Kafka Connect 工作器和示例数据生成器应用程序。
下面是组件及其服务定义的细分。 请参阅 docker-compose
完整文件。
- Kafka 和 Zookeeper 使用 debezium 映像。
- 若要作为 Docker 容器运行,DataStax Apache Kafka 连接器包含在现有 Docker 映像之上: debezium/connect-base。 此映像包括 Kafka 及其 Kafka Connect 库的安装,这使得添加自定义连接器变得方便。 请参阅 Dockerfile。
-
data-generator
服务将随机生成的 (JSON) 数据植入到weather-data
Kafka 主题。 参考Dockerfile
中的代码和 。
先决条件
创建密钥空间、表,并启动集成管道
使用 Azure 门户,创建演示应用程序所需的 Cassandra 密钥空间和表。
注意
使用下面所用的相同密钥空间名称和表名称。
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
克隆 GitHub 存储库:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
启动所有服务:
docker-compose --project-name kafka-cosmos-cassandra up --build
注意
下载并启动容器可能需要一段时间。 此设置只是一个一次性的过程。
确认是否所有容器都已启动:
docker-compose -p kafka-cosmos-cassandra ps
数据生成器应用程序将开始将数据发送到 Kafka 中的 weather-data
主题。 还可以执行快速检查进行确认。 快速浏览运行 Kafka Connect 工作器的 Docker 容器:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
进入容器 shell 环境后,启动通常的 Kafka 控制台消费者进程。 应会看到 JSON 格式的天气数据流入。
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Cassandra 接收器连接器设置
将此处的 JSON 内容复制到文件。 将其命名为 cassandra-sink-config.json
。 需要根据设置对其进行更新。 本部分的其余部分提供了指导。
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.cn",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. China East>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
以下是属性的汇总:
基本连接
-
contactPoints
:输入 Azure Cosmos DB Cassandra 的联系点 -
loadBalancing.localDc
:输入 Azure Cosmos DB 帐户的区域,例如中国东部 -
auth.username
:输入用户名 -
auth.password
:输入密码 -
port
:输入端口值。 该值是10350
,而不是9042
。 保持原样
SSL 配置
Azure Cosmos DB 强制实施基于 SSL 的安全连接,Kafka Connect 连接器也支持 SSL。
-
ssl.keystore.path
:容器中 JDK 密钥存储的路径 -/etc/alternatives/jre/lib/security/cacerts/
-
ssl.keystore.password
:JDK 密钥存储(默认)密码 -
ssl.hostnameValidation
:我们启用了节点主机名验证 -
ssl.provider
:JDK
用作 SSL 提供程序
泛型参数
-
key.converter
:我们使用字符串转换器org.apache.kafka.connect.storage.StringConverter
-
value.converter
:由于 Kafka 主题中的数据为 JSON,因此我们使用org.apache.kafka.connect.json.JsonConverter
-
value.converter.schemas.enable
:由于我们的 JSON 有效负载没有与其关联的架构,为了演示应用,我们需要指示 Kafka Connect 不要查找架构,只需将此属性设置为false
。 如果不这样做,则会导致失败。
安装连接器
使用 Kafka Connect REST 终结点安装连接器:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
检查状态:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
如果一切顺利,连接器应当会开始发挥作用。 它应向 Azure Cosmos DB 进行身份验证,并开始将数据从 Kafka 主题(weather-data
)引入 Cassandra 表: weather.data_by_state
和 weather.data_by_station
。
现在,你可以通过已安装在本地计算机上的 CQLSH 来查询表中的数据。
查询 Azure Cosmos DB 中的数据
检查 data_by_state
和 data_by_station
表。 下面是一些可帮助入门的示例查询:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
清理资源
执行完应用和 Azure Cosmos DB 帐户的操作以后,可以删除所创建的 Azure 资源,以免产生更多费用。 若要删除资源,请执行以下操作:
在 Azure 门户的“搜索”栏中,搜索并选择“资源组”。
从列表中选择为本快速入门创建的资源组。
在资源组“概览”页上,选择“删除资源组”。
在下一窗口中输入要删除的资源组的名称,然后选择“删除”。