使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs)

本快速入门介绍如何使用 azure-messaging-eventhubs Java 包向事件中心发送事件以及从事件中心接收事件。This quickstart shows how to send events to and receive events from an event hub using the azure-messaging-eventhubs Java package.

重要

本快速入门使用新的 azure-messaging-eventhubs 库。This quickstart uses the new azure-messaging-eventhubs package. 有关使用旧的 azure-eventhubsazure-eventhubs-eph 包的快速入门,请参阅使用 azure-eventhubs 和 azure-eventhubs-eph 发送和接收事件For a quickstart that uses the old azure-eventhubs and azure-eventhubs-eph packages, see Send and receive events using azure-eventhubs and azure-eventhubs-eph.

先决条件Prerequisites

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

若要完成本快速入门,需要具备以下先决条件:To complete this quickstart, you need the following prerequisites:

  • Azure 订阅Azure subscription. 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。To use Azure services, including Azure Event Hubs, you need a subscription. 如果没有现有 Azure 帐户,可以注册 1 元试用版创建帐户If you don't have an existing Azure account, you can sign up for a 1mb trial or create an account.
  • Java 开发环境。A Java development environment. 本快速入门使用 EclipseThis quickstart uses Eclipse. 需要 Java 开发工具包 (JDK) 版本 8 或更高版本。Java Development Kit (JDK) with version 8 or above is required.
  • 创建事件中心命名空间和事件中心。Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串:获取连接字符串Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. 稍后将在本快速入门中使用连接字符串。You use the connection string later in this quickstart.

发送事件Send events

本部分介绍如何创建一个向事件中心发送事件的 Java 应用程序。This section shows you how to create a Java application to send events an event hub.

将引用添加到 Azure 事件中心库Add reference to Azure Event Hubs library

Maven 中心存储库中提供了事件中心的 Java 客户端库。The Java client library for Event Hubs is available in the Maven Central Repository. 可使用 Maven 项目文件中的以下依赖项声明引用此库:You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.0.1</version>
</dependency>

编写代码以将消息发送到事件中心Write code to send messages to the event hub

对于以下示例,请首先在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. 添加一个名为 Sender 的类,并向该类中添加以下代码:Add a class named Sender, and add the following code to the class:

import com.azure.messaging.eventhubs.*;
import static java.nio.charset.StandardCharsets.UTF_8;

public class Sender {
       public static void main(String[] args) {
    }
}

连接字符串和事件中心Connection string and event hub

此代码使用事件中心命名空间的连接字符串以及事件中心的名称来生成事件中心客户端。This code uses the connection string to the Event Hubs namespace and the name of the event hub to build an Event Hubs client.

String connectionString = "<CONNECTION STRING to EVENT HUBS NAMESPACE>";
String eventHubName = "<EVENT HUB NAME>";

创建事件中心生成者客户端Create an Event Hubs Producer client

此代码创建一个生成者客户端对象,用于生成事件/向事件中心发送事件。This code creates a producer client object that's used to produce/send events to the event hub.

EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString(connectionString, eventHubName)
    .buildProducerClient();

准备一批事件Prepare a batch of events

此代码准备一批事件。This code prepares a batch of events.

EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));

向事件中心发送事件批Send the batch of events to the event hub

此代码将上一步骤中准备的事件批发送到事件中心。This code sends the batch of events you prepared in the previous step to the event hub. 以下代码将在执行发送操作时阻塞。The following code blocks on the send operation.

producer.send(batch);

关闭和清理Close and cleanup

此代码关闭生成者。This code closes the producer.

producer.close();

发送事件的完整代码Complete code to send events

下面是将事件发送到事件中心的完整代码。Here is the complete code to send events to the event hub.

import com.azure.messaging.eventhubs.*;

public class Sender {
    public static void main(String[] args) {
        final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
        final String eventHubName = "EVENT HUB NAME";

        // create a producer using the namespace connection string and event hub name
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // prepare a batch of events to send to the event hub    
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("First event"));
        batch.tryAdd(new EventData("Second event"));
        batch.tryAdd(new EventData("Third event"));
        batch.tryAdd(new EventData("Fourth event"));
        batch.tryAdd(new EventData("Fifth event"));

        // send the batch of events to the event hub
        producer.send(batch);

        // close the producer
        producer.close();
    }
}

生成程序,并确保没有引发任何错误。Build the program, and ensure that there are no errors. 将在运行接收器程序后运行此程序。You'll run this program after you run the receiver program.

接收事件Receive events

本教程中的代码基于 GitHub 上的 EventProcessorClient 示例,可检查该代码以查看完整的工作应用程序。The code in this tutorial is based on the EventProcessorClient sample on GitHub, which you can examine to see the full working application.

备注

如果在 Azure Stack Hub 上运行,该平台支持的存储 Blob SDK 可能不同于通常在 Azure 上提供的版本。If you are running on Azure Stack Hub, that platform may support a different version of Storage Blob SDK than those typically available on Azure. 例如,如果在 Azure Stack Hub 版本 2002 上运行,则存储服务的最高可用版本为版本 2017-11-09。For example, if you are running on Azure Stack Hub version 2002, the highest available version for the Storage service is version 2017-11-09. 在这种情况下,除了执行本部分中的步骤以外,还需要添加相关代码,将存储服务 API 版本 2017-11-09 作为目标。In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2017-11-09. 如需通过示例来了解如何以特定的存储 API 版本为目标,请参阅 GitHub 上的此示例For an example on how to target a specific Storage API version, see this sample on GitHub. 有关 Azure Stack Hub 上支持的 Azure 存储服务版本的详细信息,请参阅 Azure Stack Hub 存储:差异和注意事项For more information on the Azure Storage service versions supported on Azure Stack Hub, please refer to Azure Stack Hub storage: Differences and considerations.

创建 Azure 存储和 Blob 容器Create an Azure Storage and a blob container

本快速入门将使用 Azure 存储(特别是 Blob 存储)作为检查点存储。In this quickstart, you use Azure Storage (specifically, Blob Storage) as the checkpoint store. 标记检查点是一个进程,被事件处理器用来标记或提交分区中最后一个成功处理的事件的位置。Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. 标记检查点通常在处理事件的函数中进行。Marking a checkpoint is typically done within the function that processes the events. 了解有关检查点的更多信息,请参阅事件处理器To learn more about checkpointing, see Event processor.

按照以下步骤创建 Azure 存储帐户。Follow these steps to create an Azure Storage account.

  1. 创建 Azure 存储帐户Create an Azure Storage account

  2. 创建一个 blob 容器Create a blob container

  3. 获取存储帐户的连接字符串Get the connection string to the storage account

    请记下该连接字符串和容器名称 。Note down the connection string and the container name. 稍后要在接收代码中使用这些信息。You'll use them in the receive code.

将事件中心库添加到 Java 项目Add Event Hubs libraries to your Java project

在 pom.xml 文件中添加以下依赖项。Add the following dependencies in the pom.xml file.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>
  1. 将以下“导入”语句添加到 Java 文件顶部。Add the following import statements at the top of the Java file.

    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import java.util.function.Consumer;
    import java.util.concurrent.TimeUnit;
    
  2. 创建一个名为 Receiver 的类,并向该类中添加以下字符串变量。Create a class named Receiver, and add the following string variables to the class. 将占位符替换为正确的值。Replace the placeholders with the correct values.

    private static final String EH_NAMESPACE_CONNECTION_STRING = "<EVENT HUBS NAMESPACE CONNECTION STRING>";
    private static final String eventHubName = "<EVENT HUB NAME>";
    private static final String STORAGE_CONNECTION_STRING = "<AZURE STORAGE CONNECTION STRING>";
    private static final String STORAGE_CONTAINER_NAME = "<AZURE STORAGE CONTAINER NAME>";
    
  3. 将下面的 main 方法添加到该类。Add the following main method to the class.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING) 
            .containerName(STORAGE_CONTAINER_NAME) 
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName) 
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR) 
            .processError(ERROR_HANDLER) 
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); 
    
        // Use the builder object to create an event processor client 
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }    
    
  4. 将处理事件和错误的两个帮助程序方法(PARTITION_PROCESSORERROR_HANDLER)添加到 Receiver 类中。Add the two helper methods (PARTITION_PROCESSOR and ERROR_HANDLER) that process events and errors to the Receiver class.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n", 
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
    
        if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };    
    
  5. 完整代码应如下所示:The complete code should look like:

    
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import java.util.function.Consumer;
    import java.util.concurrent.TimeUnit;
    
    public class Receiver {
    
        private static final String EH_NAMESPACE_CONNECTION_STRING = "<EVENT HUBS NAMESPACE CONNECTION STRING>";
        private static final String eventHubName = "<EVENT HUB NAME>";
        private static final String STORAGE_CONNECTION_STRING = "<AZURE STORAGE CONNECTION STRING>";
        private static final String STORAGE_CONTAINER_NAME = "<AZURE STORAGE CONTAINER NAME>";
    
        public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n", 
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
    
            if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
                eventContext.updateCheckpoint();
            }
        };
    
        public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
            System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        };
    
        public static void main(String[] args) throws Exception {
            BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(STORAGE_CONNECTION_STRING)
                .containerName(STORAGE_CONTAINER_NAME)
                .buildAsyncClient();
    
            EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
            EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
            System.out.println("Starting event processor");
            eventProcessorClient.start();
    
            System.out.println("Press enter to stop.");
            System.in.read();
    
            System.out.println("Stopping event processor");
            eventProcessorClient.stop();
            System.out.println("Event processor stopped.");
    
            System.out.println("Exiting process");
        }
    
    }
    
  6. 生成程序,并确保没有引发任何错误。Build the program, and ensure that there are no errors.

运行应用程序Run the applications

  1. 先运行接收器应用程序。Run the receiver application first.
  2. 然后运行发送器应用程序。Then, run the sender application.
  3. 接收器应用程序窗口中,确认已看到发送器应用程序发布的事件。In the receiver application window, confirm that you see the events that were published by the sender application.
  4. 在接收器应用程序窗口中按 ENTER 停止该应用程序。Press ENTER in the receiver application window to stop the application.

后续步骤Next steps

在 GitHub 上参阅以下示例:See the following samples on GitHub: