Integrate Apache Kafka Connect support on Azure Event Hubs with Debezium for Change Data Capture

Change Data Capture (CDC) is a technique used to track row-level changes in database tables in response to create, update, and delete operations. Debezium is a distributed platform that builds on top of Change Data Capture features available in different databases (for example, logical decoding in PostgreSQL). It provides a set of Kafka Connect connectors that tap into row-level changes in database tables and convert them into event streams that are then sent to Apache Kafka.

This tutorial walks you through setting up a CDC-based system on Azure using Event Hubs (for Kafka), Azure Database for PostgreSQL, and Debezium. It uses the Debezium PostgreSQL connector to stream database modifications from PostgreSQL to Kafka topics in Event Hubs.

Note

This article contains references to a term that Azure no longer uses. When the term is removed from the software, we'll remove it from this article.

In this tutorial, you take the following steps:

  • Create an Event Hubs namespace
  • Set up and configure Azure Database for PostgreSQL
  • Configure and run Kafka Connect with Debezium PostgreSQL connector
  • Test change data capture
  • (Optional) Consume change data events with a FileStreamSink connector

Prerequisites

To complete this walkthrough, you require:

Create an Event Hubs namespace

An Event Hubs namespace is required to send and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.

Set up and configure Azure Database for PostgreSQL

Azure Database for PostgreSQL is a relational database service based on the community version of the open-source PostgreSQL database engine. It's available in three deployment options: Single Server, Flexible Server, and Cosmos DB for PostgreSQL. Follow these instructions to create an Azure Database for PostgreSQL server using the Azure portal.

Set up and run Kafka Connect

This section covers the following topics:

  • Debezium connector installation
  • Configuring Kafka Connect for Event Hubs
  • Start Kafka Connect cluster with Debezium connector

Download and set up the Debezium connector

Follow the latest instructions in the Debezium documentation to download and set up the connector.

Configure Kafka Connect for Event Hubs

Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following connect-distributed.properties sample illustrates how to configure Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:

Important

  • Debezium auto-creates one topic per table, plus several metadata topics. A Kafka topic corresponds to an Event Hubs instance (event hub). For Apache Kafka to Azure Event Hubs mappings, see Kafka and Event Hubs conceptual mapping.
  • The number of event hubs allowed in an Event Hubs namespace depends on the tier (Basic, Standard, Premium, or Dedicated). For these limits, see Quotas.
# e.g. namespace.servicebus.chinacloudapi.cn:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs

Important

Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
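As a hedged illustration, both placeholders in the properties sample can be derived from a single connection string: the FQDN is the host part of the Endpoint value, and the JAAS line embeds the whole string as the password. The following Python sketch shows that derivation. The function names are hypothetical helpers, not part of any Azure or Kafka library.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs. Only the first '=' separates key
    from value, so keys that end in base64 padding ('=') stay intact."""
    parts = {}
    for segment in conn_str.strip().strip(";").split(";"):
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def bootstrap_server(conn_str: str, port: int = 9093) -> str:
    """Derive '<fqdn>:<port>' from the 'Endpoint=sb://<fqdn>/' part."""
    endpoint = parse_connection_string(conn_str)["Endpoint"]
    return f"{urlparse(endpoint).hostname}:{port}"


def jaas_config(conn_str: str) -> str:
    """Build the value used for the three *.sasl.jaas.config properties."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{conn_str}";'
    )
```

For the example connection string above, bootstrap_server returns mynamespace.servicebus.chinacloudapi.cn:9093, which is the value expected for bootstrap.servers.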

Run Kafka Connect

In this step, a Kafka Connect worker is started locally in distributed mode, using Event Hubs to maintain cluster state.

  1. Save the above connect-distributed.properties file locally. Be sure to replace all values in braces.
  2. Navigate to the location of the Kafka release on your machine.
  3. Run ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties and wait for the cluster to start.
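Before starting the worker, it can help to confirm that no brace placeholders remain in the saved file. The following Python sketch is one way to do that check. It assumes placeholders follow the {UPPER.CASE} pattern used in the sample above, and unreplaced_placeholders is a hypothetical helper name.

```python
import re

# Matches tokens such as {YOUR.EVENTHUBS.FQDN} or {KAFKA.DIRECTORY}.
PLACEHOLDER = re.compile(r"\{[A-Z][A-Z.]*\}")


def unreplaced_placeholders(properties_text: str) -> list:
    """Return (line_number, placeholder) pairs for non-comment lines
    that still contain a {UPPER.CASE} token."""
    found = []
    for number, line in enumerate(properties_text.splitlines(), start=1):
        if line.lstrip().startswith("#"):
            continue  # whole-line comments are ignored
        for match in PLACEHOLDER.finditer(line):
            found.append((number, match.group(0)))
    return found
```

To use it, read the file with open("connect-distributed.properties").read() and pass the text in. An empty list means every placeholder of that form has been replaced.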

Note

Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. A quick check of the namespace in the Azure portal reveals that the Connect worker's internal topics have been created automatically.

Kafka Connect internal topics must use compaction. The Event Hubs team is not responsible for fixing improper configurations if internal Connect topics are incorrectly configured.

Configure and start the Debezium PostgreSQL source connector

Create a configuration file (pg-source-connector.json) for the PostgreSQL source connector, replacing the placeholder values with those for your Azure PostgreSQL instance.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.chinacloudapi.cn",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Tip

The database.server.name attribute is a logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored.
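If you prefer to generate the configuration file rather than edit it by hand, a small script can fill in the placeholders. The Python sketch below mirrors the keys of the JSON sample above. The function name pg_source_config and its parameters are hypothetical, and the defaults simply repeat the sample's values.

```python
import json


def pg_source_config(instance: str, user: str, password: str,
                     dbname: str = "postgres",
                     server_name: str = "my-server",
                     tables: str = "public.todos") -> dict:
    """Build the connector definition with the same keys as the sample."""
    return {
        "name": "todo-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": f"{instance}.postgres.database.chinacloudapi.cn",
            "database.port": "5432",
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "database.server.name": server_name,
            "plugin.name": "wal2json",
            "table.whitelist": tables,
        },
    }


def write_config(config: dict, path: str = "pg-source-connector.json") -> None:
    """Write the definition as JSON, ready to POST to Kafka Connect."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=4)
```

Because the file contains the database password in plain text, treat it like any other secret.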

To create an instance of the connector, use the Kafka Connect REST API endpoint:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

To check the status of the connector:

curl -s http://localhost:8083/connectors/todo-connector/status
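The status response is a JSON document with a connector object and a tasks list, each carrying a state field. As a rough sketch, the following Python code fetches that document and lists anything that isn't RUNNING. The function names are hypothetical, and the base URL assumes the local worker used in this tutorial.

```python
import json
from urllib.request import urlopen


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def not_running(status: dict) -> list:
    """List the connector and task entries whose state is not RUNNING.
    An empty tasks list yields no task entries, so check that separately
    if you expect at least one task."""
    problems = []
    connector_state = status.get("connector", {}).get("state")
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append(f"task {task.get('id')}: {task.get('state')}")
    return problems
```

Calling not_running(fetch_status("todo-connector")) returns an empty list when the connector and all of its tasks report RUNNING.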

Test change data capture

To see change data capture in action, you need to create/update/delete records in the Azure PostgreSQL database.

Start by connecting to your Azure PostgreSQL database (the following example uses psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.chinacloudapi.cn -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

For example:

psql -h my-postgres.postgres.database.chinacloudapi.cn -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Create a table and insert records

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

The connector should now start sending change data events to an Event Hubs topic named my-server.public.todos, assuming that database.server.name is set to my-server and that public.todos is the table whose changes you're tracking (as set by the table.whitelist configuration).
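The topic name is the logical server name joined with the schema-qualified table name. The Python sketch below derives the expected topic names from a connector definition. It assumes table.whitelist holds literal, comma-separated schema.table names rather than regular expressions, and the function names are hypothetical.

```python
def change_topic(server_name: str, qualified_table: str) -> str:
    """Join the logical server name and 'schema.table' into a topic name."""
    schema, _, table = qualified_table.partition(".")
    if not schema or not table:
        raise ValueError(f"expected 'schema.table', got {qualified_table!r}")
    return f"{server_name}.{schema}.{table}"


def change_topics(connector: dict) -> list:
    """Expected topic names for every table listed in table.whitelist
    (assumed here to be literal names, not patterns)."""
    cfg = connector["config"]
    tables = cfg["table.whitelist"].split(",")
    return [change_topic(cfg["database.server.name"], t.strip()) for t in tables]
```

With the configuration used in this tutorial, change_topics yields a single name, my-server.public.todos.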

Check Event Hubs topic

Let's inspect the contents of the topic to make sure everything is working as expected. The following example uses kafkacat, but you can also create a consumer using any of the options listed here.

Create a file named kafkacat.conf with the following contents:

metadata.broker.list=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Note

Update the metadata.broker.list and sasl.password attributes in kafkacat.conf with the details of your Event Hubs namespace.

In a different terminal, start a consumer:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

You should see the JSON payloads representing the change data events generated in PostgreSQL in response to the rows you added to the todos table. Here's a snippet of the payload:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fullfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }
}

The event consists of the payload along with its schema (omitted for brevity). In the payload section, notice how the create operation ("op": "c") is represented: "before": null means that the row was newly inserted, after provides the values of the row's columns, and source provides metadata about the PostgreSQL instance from which the event was captured.
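To make the structure concrete, the following Python sketch reduces one event to its operation, table, and changed columns. It assumes the event is a complete JSON document with a payload section shaped like the snippet above. The summarize function is a hypothetical helper, and what before contains for updates and deletes depends on the table's REPLICA IDENTITY setting, so the changed list is only as precise as the data the event carries.

```python
import json

# Debezium operation codes for row-level events.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize(event_json: str) -> dict:
    """Reduce a change event to its operation, table, and changed columns."""
    payload = json.loads(event_json)["payload"]
    before = payload.get("before") or {}  # null for newly inserted rows
    after = payload.get("after") or {}    # null for deleted rows
    changed = sorted(
        column for column in set(before) | set(after)
        if before.get(column) != after.get(column)
    )
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f"{source['schema']}.{source['table']}",
        "changed": changed,
    }
```

For the create event shown above, every column appears in changed because before is null.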

You can try the same with update or delete operations and inspect the resulting change data events. For example, to update the task status for configure and install connector (assuming its id is 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Optional) Install FileStreamSink connector

Now that all changes to the todos table are being captured in the Event Hubs topic, you can use the FileStreamSink connector (available by default in Kafka Connect) to consume these events.

Create a configuration file (file-sink-connector.json) for the connector, replacing the file attribute with a path on your file system.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

To create the connector and check its status:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Insert/update/delete database records and monitor the records in the configured output sink file:

tail -f /Users/foo/todos-cdc.txt

Cleanup

Kafka Connect creates Event Hubs topics to store configurations, offsets, and status that persist even after the Kafka Connect cluster has been taken down. Unless this persistence is desired, we recommend that you delete these topics. You might also want to delete the my-server.public.todos event hub that was created during this walkthrough.
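To see which internal topics to look for during cleanup, you can read their names back from the worker configuration. The Python sketch below collects the values of the *.storage.topic keys from the text of connect-distributed.properties. The function name internal_topics is hypothetical, and the sketch only lists names; it doesn't delete anything.

```python
def internal_topics(properties_text: str) -> list:
    """Collect the values of the config/offset/status '.storage.topic' keys."""
    topics = []
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        if key.strip().endswith(".storage.topic"):
            topics.append(value.strip())
    return topics
```

With the sample configuration from this tutorial, the result is connect-cluster-configs, connect-cluster-offsets, and connect-cluster-status.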

Next steps

To learn more about Event Hubs for Kafka, see the following articles: