Send events to or receive events from Azure Event Hubs using Java

Azure Event Hubs is a big data streaming platform and event ingestion service capable of receiving and processing millions of events per second. Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. For a detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

This tutorial shows how to create Java applications to send events to or receive events from an event hub.

Note

You can download this quickstart as a sample from GitHub, replace the EventHubConnectionString and EventHubName strings with your event hub values, and run it. Alternatively, you can follow the steps in this tutorial to create your own solution.

Prerequisites

To complete this tutorial, you need the following prerequisites:

  • An active Azure account. If you do not have an Azure subscription, create a trial account before you begin.
  • A Java development environment. This tutorial uses Eclipse.
  • An Event Hubs namespace and an event hub. First, use the Azure portal to create a namespace of type Event Hubs and obtain the management credentials that your application needs to communicate with the event hub. To create a namespace and an event hub, follow the procedure in this article. Then get the value of the access key for the event hub by following the instructions in the article Get connection string. You use the access key in the code you write later in this tutorial. The default key name is RootManageSharedAccessKey.

Send events

This section shows you how to create a Java application to send events to an event hub.

Add a reference to the Azure Event Hubs library

The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository. You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>

For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

For a simple event publisher, import the com.microsoft.azure.eventhubs package for the Event Hubs client classes and the com.microsoft.azure.servicebus package for utility classes such as common exceptions that are shared with the Azure Service Bus messaging client.

Write code to send messages to the event hub

For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. The sample serializes its payloads with Gson (the com.google.gson imports), so the project also needs the com.google.code.gson:gson dependency. Add a class named SimpleSend, and add the following code to the class:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SimpleSend {

    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {
            
            
    }
 }

Construct connection string

Use the ConnectionStringBuilder class to construct a connection string value to pass to the Event Hubs client instance. Replace the placeholders with the values you obtained when you created the namespace and event hub:

        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("----NamespaceName----")
                .setEventHubName("----EventHubName----")
                .setSasKeyName("RootManageSharedAccessKey")
                .setSasKey("----SharedAccessSignatureKey----");
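To avoid hard-coding the access key in source code, the four values can be read from the environment before they are handed to ConnectionStringBuilder. The following is a minimal sketch, not part of the original sample: the EventHubSettings class and the environment variable names (EVENTHUB_NAMESPACE, EVENTHUB_NAME, EVENTHUB_SAS_KEY_NAME, EVENTHUB_SAS_KEY) are hypothetical and chosen only for illustration.

```java
import java.util.Map;

/** Hypothetical holder for the four values that the connection string needs. */
final class EventHubSettings {
    final String namespaceName;
    final String eventHubName;
    final String sasKeyName;
    final String sasKey;

    private EventHubSettings(String namespaceName, String eventHubName,
                             String sasKeyName, String sasKey) {
        this.namespaceName = namespaceName;
        this.eventHubName = eventHubName;
        this.sasKeyName = sasKeyName;
        this.sasKey = sasKey;
    }

    /** Reads the settings from a map such as System.getenv(). Variable names are assumptions. */
    static EventHubSettings fromEnvironment(Map<String, String> env) {
        return new EventHubSettings(
                require(env, "EVENTHUB_NAMESPACE"),
                require(env, "EVENTHUB_NAME"),
                // Fall back to the default key name mentioned in the prerequisites.
                env.getOrDefault("EVENTHUB_SAS_KEY_NAME", "RootManageSharedAccessKey"),
                require(env, "EVENTHUB_SAS_KEY"));
    }

    /** Fails fast with a clear message when a required variable is missing or empty. */
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
```

With this in place, the builder calls shown above could take settings.namespaceName, settings.eventHubName, settings.sasKeyName, and settings.sasKey, where settings is the result of EventHubSettings.fromEnvironment(System.getenv()).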

Write code to send events

Create a single event by converting a string to its UTF-8 byte encoding. Then create a new Event Hubs client instance from the connection string and send the message:

        final Gson gson = new GsonBuilder().create();

        // The executor handles all asynchronous tasks and is passed to the EventHubClient instance.
        // This lets you size and segregate thread pools based on the workload.
        // The pool can be shared across multiple EventHubClient instances.
        // This sample uses a small scheduled pool of four threads for its single EventHubClient instance.
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);

        // Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive.
        // It is always a best practice to reuse these instances. The following sample shows this.
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);


        try {
            for (int i = 0; i < 10; i++) {

                String payload = "Message " + Integer.toString(i);
                // Encode explicitly as UTF-8 rather than relying on the platform default charset.
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.forName("UTF-8"));
                EventData sendEvent = EventData.create(payloadBytes);

                // Send - not tied to any partition
                // Event Hubs service will round-robin the events across all Event Hubs partitions.
                // This is the recommended & most reliable way to send to Event Hubs.
                ehClient.sendSync(sendEvent);
            }

            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }

Build and run the program, and ensure that there are no errors.

Congratulations! You have now sent messages to an event hub.
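The encoding step can also be looked at in isolation. The receiving code later in this tutorial decodes event bodies as UTF-8, so naming the charset explicitly on the sending side keeps both ends in agreement regardless of the platform default. The sketch below uses only the Java standard library; the class name Utf8PayloadSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing an explicit UTF-8 round trip for an event body. */
final class Utf8PayloadSketch {

    /** Converts the text payload to the bytes that would be placed in the event body. */
    static byte[] encode(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    /** Converts received bytes back to text, using the same charset as the sender. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Message 0");
        System.out.println(body.length + " bytes, decoded: " + decode(body));
    }
}
```

The resulting byte array is what a call such as EventData.create(payloadBytes) in the sample above receives.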

Appendix: How messages are routed to Event Hubs partitions

Before consumers can retrieve messages, publishers must first publish them to the partitions. When messages are published to an event hub synchronously using the sendSync() method on the com.microsoft.azure.eventhubs.EventHubClient object, a message is either sent to a specific partition or distributed across all available partitions in a round-robin manner, depending on whether a partition key is specified.

When a string representing the partition key is specified, the key is hashed to determine which partition to send the event to.

When the partition key is not set, messages are distributed round-robin across all available partitions.

// Serialize the event into bytes
byte[] payloadBytes = gson.toJson(messagePayload).getBytes(Charset.defaultCharset());

// Use the bytes to construct an {@link EventData} object
EventData sendEvent = EventData.create(payloadBytes);

// Send the event without a partition key.
// When no partition key is set, events are distributed round-robin across all partitions.
eventHubClient.sendSync(sendEvent);

// Send the event with a partition key.
// The partition key is hashed to determine which partition receives the event.
eventHubClient.sendSync(sendEvent, partitionKey);

// close the client at the end of your program
eventHubClient.closeSync();
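The two routing modes can be illustrated with a small, self-contained model. This is only a sketch of the idea: the class name PartitionRoutingSketch is hypothetical, the routing in a real deployment is performed by the Event Hubs service, and String.hashCode() is used here as a stand-in because the hash function the service actually uses is not described in this tutorial.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of keyed versus round-robin partition selection. */
final class PartitionRoutingSketch {
    private final int partitionCount;
    private final AtomicInteger next = new AtomicInteger();

    PartitionRoutingSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Keyed send: the same key always maps to the same partition (stand-in hash). */
    int partitionForKey(String partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    /** Unkeyed send: cycle through the partitions in order. */
    int nextRoundRobin() {
        return Math.floorMod(next.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        PartitionRoutingSketch router = new PartitionRoutingSketch(4);
        System.out.println("device-42 -> partition " + router.partitionForKey("device-42"));
        for (int i = 0; i < 5; i++) {
            System.out.println("unkeyed -> partition " + router.nextRoundRobin());
        }
    }
}
```

The practical consequence is the same as in the real service: events that share a partition key stay together on one partition, while unkeyed events are spread evenly.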

Receive events

The code in this tutorial is based on the EventProcessorSample code on GitHub, which you can examine to see the full working application.

Receive messages with EventProcessorHost in Java

EventProcessorHost is a Java class that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives from those event hubs. Using EventProcessorHost, you can split events across multiple receivers, even when they are hosted on different nodes. This example shows how to use EventProcessorHost for a single receiver.

Create a storage account

To use EventProcessorHost, you must have an Azure Storage account:

  1. Sign in to the Azure portal, and click + Create a resource on the left-hand side of the screen.

  2. Click Storage, then click Storage account. In the Create storage account window, type a name for the storage account. Complete the rest of the fields, select your desired region, and then click Create.

  3. Click the newly created storage account, and then click Access Keys.

    Copy the key1 value to a temporary location. You use it later in this tutorial.

Create a Java project using the EventProcessorHost

The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository, and can be referenced using the following dependency declarations inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>2.4.0</version>
</dependency>

For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository (https://search.maven.org/#search%7Cga%7C1%7Ca%3A%22azure-eventhubs-eph%22).

  1. For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. Then add a class called ErrorNotificationHandler, using the following code:

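    // Same package as EventProcessorSample, so that class can refer to this one without an import.
    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;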
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. Use the following code to create a new class called EventProcessorSample. Replace the placeholders with the values used when you created the event hub and storage account:

    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;
    
    import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
    import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    
    public class EventProcessorSample
    {
        public static void main(String args[]) throws InterruptedException, ExecutionException
        {
            String consumerGroupName = "$Default";
            String namespaceName = "----NamespaceName----";
            String eventHubName = "----EventHubName----";
            String sasKeyName = "----SharedAccessSignatureKeyName----";
            String sasKey = "----SharedAccessSignatureKey----";
            String storageConnectionString = "----AzureStorageConnectionString----";
            String storageContainerName = "----StorageContainerName----";
            String hostNamePrefix = "----HostNamePrefix----";
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                 .setNamespaceName(namespaceName)
                 .setEventHubName(eventHubName)
                 .setSasKeyName(sasKeyName)
                 .setSasKey(sasKey);
    
            EventProcessorHost host = new EventProcessorHost(
                 EventProcessorHost.createHostName(hostNamePrefix),
                 eventHubName,
                 consumerGroupName,
                 eventHubConnectionString.toString(),
                 storageConnectionString,
                 storageContainerName);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
    
            host.registerEventProcessor(EventProcessor.class, options)
            .whenComplete((unused, e) ->
            {
                if (e != null)
                {
                    System.out.println("Failure while registering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                }
            })
            .thenAccept((unused) ->
            {
                System.out.println("Press enter to stop.");
                try 
                {
                    System.in.read();
                }
                catch (Exception e)
                {
                    System.out.println("Keyboard read failed: " + e.toString());
                }
            })
            .thenCompose((unused) ->
            {
                return host.unregisterEventProcessor();
            })
            .exceptionally((e) ->
            {
                System.out.println("Failure while unregistering: " + e.toString());
                if (e.getCause() != null)
                {
                    System.out.println("Inner exception: " + e.getCause().toString());
                }
                return null;
            })
            .get(); // Wait for everything to finish before exiting main!
    
            System.out.println("End of sample");
        }
    
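  3. Create one more class called EventProcessor, using the following code. Add it inside the body of EventProcessorSample as a nested class: it is declared static, and EventProcessorSample refers to it as EventProcessor.class.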

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        // OnOpen is called when a new event processor instance is created by the host. 
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        // OnClose is called when an event processor instance is being shut down. 
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        // onEvents is called when events are received on this partition of the Event Hub. 
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;
    
                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one. 
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                            data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
    } // closes EventProcessorSample, whose body was left open at the end of step 2
    

This tutorial uses a single instance of EventProcessorHost. To increase throughput, we recommend that you run multiple instances of EventProcessorHost, preferably on separate machines. Running multiple instances also provides redundancy. In those cases, the instances automatically coordinate with each other to load balance the received events. If you want multiple receivers to each process all the events, you must use the ConsumerGroup concept. When receiving events from different machines, it might be useful to name EventProcessorHost instances after the machines (or roles) in which they are deployed.
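One way to follow the naming suggestion is to derive the host name prefix from the role and the machine name. The sketch below uses only the Java standard library; the class name HostNamePrefixSketch, the "role-machine" format, and the character clean-up rule are assumptions made for illustration, not requirements of EventProcessorHost.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Locale;

/** Hypothetical helper that builds a host name prefix from a role and a machine name. */
final class HostNamePrefixSketch {

    /** Combines role and machine name, replacing characters outside [a-z0-9-] with '-'. */
    static String prefixFor(String role, String machineName) {
        String cleaned = machineName.trim()
                .toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9-]", "-");
        return role + "-" + cleaned;
    }

    /** Looks up the local machine name, with a fixed fallback if it cannot be resolved. */
    static String localMachineName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    public static void main(String[] args) {
        System.out.println(prefixFor("receiver", localMachineName()));
    }
}
```

The resulting string could be used as the hostNamePrefix value that the sample passes to EventProcessorHost.createHostName, which makes log lines from different machines easier to tell apart.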

Implementing a custom CheckpointManager for EventProcessorHost (EPH)

The API provides a mechanism to implement a custom checkpoint manager for scenarios where the default implementation is not compatible with your use case.

The default checkpoint manager uses blob storage, but if you override the checkpoint manager used by EPH with your own implementation, you can use any store you want to back it.

Create a class that implements the interface com.microsoft.azure.eventprocessorhost.ICheckpointManager.

Use your custom implementation of the checkpoint manager (com.microsoft.azure.eventprocessorhost.ICheckpointManager).

Within your implementation, you can override the default checkpointing mechanism and implement your own checkpoints based on your own data store (such as SQL Server, Cosmos DB, or Azure Cache for Redis). We recommend that the store used to back your checkpoint manager implementation be accessible to all EPH instances that are processing events for the consumer group.

You can use any data store that is available in your environment.

The com.microsoft.azure.eventprocessorhost.EventProcessorHost class provides two constructors that allow you to override the checkpoint manager for your EventProcessorHost.
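The core of any checkpoint manager is a store that maps each partition to the last processed position. The sketch below shows that idea with an in-memory map. It is deliberately not an implementation of ICheckpointManager: the SimpleCheckpointStore interface, the CheckpointPosition class, and their method names are hypothetical and simplified, and the real interface should be taken from the library's Javadoc. An in-memory map is also only suitable for a single process; as noted above, a real store should be reachable by every EPH instance in the consumer group.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified store contract. NOT the real ICheckpointManager interface. */
interface SimpleCheckpointStore {
    CompletableFuture<Void> update(String partitionId, String offset, long sequenceNumber);

    CompletableFuture<CheckpointPosition> get(String partitionId);
}

/** Hypothetical value object: the position of the last processed event in a partition. */
final class CheckpointPosition {
    final String offset;
    final long sequenceNumber;

    CheckpointPosition(String offset, long sequenceNumber) {
        this.offset = offset;
        this.sequenceNumber = sequenceNumber;
    }
}

/** In-memory stand-in for a shared store such as a database table or a cache. */
final class InMemoryCheckpointStore implements SimpleCheckpointStore {
    private final ConcurrentMap<String, CheckpointPosition> positions = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> update(String partitionId, String offset, long sequenceNumber) {
        // Design choice of this sketch: keep the entry with the higher sequence number,
        // so that a late, stale update cannot move the position backwards.
        positions.merge(partitionId, new CheckpointPosition(offset, sequenceNumber),
                (old, fresh) -> fresh.sequenceNumber >= old.sequenceNumber ? fresh : old);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<CheckpointPosition> get(String partitionId) {
        // Completes with null when the partition has never been checkpointed.
        return CompletableFuture.completedFuture(positions.get(partitionId));
    }
}
```

The methods return CompletableFuture so that a version backed by a remote store could complete asynchronously, in the same spirit as the context.checkpoint(data).get() call in the EventProcessor class.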

Next steps

Read the following articles: