Migrate data from PostgreSQL to Azure Cosmos DB Cassandra API account using Apache Kafka

APPLIES TO: Cassandra API

Cassandra API in Azure Cosmos DB has become a great choice for enterprise workloads running on Apache Cassandra for reasons such as:

  • Significant cost savings: You can save costs with Azure Cosmos DB, including the cost of VMs, bandwidth, and any applicable licenses. Additionally, you don't have to manage data centers, servers, SSD storage, networking, and electricity costs.

  • Better scalability and availability: It eliminates single points of failure and provides better scalability and availability for your applications.

  • No overhead of managing and monitoring: As a fully managed cloud service, Azure Cosmos DB removes the overhead of managing and monitoring a myriad of settings.

Kafka Connect is a platform to stream data between Apache Kafka and other systems in a scalable and reliable manner. It supports several off-the-shelf connectors, which means you don't need custom code to integrate external systems with Apache Kafka.

This article demonstrates how to use a combination of Kafka connectors to set up a data pipeline that continuously synchronizes records from a relational database such as PostgreSQL to Azure Cosmos DB Cassandra API.


Here is a high-level overview of the end-to-end flow presented in this article.

Data in the PostgreSQL table is pushed to Apache Kafka using the Debezium PostgreSQL connector, a Kafka Connect source connector. Inserts, updates, and deletes to records in the PostgreSQL table are captured as change data events and sent to Kafka topic(s). The DataStax Apache Kafka connector, a Kafka Connect sink connector, forms the second part of the pipeline. It synchronizes the change data events from the Kafka topic to Azure Cosmos DB Cassandra API tables.
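As a rough illustration (field names here assume the orders_info columns used later in this article, and the actual event also wraps everything in a schema/payload envelope), a Debezium change event for an insert carries the new row state in an "after" block:

```json
{
  "before": null,
  "after": {
    "orderid": 1,
    "custid": 7493,
    "amount": 86,
    "city": "Seattle",
    "purchase_time": "2021-01-21T12:34:56"
  },
  "source": { "connector": "postgresql", "table": "orders_info" },
  "op": "c"
}
```

The "op" field is "c" for create (insert), "u" for update, and "d" for delete; for updates, "before" holds the previous row state.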


Using specific features of the DataStax Apache Kafka connector allows us to push data to multiple tables. In this example, the connector helps us persist change data records to two Cassandra tables that can support different query requirements.


Base setup

Set up a PostgreSQL database if you haven't already.

This could be an existing on-premises database, or you could download and install one on your local machine. It's also possible to use a Docker container.

To start a container:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Connect to your PostgreSQL instance using the psql client:

psql -h localhost -p 5432 -U postgres -W -d postgres

Create a table to store sample order information:


CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
    orderid SERIAL NOT NULL PRIMARY KEY,
    custid INTEGER NOT NULL,
    amount INTEGER NOT NULL,
    city VARCHAR(255) NOT NULL,
    purchase_time VARCHAR(40) NOT NULL
);

Using the Azure portal, create the Cassandra Keyspace and the tables required for the demo application.


Use the same Keyspace and table names as below.

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Set up Apache Kafka

This article uses a local cluster, but you can choose any other option. Download Kafka, unzip it, and start the Zookeeper and Kafka clusters.

cd <KAFKA_HOME>

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Set up connectors

Install the Debezium PostgreSQL and DataStax Apache Kafka connectors. Download the Debezium PostgreSQL connector plug-in archive, for example version 1.3.0 of the connector (the latest at the time of writing). Then download the DataStax Apache Kafka connector.

Unzip both connector archives and copy the JAR files to the Kafka Connect plugin.path.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

For details, refer to the Debezium and DataStax documentation.

Configure Kafka Connect and start the data pipeline

Start the Kafka Connect cluster

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Start the PostgreSQL connector instance

Save the connector configuration (JSON) to a file, for example pg-source-config.json.

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

To start the PostgreSQL connector instance:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors


To delete the connector, you can use: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Insert data

The orders_info table contains order details such as order ID, customer ID, city, etc. Populate the table with random data using the SQL below.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*6)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

This should insert 10 records into the table. Adjust the number of records in generate_series(1, 10) above as per your requirements; for example, to insert 100 records, use generate_series(1, 100).
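For reference, the rows produced by the SQL above can be sketched in Python (a standalone illustration of the random-data logic, not part of the pipeline):

```python
import random
from datetime import datetime, timedelta

CITIES = ["New Delhi", "Seattle", "New York", "Austin", "Chicago", "Cleveland"]

def random_order():
    # Mirrors the expressions in the INSERT ... SELECT statement above
    return {
        "custid": random.randint(1, 10000),
        "amount": random.randint(0, 200),
        "city": random.choice(CITIES),
        "purchase_time": (datetime.now()
                          + timedelta(seconds=random.uniform(0, 60))).isoformat(),
    }

orders = [random_order() for _ in range(10)]
print(len(orders))
```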

To confirm:

select * from retail.orders_info;

Check the change data capture events in the Kafka topic


Note that the topic name is myserver.retail.orders_info, which follows the connector's naming convention.

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
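The topic name follows Debezium's one-topic-per-table convention of <server-name>.<schema>.<table>, which can be sketched as:

```python
def debezium_topic(server_name: str, schema: str, table: str) -> str:
    # Debezium publishes one Kafka topic per captured table,
    # named <server-name>.<schema>.<table>
    return f"{server_name}.{schema}.{table}"

# database.server.name from the source config, plus the PostgreSQL schema and table
topic = debezium_topic("myserver", "retail", "orders_info")
print(topic)  # myserver.retail.orders_info
```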

You should see the change data events in JSON format.

Start the DataStax Apache Kafka connector instance

Save the connector configuration (JSON) to a file, for example cassandra-sink-config.json, and update the properties as per your environment.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.cn",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. China East 2>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}
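Each topic.<topic>.<keyspace>.<table>.mapping entry pairs a Cassandra column with a field from the Kafka record; as a rough illustration, such a mapping string decomposes like this:

```python
mapping = ("order_id=value.orderid, customer_id=value.custid, "
           "purchase_amount=value.amount, city=value.city, "
           "purchase_time=value.purchase_time")

# Cassandra column -> record field path (key.* or value.*)
pairs = dict(part.strip().split("=") for part in mapping.split(","))
print(pairs["city"])  # value.city
```

Because both orders_by_customer and orders_by_city have a mapping for the same topic, the sink connector writes each change event to both tables.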

To start the connector instance:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

The connector should spring into action, and the end-to-end pipeline from PostgreSQL to Azure Cosmos DB will be operational.

Query Azure Cosmos DB

Check the Cassandra tables in Azure Cosmos DB. Here are some queries you can try:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

You can continue to insert more data into PostgreSQL and confirm that the records are synchronized to Azure Cosmos DB.

Next steps