Integrate Apache Kafka Connect support on Azure Event Hubs
Apache Kafka Connect is a framework for importing and exporting data between a Kafka cluster and external systems such as MySQL, HDFS, and the file system. This article walks you through integrating Kafka Connect with an event hub and deploying basic FileStreamSource and FileStreamSink connectors. While these connectors aren't meant for production use, they demonstrate an end-to-end Kafka Connect scenario where Azure Event Hubs acts as a Kafka broker.
Note
This sample is available on GitHub.
Prerequisites
To complete this walkthrough, make sure you have the following prerequisites:
- Azure subscription. If you don't have one, create a trial subscription.
- Git
- Linux/macOS
- Latest Kafka release available from kafka.apache.org
- Read through the Event Hubs for Apache Kafka introduction article
Create an Event Hubs namespace
An Event Hubs namespace is required to send to and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
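If you prefer the command line, you can also create the namespace and retrieve the connection string with the Azure CLI. The following is a minimal sketch; the namespace, resource group, and region values are placeholders you should replace with your own:
# Create an Event Hubs namespace (placeholder names; replace with your own)
az eventhubs namespace create --name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP} --location {YOUR.REGION} --sku Standard
# Retrieve the namespace-level connection string used later in this walkthrough
az eventhubs namespace authorization-rule keys list --namespace-name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP} --name RootManageSharedAccessKey --query primaryConnectionString --output tsv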
Clone the example project
Clone the Azure Event Hubs repository and navigate to the tutorials/connect subfolder:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configure Kafka Connect for Event Hubs
Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following connect-distributed.properties sample illustrates how to configure Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:
# e.g. namespace.servicebus.chinacloudapi.cn:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Important
Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Run Kafka Connect
In this step, a Kafka Connect worker is started locally in distributed mode, using Event Hubs to maintain cluster state.
- Save the connect-distributed.properties file locally. Be sure to replace all values in braces.
- Navigate to the location of the Kafka release on your machine.
- Run ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. The Connect worker REST API is ready for interaction when you see 'INFO Finished starting connectors and tasks'.
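Optionally, you can confirm that the worker's REST API is reachable. A quick check, assuming the default Connect REST port of 8083:
# Returns the worker's version information when the REST API is up
curl -s http://localhost:8083/
# Lists registered connectors (an empty list at this point)
curl -s http://localhost:8083/connectors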
Note
Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. A quick check of the namespace in the Azure portal reveals that the Connect worker's internal topics have been created automatically.
Kafka Connect internal topics must use compaction. The Event Hubs team is not responsible for fixing improper configurations if internal Connect topics are incorrectly configured.
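You can also list the internal topics from the command line. The following is a sketch, assuming a file named client.properties (a name chosen for this example) that contains the same security.protocol, sasl.mechanism, and sasl.jaas.config values used in connect-distributed.properties:
# List topics in the namespace, including the Connect internal topics
./bin/kafka-topics.sh --bootstrap-server {YOUR.EVENTHUBS.FQDN}:9093 --command-config client.properties --list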
Create connectors
This section walks you through spinning up FileStreamSource and FileStreamSink connectors.
Create a directory for input and output data files.
mkdir ~/connect-quickstart
Create two files: one file with seed data from which the FileStreamSource connector reads, and another to which our FileStreamSink connector writes.
seq 1000 > ~/connect-quickstart/input.txt
touch ~/connect-quickstart/output.txt
Create a FileStreamSource connector. Be sure to replace the curly braces with your home directory path.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
You should see the event hub connect-quickstart on your Event Hubs instance after running the command.
Check the status of the source connector (a sample healthy status response is shown after these steps).
curl -s http://localhost:8083/connectors/file-source/status
Optionally, you can use Service Bus Explorer to verify that events arrived in the connect-quickstart topic.
Create a FileStreamSink connector. Again, make sure you replace the curly braces with your home directory path.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Check the status of the sink connector.
curl -s http://localhost:8083/connectors/file-sink/status
Verify that data has been replicated between files and that the data is identical across both files.
# read the file
cat ~/connect-quickstart/output.txt
# diff the input and output files
diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
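For reference, the status checks above return JSON from the Connect REST API. A healthy source connector typically reports something like the following (the worker_id and task details vary with your setup):
{
  "name": "file-source",
  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "connect:8083" } ],
  "type": "source"
}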
Cleanup
Kafka Connect creates Event Hubs topics to store configurations, offsets, and status that persist even after the Connect cluster has been taken down. Unless this persistence is desired, we recommend that you delete these topics. You might also want to delete the connect-quickstart event hub that was created during this walkthrough.
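For example, you can delete them with the Azure CLI (the namespace and resource group names are placeholders; replace them with your own):
# Delete the Connect internal topics and the quickstart event hub
az eventhubs eventhub delete --name connect-cluster-configs --namespace-name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP}
az eventhubs eventhub delete --name connect-cluster-offsets --namespace-name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP}
az eventhubs eventhub delete --name connect-cluster-status --namespace-name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP}
az eventhubs eventhub delete --name connect-quickstart --namespace-name {YOUR.NAMESPACE} --resource-group {YOUR.RESOURCE.GROUP}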
Related content
To learn more about Event Hubs for Kafka, see the following articles: