Ingest data from Kafka into Azure Data Explorer

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. It offers ingestion (data loading) from Kafka. Kafka is a distributed streaming platform that lets you build real-time streaming data pipelines that reliably move data between systems or applications.

Prerequisites

You need an Azure Data Explorer (ADX) cluster and database to serve as the ingestion target, and a Kafka cluster to connect to it.

Kafka connector setup

Kafka Connect is a tool for scalable and reliable streaming of data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. The ADX Kafka Sink serves as the connector from Kafka.

Bundle

Kafka can load a .jar as a plugin that acts as a custom connector. To produce such a .jar, clone the code locally and build it with Maven.

Clone

git clone https://github.com/Azure/kafka-sink-azure-kusto.git
cd ./kafka-sink-azure-kusto/

Build

Build locally with Maven to produce a .jar complete with dependencies.

Inside the root directory kafka-sink-azure-kusto, run:

mvn clean compile assembly:single
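
A successful build drops a self-contained plugin jar under target/; with Maven's default assembly descriptor the file name ends in -jar-with-dependencies.jar (the exact artifact name and version come from the pom and are not spelled out here):

ls ./target/*-jar-with-dependencies.jar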

Deploy

Load the plugin into Kafka. A deployment example using Docker can be found in the kafka-sink-azure-kusto repository.

Detailed documentation on Kafka connectors and how to deploy them can be found in the Kafka Connect documentation.
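
As a minimal non-Docker sketch, copy the jar into a directory on a Connect worker's plugin.path and start a standalone worker against a properties file holding the configuration below. The /opt/connectors directory and the adx-sink.properties file name are assumptions for illustration:

# Make the connector visible to the worker via plugin.path
mkdir -p /opt/connectors/kusto
cp ./target/*-jar-with-dependencies.jar /opt/connectors/kusto/

# In config/connect-standalone.properties, set:
#   plugin.path=/opt/connectors

# Start a standalone worker with the sink configuration (see the example below)
bin/connect-standalone.sh config/connect-standalone.properties adx-sink.properties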

Example configuration

name=KustoSinkConnector 
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector 
kusto.sink.flush_interval_ms=300000 
key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 
tasks.max=1 
topics=testing1 
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'TestTable','format': 'json', 'mapping':'TestMapping'}] 
kusto.auth.authority=XXX 
kusto.url=https://ingest-{mycluster}.kusto.chinacloudapi.cn/ 
kusto.auth.appid=XXX 
kusto.auth.appkey=XXX 
kusto.sink.tempdir=/var/tmp/ 
kusto.sink.flush_size=1000
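
If the worker runs in distributed mode instead, the same settings can be registered through the Kafka Connect REST API. The worker address localhost:8083 is the default listener and an assumption here; the remaining kusto.* properties from the example above go into the config object as well:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "KustoSinkConnector",
        "config": {
          "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
          "tasks.max": "1",
          "topics": "testing1",
          "kusto.url": "https://ingest-{mycluster}.kusto.chinacloudapi.cn/"
        }
      }'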

Create a target table in ADX

Create a table in ADX to which Kafka can send data. Create the table in the cluster and database provisioned in the Prerequisites.

  1. In the Azure portal, navigate to your cluster and select Query.

    Screenshot: query application link

  2. Copy the following command into the window and select Run.

    .create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
    

    Screenshot: run create query

  3. Copy the following command into the window and select Run.

    .create table TestTable ingestion json mapping 'TestMapping' '[{"column":"TimeStamp","path":"$.timeStamp","datatype":"datetime"},{"column":"Name","path":"$.name","datatype":"string"},{"column":"Metric","path":"$.metric","datatype":"int"},{"column":"Source","path":"$.source","datatype":"string"}]'
    

    This command maps incoming JSON data to the column names and data types of the table (TestTable); an example message that matches this mapping is shown below.
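
    Given that mapping, an event shaped like the following illustrative record (invented values) lands in TestTable with each JSON path routed to its column:

    {"timeStamp": "2018-10-01T12:00:00Z", "name": "demo", "metric": 5, "source": "kafka"}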

Generate sample data

Now that the Kafka cluster is connected to ADX, use the sample app to generate data.

Clone

Clone the sample app locally:

git clone https://github.com/Azure/azure-kusto-samples-dotnet.git
cd ./azure-kusto-samples-dotnet/kafka/

Run the app

  1. Open the sample app solution in Visual Studio.

  2. In the Program.cs file, update the connectionString constant to your Kafka connection string.

    const string connectionString = @"<YourConnectionString>";
    
  3. Build and run the app. The app sends messages to the Kafka cluster and prints its status every 10 seconds; a minimal sketch of such a producer follows this list.

  4. After the app has sent a few messages, move on to the next step.
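
For reference, the producing side can be as small as the following sketch. It uses the Confluent.Kafka client and invented field values shaped to match the TestMapping ingestion mapping; the actual sample app's code and dependencies may differ:

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

class Producer
{
    static async Task Main()
    {
        // Assumed bootstrap address; replace with your Kafka connection string.
        var config = new ProducerConfig { BootstrapServers = "<YourConnectionString>" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();
        var rand = new Random();

        for (int i = 1; ; i++)
        {
            // Field names follow the JSON paths in TestMapping defined earlier.
            var json = JsonSerializer.Serialize(new
            {
                timeStamp = DateTime.UtcNow,
                name = "demo",
                metric = rand.Next(0, 100),
                source = "kafka-sample"
            });
            await producer.ProduceAsync("testing1", new Message<Null, string> { Value = json });

            if (i % 10 == 0) Console.WriteLine($"Sent {i} messages");
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}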

Query and review the data

  1. To make sure no errors occurred during ingestion, run:

    .show ingestion failures
    
  2. To see the newly ingested data, run:

    TestTable 
    | count
    
  3. To see the content of the messages, run:

    TestTable
    

    The result set should look like the following:

    Screenshot: message result set

Next steps