向 Azure 服务总线队列发送消息并从中接收消息 (Java)Send messages to and receive messages from Azure Service Bus queues (Java)

在本快速入门中,你将创建一个 Java 应用,以便向 Azure 服务总线队列发送消息,并从中接收消息。In this quickstart, you'll create a Java app to send messages to and receive messages from an Azure Service Bus queue.

重要

本快速入门使用新的 azure-messaging-servicebus 包。This quickstart uses the new azure-messaging-servicebus package. 有关使用旧 azure-servicebus 包的快速入门,请参阅使用 azure-servicebus 发送和接收消息For a quickstart that uses the old azure-servicebus package, see Send and receive messages using azure-servicebus.

先决条件Prerequisites

向队列发送消息Send messages to a queue

在本部分中,你将创建一个 Java 控制台项目,并添加代码以将消息发送到之前创建的队列。In this section, you'll create a Java console project, and add code to send messages to the queue that you created earlier.

创建 Java 控制台项目Create a Java console project

使用 Eclipse 或所选工具创建 Java 项目。Create a Java project using Eclipse or a tool of your choice.

配置应用程序以使用服务总线Configure your application to use Service Bus

添加对 Azure 服务总线库的引用。Add a reference to Azure Service Bus library. Maven 中心存储库中提供了服务总线的 Java 客户端库。The Java client library for Service Bus is available in the Maven Central Repository. 可使用 Maven 项目文件中的以下依赖项声明引用此库:You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.0.0</version>
</dependency>

添加将消息发送到队列的代码Add code to send messages to the queue

  1. 将以下 import 语句添加到 Java 文件的主题中。Add the following import statements at the topic of the Java file.

    import com.azure.messaging.servicebus.*;
    import com.azure.messaging.servicebus.models.*;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在类中,定义用于保存连接字符串和队列名称的变量,如下所示:In the class, define variables to hold connection string and queue name as shown below:

    static String connectionString = "<NAMESPACE CONNECTION STRING>";
    static String queueName = "<QUEUE NAME>";    
    

    <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串。Replace <NAMESPACE CONNECTION STRING> with the connection string to your Service Bus namespace. 并将 <QUEUE NAME> 替换为该队列的名称。And, replace <QUEUE NAME> with the name of the queue.

  3. 在类中添加一个名为 sendMessage 的方法,以向队列发送一条消息。Add a method named sendMessage in the class to send one message to the queue.

    static void sendMessage()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);        
    }
    
  4. 在类中添加一个名为 createMessages 的方法,以创建消息列表。Add a method named createMessages in the class to create a list of messages. 通常,可以从应用程序的不同部分获得这些消息。Typically, you get these messages from different parts of your application. 在这里,我们将创建一个示例消息列表。Here, we create a list of sample messages.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 添加一个名为 sendMessageBatch 方法的方法,以将消息发送到你创建的队列。Add a method named sendMessageBatch method to send messages to the queue you created. 此方法为队列创建 ServiceBusSenderClient调用 createMessages 方法来获取消息列表,准备一个或多个批处理,并将批处理发送到队列。This method creates a ServiceBusSenderClient for the queue, invokes the createMessages method to get the list of messages, prepares one or more batches, and sends the batches to the queue.

    static void sendMessageBatch()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .queueName(queueName)
                .buildClient();

        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();        

        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();

        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.        
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }

            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);

            // create a new batch
            messageBatch = senderClient.createMessageBatch();

            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }

        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }

        //close the client
        senderClient.close();
    }

从队列接收消息Receive messages from a queue

在本部分中,你将添加从队列检索消息的代码。In this section, you'll add code to retrieve messages from the queue.

  1. 添加名为 receiveMessages 的方法,以从队列检索消息。Add a method named receiveMessages to receive messages from the queue. 此方法通过指定用于处理消息的处理程序和用于处理错误的另一个处理程序来为队列创建 ServiceBusProcessorClientThis method creates a ServiceBusProcessorClient for the queue by specifying a handler for processing messages and another one for handling errors. 然后,它将启动处理器,等待几秒钟,输出接收的消息,然后停止和关闭处理器。Then, it starts the processor, waits for few seconds, prints the messages that are received, and then stops and closes the processor.

    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        // consumer that processes a single message received from Service Bus
        Consumer<ServiceBusReceivedMessageContext> messageProcessor = context -> {
            ServiceBusReceivedMessage message = context.getMessage();
            System.out.println("Received message: " + message.getBody().toString());
        };
    
        // handles any errors that occur when receiving messages
        Consumer<Throwable> errorHandler = throwable -> {
            System.out.println("Error when receiving messages: " + throwable.getMessage());
            if (throwable instanceof ServiceBusReceiverException) {
                ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable;
                System.out.println("Error source: " + serviceBusReceiverException.getErrorSource());
            }
        };
    
        // create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .queueName(queueName)
            .processMessage(messageProcessor)
            .processError(errorHandler)
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();        
    }    
    
  2. 更新 main 方法以调用 sendMessagesendMessageBatchreceiveMessages 方法,并引发 InterruptedExceptionUpdate the main method to invoke sendMessage, sendMessageBatch, and receiveMessages methods and to throw InterruptedException.

    public static void main(String[] args) throws InterruptedException {        
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }   
    

运行应用Run the app

运行应用程序时,会在控制台窗口中看到以下消息。When you run the application, you see the following messages in the console window.

Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Received message: Hello, World!
Received message: First message in the batch
Received message: Second message in the batch
Received message: Three message in the batch
Stopping and closing the processor

在 Azure 门户中的服务总线命名空间的“概述”页上,可看到传入和传出消息计数 。On the Overview page for the Service Bus namespace in the Azure portal, you can see incoming and outgoing message count. 可能需要等待一分钟左右,然后刷新页面才会看到最新值。You may need to wait for a minute or so and then refresh the page to see the latest values.

传入和传出消息计数

在此“概述”页上选择队列,导航到“服务总线队列”页面 。Select the queue on this Overview page to navigate to the Service Bus Queue page. 还可在此页上看到传入和传出消息计数 。You see the incoming and outgoing message count on this page too. 还可看到其他信息,如队列的当前大小、最大大小和活动消息计数等 。You also see other information such as the current size of the queue, maximum size, active message count, and so on.

队列详细信息

后续步骤Next Steps

请参阅以下文档和示例:See the following documentation and samples: