Tutorial: Process Apache Kafka for Event Hubs events using Stream Analytics

This article shows how to stream data into Event Hubs and process it with Azure Stream Analytics. It walks you through the following steps:

  1. Create an Event Hubs namespace.
  2. Create a Kafka client that sends messages to the event hub.
  3. Create a Stream Analytics job that copies data from the event hub to Azure Blob storage.

You don't need to change your protocol clients or run your own clusters when you use the Kafka endpoint exposed by an event hub. Azure Event Hubs supports Apache Kafka version 1.0 and later.

Prerequisites

To complete this quickstart, make sure you have the following prerequisites:

Create an Event Hubs namespace

When you create a standard tier Event Hubs namespace, the Kafka endpoint for the namespace is automatically enabled. You can stream events from your applications that use the Kafka protocol into standard tier Event Hubs. Follow the step-by-step instructions in Create an event hub using Azure portal to create a standard tier Event Hubs namespace.

Note

Event Hubs for Kafka is available only on standard and dedicated tiers. The basic tier doesn't support Kafka on Event Hubs.

Send messages with Kafka in Event Hubs

  1. Clone the Azure Event Hubs for Kafka repository to your machine.

  2. Navigate to the folder: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Update the configuration details for the producer in src/main/resources/producer.config. Specify the name and connection string for the event hub namespace.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.chinacloudapi.cn:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Navigate to azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/, and open TestDataReporter.java in an editor of your choice.

  5. Comment out the following line of code:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Add the following line of code in place of the commented code:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
    
    This code sends the event data in **JSON** format. When you configure input for a Stream Analytics job, you specify JSON as the format for the input data. 
    
  7. Run the producer and stream into Event Hubs. On a Windows machine, when using a Node.js command prompt, switch to the azure-event-hubs-for-kafka/quickstart/java/producer folder before running these commands.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"
    

Verify that the event hub receives the data

  1. Select Event Hubs under ENTITIES. Confirm that you see an event hub named test.

    Event hub - test

  2. Confirm that you see messages coming into the event hub.

    Event hub - messages

Process event data using a Stream Analytics job

In this section, you create an Azure Stream Analytics job. The Kafka client sends events to the event hub. You create a Stream Analytics job that takes event data as input and outputs it to Azure Blob storage. If you don't have an Azure Storage account, create one.

The query in the Stream Analytics job passes the data through without performing any analytics. You can also create a query that transforms the input data to produce output data in a different format or with added insights.
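
For example, a transformation query written in Stream Analytics Query Language could count incoming events per partition over 10-second tumbling windows. The sketch below is illustrative only and is not used in this walkthrough; [YourInputAlias] and [YourOutputAlias] are hypothetical placeholders for the input and output aliases you configure later in this tutorial.

    -- Illustrative aggregation query (not used in this walkthrough):
    -- count events per partition in 10-second tumbling windows.
    SELECT
        System.Timestamp AS WindowEnd,
        PartitionId,
        COUNT(*) AS EventCount
    INTO [YourOutputAlias]
    FROM [YourInputAlias] TIMESTAMP BY EventEnqueuedUtcTime
    GROUP BY PartitionId, TumblingWindow(second, 10)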

Create a Stream Analytics job

  1. Select + Create a resource in the Azure portal.
  2. Select Analytics in the Azure Marketplace menu, and then select Stream Analytics job.
  3. On the New Stream Analytics page, do the following actions:
    1. Enter a name for the job.

    2. Select your subscription.

    3. Select Create new for the resource group, and enter a name for it. You can also use an existing resource group.

    4. Select a location for the job.

    5. Select Create to create the job.

      New Stream Analytics job

Configure job input

  1. In the notification message, select Go to resource to see the Stream Analytics job page.

  2. Select Inputs in the JOB TOPOLOGY section on the left menu.

  3. Select Add stream input, and then select Event Hub.

    Add event hub as an input

  4. On the Event Hub input configuration page, do the following actions:

    1. Specify an alias for the input.

    2. Select your Azure subscription.

    3. Select the event hub namespace you created earlier.

    4. Select test for the event hub.

    5. Select Save.

      Event hub input configuration

Configure job output

  1. Select Outputs in the JOB TOPOLOGY section on the menu.
  2. Select + Add on the toolbar, and then select Blob storage.
  3. On the Blob storage output settings page, do the following actions:
    1. Specify an alias for the output.

    2. Select your Azure subscription.

    3. Select your Azure Storage account.

    4. Enter a name for the container that stores the output data from the Stream Analytics query.

    5. Select Save.

      Blob storage output configuration

Define a query

After you have a Stream Analytics job set up to read an incoming data stream, the next step is to create a transformation that analyzes data in real time. You define the transformation query by using Stream Analytics Query Language. In this walkthrough, you define a query that passes the data through without performing any transformation; a sketch of the resulting query appears after the steps below.

  1. Select Query.

  2. In the query window, replace [YourOutputAlias] with the output alias you created earlier.

  3. Replace [YourInputAlias] with the input alias you created earlier.

  4. Select Save on the toolbar.

    Query
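
The pass-through query used in this walkthrough simply copies every input event to the output. A minimal sketch, assuming the default query template shown in the query window and with [YourInputAlias] and [YourOutputAlias] replaced by the aliases you created earlier, looks like this:

    SELECT
        *
    INTO
        [YourOutputAlias]
    FROM
        [YourInputAlias]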

Run the Stream Analytics job

  1. Select Overview on the left menu.

  2. Select Start.

    Start menu

  3. On the Start job page, select Start.

    Start job page

  4. Wait until the status of the job changes from Starting to Running.

    Job status - Running

Test the scenario

  1. Run the Kafka producer again to send events to the event hub.

    mvn exec:java -Dexec.mainClass="TestProducer"
    
  2. Confirm that output data is generated in Azure Blob storage. You see a JSON file in the container with 100 rows that look like the following sample rows:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    In this scenario, the Azure Stream Analytics job received input data from the event hub and stored it in Azure Blob storage.

Next steps

In this article, you learned how to stream into Event Hubs without changing your protocol clients or running your own clusters. To learn more about Event Hubs for Apache Kafka, see the Apache Kafka developer guide for Azure Event Hubs.