使用 Java 从 Azure 事件中心接收事件

事件中心是一个高度可缩放的引入系统,每秒可引入数百万事件,从而使应用程序能够处理和分析连接的设备和应用程序所产生的海量数据。 数据采集到事件中心后,可以使用任何实时分析提供程序或存储群集来转换和存储数据。

有关详细信息,请参阅 事件中心概述

本教程演示如何使用以 Java 编写的控制台应用程序从事件中心接收事件。

先决条件

若要完成本教程,需要具备以下先决条件:

  • Java 开发环境。 对于本教程,我们采用 Eclipse
  • 有效的 Azure 帐户。 如果没有 Azure 订阅,请在开始前创建试用帐户

本教程中的代码基于 GitHub 上的 EventProcessorSample 代码,可检查该代码以查看完整的工作应用程序。

使用 Java 中的 EventProcessorHost 接收消息

EventProcessorHost 是一个 Java 类,通过从事件中心管理持久检查点和并行接收来简化从事件中心接收事件的过程。 使用 EventProcessorHost,可跨多个接收方拆分事件,即使在不同节点中托管也是如此。 此示例演示如何为单一接收方使用 EventProcessorHost。

创建存储帐户

若要使用 EventProcessorHost,必须有一个 Azure 存储帐户

  1. 登录到 Azure 门户,然后单击屏幕左侧的“+创建资源”。
  2. 单击“存储”,并单击“存储帐户”。 在“创建存储帐户”窗口中,键入存储帐户的名称。 填写其余字段,选择所需区域,然后单击“创建”。

  3. 单击新创建的存储帐户,并单击“访问密钥”:

    将 key1 值复制到临时区域,以供本教程后面使用。

EventProcessor Host 创建一个 Java 项目

事件中心的 Java 客户端库可用于 Maven 中央存储库中的 Maven 项目,并且可以使用 Maven 项目文件中的以下依赖项声明进行引用。 当前版本为 1.0.0:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>1.0.0</version>
</dependency>

对于不同类型的生成环境,可以从 Maven 中央存储库显式获取最新发布的 JAR 文件。

  1. 对于下面的示例,请首先在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。 此类名为 ErrorNotificationHandler

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. 使用以下代码创建名为 EventProcessorSample的新类。 将占位符替换为创建事件中心和存储帐户时所使用的值:

    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;
    
    import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
    import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    
    public class EventProcessorSample
    {
        public static void main(String args[]) throws InterruptedException, ExecutionException
        {
               String consumerGroupName = "$Default";
            String namespaceName = "----NamespaceName----";
            String eventHubName = "----EventHubName----";
            String sasKeyName = "----SharedAccessSignatureKeyName----";
            String sasKey = "----SharedAccessSignatureKey----";
            String storageConnectionString = "----AzureStorageConnectionString----";
            String storageContainerName = "----StorageContainerName----";
            String hostNamePrefix = "----HostNamePrefix----";
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                 .setNamespaceName(namespaceName)
                 .setEventHubName(eventHubName)
                 .setSasKeyName(sasKeyName)
                 .setSasKey(sasKey);
    
            EventProcessorHost host = new EventProcessorHost(
                 EventProcessorHost.createHostName(hostNamePrefix),
                 eventHubName,
                 consumerGroupName,
                 eventHubConnectionString.toString(),
                 storageConnectionString,
                 storageContainerName);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
    
            host.registerEventProcessor(EventProcessor.class, options)
            .whenComplete((unused, e) ->
            {
                if (e != null)
                {
                    System.out.println("Failure while registering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                }
            })
            .thenAccept((unused) ->
            {
                System.out.println("Press enter to stop.");
                  try 
                {
                    System.in.read();
                }
                catch (Exception e)
                {
                    System.out.println("Keyboard read failed: " + e.toString());
                }
            })
            .thenCompose((unused) ->
             {
                 return host.unregisterEventProcessor();
            })
            .exceptionally((e) ->
            {
                System.out.println("Failure while unregistering: " + e.toString());
                if (e.getCause() != null)
                {
                    System.out.println("Inner exception: " + e.getCause().toString());
                }
                return null;
            })
            .get(); // Wait for everything to finish before exiting main!
    
            System.out.println("End of sample");
        }
    
  3. 使用以下代码另外创建一个名为 EventProcessor 的类:

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        // OnOpen is called when a new event processor instance is created by the host. In a real implementation, this
        // is the place to do initialization so that events can be processed when they arrive, such as opening a database
        // connection.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        // OnClose is called when an event processor instance is being shut down. The reason argument indicates whether the shut down
        // is because another host has stolen the lease for this partition or due to error or host shutdown. In a real implementation,
        // this is the place to do cleanup for resources that were opened in onOpen.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        // It is NOT called for exceptions thrown out of onOpen/onClose/onEvents. EventProcessorHost is responsible for recovering from
        // the error, if possible, or shutting the event processor down if not, in which case there will be a call to onClose. The
        // notification provided to onError is primarily informational.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        // onEvents is called when events are received on this partition of the Event Hub. The maximum number of events in a batch
        // can be controlled via EventProcessorOptions. Also, if the "invoke processor after receive timeout" option is set to true,
        // this method will be called with null when a receive timeout occurs.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                // It is important to have a try-catch around the processing of each event. Throwing out of onEvents deprives
                // you of the chance to process any remaining events in the batch. 
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;
    
                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one. Checkpointing is usually not a fast operation, so there is a tradeoff
                    // between checkpointing frequently (to minimize the number of events that will be reprocessed after a crash, or
                    // if the partition lease is stolen) and checkpointing infrequently (to reduce the impact on event processing
                    // performance). Checkpointing every five events is an arbitrary choice for this sample.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                               data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
    

Note

本教程使用了一个 EventProcessorHost 实例。 若要增加吞吐量,建议运行多个 EventProcessorHost 实例,最好是在单独的计算机上运行。 这也会提供冗余。 在那些情况下,为了对接收到的事件进行负载均衡,各个不同实例会自动相互协调。 如果希望多个接收方都各自处理 全部 事件,则必须使用 ConsumerGroup 概念。 从不同计算机中接收事件时,根据部署 EventProcessorHost 实例的计算机(或角色)来指定这些实例的名称可能会很有用。

后续步骤

访问以下链接可以了解有关事件中心的详细信息: