使用 Java 向/从 Azure 事件中心 (azure-eventhubs) 发送/接收事件Use Java to send events to or receive events from Azure Event Hubs (azure-eventhubs)

本快速入门介绍如何使用 azure-eventhubs Java 包向事件中心发送事件以及从事件中心接收事件。This quickstart shows how to send events to and receive events from an event hub using the azure-eventhubs Java package.

警告

本快速入门使用旧的 azure-eventhubsazure-eventhubs-eph 包。This quickstart uses the old azure-eventhubs and azure-eventhubs-eph packages. 有关使用最新 azure-messaging-eventhubs 包的快速入门,请参阅使用 azure-messaging-eventhubs 发送和接收事件For a quickstart that uses the latest azure-messaging-eventhubs package, see Send and receive events using azure-messaging-eventhubs. 若要将应用程序从使用旧包迁移到使用新包,请参阅从 azure-eventhubs 迁移到 azure-messaging-eventhubs 的指南To move your application from using the old package to new one, see the Guide to migrate from azure-eventhubs to azure-messaging-eventhubs.

先决条件Prerequisites

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述If you are new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

若要完成本快速入门,需要具备以下先决条件:To complete this quickstart, you need the following prerequisites:

  • Azure 订阅Azure subscription. 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。To use Azure services, including Azure Event Hubs, you need a subscription. 如果没有现有 Azure 帐户,可以注册 1 元试用版创建帐户If you don't have an existing Azure account, you can sign up for a 1rmb trial or create an account.
  • Java 开发环境。A Java development environment. 本快速入门使用 EclipseThis quickstart uses Eclipse.
  • 创建事件中心命名空间和事件中心Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照文章中的以下说明获取事件中心访问密钥的值:获取连接字符串Then, get the value of access key for the event hub by following instructions from the article: Get connection string. 你将在本快速入门中稍后编写的代码中使用访问密钥。You use the access key in the code you write later in this quickstart. 默认密钥名称为:RootManageSharedAccessKey 。The default key name is: RootManageSharedAccessKey.

发送事件Send events

本部分介绍如何创建一个向事件中心发送事件的 Java 应用程序。This section shows you how to create a Java application to send events an event hub.

备注

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. 或者,也可以按照本快速入门中的步骤创建自己的应用程序。Alternatively, you can follow the steps in this quickstart to create your own.

将引用添加到 Azure 事件中心库Add reference to Azure Event Hubs library

事件中心的 Java 客户端库可用于 Maven 中央存储库中的 Marven 项目。The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository. 可使用 Maven 项目文件中的以下依赖项声明引用此库:You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>

对于不同类型的生成环境,可以从 Maven 中央存储库显式获取最新发布的 JAR 文件。For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

对于简单的事件发布服务器,请导入事件中心客户端类的 com.microsoft.azure.eventhubs 包和实用程序类(如与 Azure 服务总线消息传送客户端共享的常见异常)的 com.microsoft.azure.servicebus 包。For a simple event publisher, import the com.microsoft.azure.eventhubs package for the Event Hubs client classes and the com.microsoft.azure.servicebus package for utility classes such as common exceptions that are shared with the Azure Service Bus messaging client.

编写代码以将消息发送到事件中心Write code to send messages to the event hub

对于下面的示例,请首先在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. 添加一个名为 SimpleSend 的类,并向该类中添加以下代码:Add a class named SimpleSend, and add the following code to the class:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SimpleSend {

    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {
            
            
    }
 }

构造连接字符串Construct connection string

使用 ConnectionStringBuilder 类构造要传递到事件中心客户端实例的连接字符串值。Use the ConnectionStringBuilder class to construct a connection string value to pass to the Event Hubs client instance. 将占位符替换为创建命名空间和事件中心时获取的值:Replace the placeholders with the values you obtained when you created the namespace and event hub:

        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("<EVENTHUB NAMESPACE") 
                .setEventHubName("EVENT HUB")
                .setSasKeyName("RootManageSharedAccessKey")
                .setSasKey("SHARED ACCESS KEY");

编写代码来发送事件Write code to send events

通过将字符串转换为其 UTF-8 字节编码创建单一事件。Create a singular event by transforming a string into its UTF-8 byte encoding. 然后,使用连接字符串创建一个新的事件中心客户端实例并发送该消息:Then, create a new Event Hubs client instance from the connection string and send the message:

        final Gson gson = new GsonBuilder().create();

        // The Executor handles all asynchronous tasks and this is passed to the EventHubClient instance.
        // This enables the user to segregate their thread pool based on the work load.
        // This pool can then be shared across multiple EventHubClient instances.
        // The following sample uses a single thread executor, as there is only one EventHubClient instance,
        // handling different flavors of ingestion to Event Hubs here.
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);

        // Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive.
        // It is always a best practice to reuse these instances. The following sample shows this.
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);


        try {
            for (int i = 0; i < 10; i++) {

                String payload = "Message " + Integer.toString(i);
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
                EventData sendEvent = EventData.create(payloadBytes);

                // Send - not tied to any partition
                // Event Hubs service will round-robin the events across all Event Hubs partitions.
                // This is the recommended & most reliable way to send to Event Hubs.
                ehClient.sendSync(sendEvent);
            }

            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }

生成并运行程序,并确保没有引发任何错误。Build and run the program, and ensure that there are no errors.

祝贺!Congratulations! 现在已向事件中心发送消息。You have now sent messages to an event hub.

附录:如何将消息路由到 EventHub 分区Appendix: How messages are routed to EventHub partitions

在使用者检索消息之前,必须先由发布者将消息发布到分区。Before messages are retrieved by consumers, they have to be published to the partitions first by the publishers. 当使用 com.microsoft.azure.eventhubs.EventHubClient 对象上的 sendSync() 方法同步将消息发布到事件中心时,可以将消息发送到特定分区或以循环方式分发到所有可用分区,具体取决于 是否指定了分区键。When messages are published to event hub synchronously using the sendSync() method on the com.microsoft.azure.eventhubs.EventHubClient object, the message could be sent to a specific partition or distributed to all available partitions in a round-robin manner depending on whether the partition key is specified or not.

当指定了表示分区键的字符串时,将对该键进行哈希处理以确定要将事件发送到哪个分区。When a string representing the partition key is specified, the key will be hashed to determine which partition to send the event to.

如果未设置分区键,则消息将循环分发到所有可用分区When the partition key is not set, then messages will round-robined to all available partitions

// Serialize the event into bytes
byte[] payloadBytes = gson.toJson(messagePayload).getBytes(Charset.defaultCharset());

// Use the bytes to construct an {@link EventData} object
EventData sendEvent = EventData.create(payloadBytes);

// Transmits the event to event hub without a partition key
// If a partition key is not set, then we will round-robin to all topic partitions
eventHubClient.sendSync(sendEvent);

//  the partitionKey will be hash'ed to determine the partitionId to send the eventData to.
eventHubClient.sendSync(sendEvent, partitionKey);

// close the client at the end of your program
eventHubClient.closeSync();

接收事件Receive events

本教程中的代码基于 GitHub 上的 EventProcessorSample 代码,可检查该代码以查看完整的工作应用程序。The code in this tutorial is based on the EventProcessorSample code on GitHub, which you can examine to see the full working application.

使用 Java 中的 EventProcessorHost 接收消息Receive messages with EventProcessorHost in Java

EventProcessorHost 是一个 Java 类,通过从事件中心管理持久检查点和并行接收来简化从事件中心接收事件的过程。EventProcessorHost is a Java class that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives from those Event Hubs. 使用 EventProcessorHost,可跨多个接收方拆分事件,即使在不同节点中托管也是如此。Using EventProcessorHost, you can split events across multiple receivers, even when hosted in different nodes. 此示例演示如何为单一接收方使用 EventProcessorHost。This example shows how to use EventProcessorHost for a single receiver.

创建存储帐户Create a storage account

若要使用 EventProcessorHost,必须拥有 [Azure 存储帐户][Azure 存储帐户]:To use EventProcessorHost, you must have an [Azure Storage account][Azure Storage account]:

  1. 登录 Azure 门户,并在屏幕左侧选择“创建资源”。Sign in the Azure portal, and select ** Create a resource** on the left-hand side of the screen.

  2. 选择“存储”,然后选择“存储帐户” 。Select Storage, then select Storage account. 在“创建存储帐户” 窗口中,键入存储帐户的名称。In the Create storage account window, type a name for the storage account. 填写其余字段,选择所需区域,然后选择“创建” 。Complete the rest of the fields, select your desired region, and then select Create.

    在 Azure 门户中创建存储帐户

  3. 选择新创建的存储帐户,然后选择“访问密钥” :Select the newly created storage account, and then select Access Keys:

    在 Azure 门户中获取访问密钥

    将 key1 值复制到临时位置。Copy the key1 value to a temporary location. 本教程后面部分需要使用它。You use it later in this tutorial.

EventProcessor Host 创建一个 Java 项目Create a Java project using the EventProcessor Host

事件中心的 Java 客户端库可用于 Maven 中央存储库中的 Maven 项目,并且可以使用 Maven 项目文件中的以下依赖项声明进行引用:The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository, and can be referenced using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>2.4.0</version>
</dependency>

对于不同类型的生成环境,可以从 Maven 中央存储库显式获取最新发布的 JAR 文件。For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

  1. 对于下面的示例,请首先在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. 此类名为 ErrorNotificationHandlerThe class is called ErrorNotificationHandler.

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. 使用以下代码创建名为 EventProcessorSample的新类。Use the following code to create a new class called EventProcessorSample. 将占位符替换为创建事件中心和存储帐户时所使用的值:Replace the placeholders with the values used when you created the event hub and storage account:

    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;
    
    import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
    import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    
    public class EventProcessorSample
    {
        public static void main(String args[]) throws InterruptedException, ExecutionException
        {
            String consumerGroupName = "$Default";
            String namespaceName = "----NamespaceName----";
            String eventHubName = "----EventHubName----";
            String sasKeyName = "----SharedAccessSignatureKeyName----";
            String sasKey = "----SharedAccessSignatureKey----";
            String storageConnectionString = "----AzureStorageConnectionString----";
            String storageContainerName = "----StorageContainerName----";
            String hostNamePrefix = "----HostNamePrefix----";
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                 .setNamespaceName(namespaceName)
                 .setEventHubName(eventHubName)
                 .setSasKeyName(sasKeyName)
                 .setSasKey(sasKey);
    
            EventProcessorHost host = new EventProcessorHost(
                 EventProcessorHost.createHostName(hostNamePrefix),
                 eventHubName,
                 consumerGroupName,
                 eventHubConnectionString.toString(),
                 storageConnectionString,
                 storageContainerName);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
    
            host.registerEventProcessor(EventProcessor.class, options)
            .whenComplete((unused, e) ->
            {
                if (e != null)
                {
                    System.out.println("Failure while registering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                }
            })
            .thenAccept((unused) ->
            {
                System.out.println("Press enter to stop.");
                try 
                {
                    System.in.read();
                }
                catch (Exception e)
                {
                    System.out.println("Keyboard read failed: " + e.toString());
                }
            })
            .thenCompose((unused) ->
            {
                return host.unregisterEventProcessor();
            })
            .exceptionally((e) ->
            {
                System.out.println("Failure while unregistering: " + e.toString());
                if (e.getCause() != null)
                {
                    System.out.println("Inner exception: " + e.getCause().toString());
                }
                return null;
            })
            .get(); // Wait for everything to finish before exiting main!
    
            System.out.println("End of sample");
        }
    
  3. 使用以下代码另外创建一个名为 EventProcessor 的类:Create one more class called EventProcessor, using the following code:

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        // OnOpen is called when a new event processor instance is created by the host. 
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        // OnClose is called when an event processor instance is being shut down. 
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        // onEvents is called when events are received on this partition of the Event Hub. 
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;
    
                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one. 
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                            data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
    

本教程使用了一个 EventProcessorHost 实例。This tutorial uses a single instance of EventProcessorHost. 若要增加吞吐量,建议运行多个 EventProcessorHost 实例,最好是在单独的计算机上运行。To increase throughput, we recommend that you run multiple instances of EventProcessorHost, preferably on separate machines. 这也会提供冗余。It provides redundancy as well. 在那些情况下,为了对接收到的事件进行负载均衡,各个不同实例会自动相互协调。In those cases, the various instances automatically coordinate with each other in order to load balance the received events. 如果希望多个接收方都各自处理 全部 事件,则必须使用 ConsumerGroup 概念。If you want multiple receivers to each process all the events, you must use the ConsumerGroup concept. 从不同计算机中接收事件时,根据部署 EventProcessorHost 实例的计算机(或角色)来指定这些实例的名称可能会很有用。When receiving events from different machines, it might be useful to specify names for EventProcessorHost instances based on the machines (or roles) in which they are deployed.

将消息发布到 EventHubPublishing Messages to EventHub

在使用者检索消息之前,必须先由发布者将消息发布到分区。Before messages are retrieved by consumers, they have to be published to the partitions first by the publishers. 值得注意的是,当使用 com.microsoft.azure.eventhubs.EventHubClient 对象上的 sendSync() 方法同步将消息发布到事件中心时,可以将消息发送到特定分区或以循环方式分发到所有可用分区,具体取决于 是否指定了分区键。It is worth noting that when messages are published to event hub synchronously using the sendSync() method on the com.microsoft.azure.eventhubs.EventHubClient object, the message could be sent to a specific partition or distributed to all available partitions in a round-robin manner depending on whether the partition key is specified or not.

如果指定了表示分区键的字符串,则将对该键进行哈希处理以确定事件将发送到的分区。When a string representing the partition key is specified, the key is hashed to determine which partition to send the event to.

如果未设置分区键,则消息将循环分发到所有可用分区When the partition key is not set, then messages are round-robined to all available partitions

// Serialize the event into bytes
byte[] payloadBytes = gson.toJson(messagePayload).getBytes(Charset.defaultCharset());

// Use the bytes to construct an {@link EventData} object
EventData sendEvent = EventData.create(payloadBytes);

// Transmits the event to event hub without a partition key
// If a partition key is not set, then we will round-robin to all topic partitions
eventHubClient.sendSync(sendEvent);

//  the partitionKey will be hash'ed to determine the partitionId to send the eventData to.
eventHubClient.sendSync(sendEvent, partitionKey);

为 EventProcessorHost (EPH) 实现自定义 CheckpointManagerImplementing a Custom CheckpointManager for EventProcessorHost (EPH)

对于默认实现与你的用例不兼容的情况,此 API 提供了一种机制来实现自定义检查点管理器。The API provides a mechanism to implement your custom checkpoint manager for scenarios where the default implementation is not compatible with your use case.

默认检查点管理器使用 blob 存储,但是,如果你使用自己的实现替代了 EPH 使用的检查点管理器,则可以使用所需的任何存储来为你的检查点管理器实现提供支持。The default checkpoint manager uses blob storage but if you override the checkpoint manager used by EPH with your own implementation, you can use any store you want to back your checkpoint manager implementation.

创建一个类,用于实现 com.microsoft.azure.eventprocessorhost.ICheckpointManagerCreate a class that implements the interface com.microsoft.azure.eventprocessorhost.ICheckpointManager

使用检查点管理器的自定义实现 (com.microsoft.azure.eventprocessorhost.ICheckpointManager)Use your custom implementation of the checkpoint manager (com.microsoft.azure.eventprocessorhost.ICheckpointManager)

在你的实现中,你可以替代默认检查点机制并根据你自己的数据存储(例如 SQL Server、CosmosDB 和 Azure Redis 缓存)实现我们自己的检查点。Within your implementation, you can override the default checkpointing mechanism and implement our own checkpoints based on your own data store (like SQL Server, CosmosDB, and Azure Cache for Redis). 对于用于为检查点管理器实现提供支持的存储,建议使其可供为使用者组处理事件的所有 EPH 实例访问。We recommend that the store used to back your checkpoint manager implementation is accessible to all EPH instances that are processing events for the consumer group.

你可以使用你的环境中提供的任何数据存储。You can use any datastore that is available in your environment.

com.microsoft.azure.eventprocessorhost.EventProcessorHost 类提供了 2 个构造函数,可以使用它们来替代你的 EventProcessorHost 的检查点管理器。The com.microsoft.azure.eventprocessorhost.EventProcessorHost class provides you with two constructors that allow you to override the checkpoint manager for your EventProcessorHost.

后续步骤Next steps

请阅读以下文章:Read the following articles: