使用 Java 向/从 Azure 事件中心发送/接收事件

本快速入门介绍如何使用 azure-messaging-eventhubs Java 包向事件中心发送事件以及从事件中心接收事件。

提示

如果你目前在 Spring 应用程序中使用 Azure 事件中心资源,建议将 Spring Cloud Azure 视为替代方法。 Spring Cloud Azure 是一个开源项目,提供 Spring 与 Azure 服务的无缝集成。 若要详细了解 Spring Cloud Azure,并查看使用事件中心的示例,请参阅使用 Azure 事件中心的 Spring Cloud 流

先决条件

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述

若要完成本快速入门,需要具备以下先决条件:

  • Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果没有现有的 Azure 帐户,可以注册试用订阅帐户,或者在创建帐户时使用 MSDN 订阅者权益。
  • Java 开发环境。 本快速入门使用 Eclipse。 需要 Java 开发工具包 (JDK) 版本 8 或更高版本。
  • 创建事件中心命名空间和事件中心。 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 要创建命名空间和事件中心,请按照此文中的步骤操作。 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串:获取连接字符串。 稍后将在本快速入门中使用连接字符串。

发送事件

本部分介绍如何创建一个向事件中心发送事件的 Java 应用程序。

将引用添加到 Azure 事件中心库

首先,请在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。 按如下所示更新 pom.xml 文件。 Maven 中心存储库中提供了事件中心的 Java 客户端库。

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

注意

将版本更新为发布到 Maven 存储库的最新版本。

向 Azure 验证应用

本快速入门介绍连接到 Azure 事件中心的两种方法:无密码方法和连接字符串方法。 第一个选项展示了如何使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到事件中心命名空间。 无需担心代码、配置文件或安全存储(如 Azure 密钥保管库)中存在硬编码的连接字符串。 第二个选项说明如何使用连接字符串连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。

将角色分配到 Microsoft Entra 用户帐户

在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 你需要拥有 Azure 事件中心数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。

以下示例将 Azure Event Hubs Data Owner 角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。

Azure 事件中心的内置 Azure 角色

对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对事件中心命名空间的访问权限:

如果要创建自定义角色,请参阅执行事件中心操作所需的权限

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。

  2. 在概述页面上,从左侧菜单中选择“访问控制(IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Event Hubs Data Owner 并选择匹配的结果。 然后选择“下一步” 。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

编写代码以将消息发送到事件中心

添加一个名为 Sender 的类,并向该类中添加以下代码:

重要

  • <NAMESPACE NAME> 更新为你的事件中心命名空间的名称。
  • <EVENT HUB NAME> 更新为你的事件中心的名称。
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.chinacloudapi.cn";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.chinacloudapi.cn";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

生成程序,并确保没有引发任何错误。 将在运行接收器程序后运行此程序。

接收事件

本教程中的代码基于 GitHub 上的 EventProcessorClient 示例,可检查该代码以查看完整的工作应用程序。

使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:

  • 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
  • 请勿将容器用于任何其他用途,也不要将存储帐户用于任何其他用途。
  • 存储帐户应位于部署的应用程序所在的同一区域中。 如果应用程序位于本地,请尝试选择最近的区域。

在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。

  • 分层命名空间
  • Blob 软删除
  • 版本控制

创建 Azure 存储和 Blob 容器

本快速入门将使用 Azure 存储(特别是 Blob 存储)作为检查点存储。 标记检查点是一个进程,被事件处理器用来标记或提交分区中最后一个成功处理的事件的位置。 标记检查点通常在处理事件的函数中进行。 了解有关检查点的更多信息,请参阅事件处理器

按照以下步骤创建 Azure 存储帐户。

  1. 创建 Azure 存储帐户
  2. 创建一个 blob 容器
  3. 对 Blob 容器进行身份验证

在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需有“存储 Blob 数据参与者”角色才能读取和写入 Blob 数据。 若要为你自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可以在范围概述页上详细了解角色分配的可用范围。

在此方案中,你将为用户帐户分配权限(范围为存储帐户)以遵循最低权限原则。 这种做法仅为用户提供所需的最低权限,并创建更安全的生产环境。

以下示例将“存储 Blob 数据参与者”角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟的时间,但极少数情况下最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到存储帐户。

  2. 在存储帐户概述页的左侧菜单中选择“访问控制 (IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。

    显示如何分配存储帐户角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 在此示例中,搜索“存储 Blob 数据参与者”并选择匹配的结果,然后选择“下一步”。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

将事件中心库添加到 Java 项目

在 pom.xml 文件中添加以下依赖项。

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. 将以下 import 语句添加到 Java 文件顶部。

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. 创建一个名为 Receiver 的类,并向该类中添加以下字符串变量。 将占位符替换为正确的值。

    重要

    将占位符替换为正确的值。

    • <NAMESPACE NAME> 替换为事件中心命名空间的名称。
    • <EVENT HUB NAME> 更新为命名空间中的事件中心名称。
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.chinacloudapi.cn";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. 将下面的 main 方法添加到该类。

    重要

    将占位符替换为正确的值。

    • <STORAGE ACCOUNT NAME> 更新为你的 Azure 存储帐户的名称。
    • <CONTAINER NAME> 更新为存储帐户中 Blob 容器的名称
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.chinacloudapi.cn")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. 将处理事件和错误的两个帮助程序方法(PARTITION_PROCESSORERROR_HANDLER)添加到 Receiver 类中。

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. 生成程序,并确保没有引发任何错误。

运行应用程序

  1. 先运行接收器应用程序。

  2. 然后运行发送器应用程序。

  3. 在“接收器”应用程序窗口中,确认已看到发送器应用程序发布的事件。

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. 在接收器应用程序窗口中按 ENTER 停止该应用程序。

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

后续步骤

在 GitHub 上参阅以下示例: