向 Azure 服务总线队列发送消息并从中接收消息 (Java)
在本快速入门中,你将创建一个 Java 应用,以便向 Azure 服务总线队列发送消息,并从中接收消息。
注意
本快速入门分步介绍了一个简单方案,也就是将消息发送到服务总线队列并接收这些消息。 可在 GitHub 上的 Azure SDK for Java 存储库中找到预生成的 Azure 服务总线 Java 示例。
先决条件
- Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 你可以激活 MSDN 订阅者权益或注册试用版订阅。
- 如果没有可使用的队列,请遵循使用 Azure 门户创建服务总线队列一文来创建队列。 记下服务总线命名空间的连接字符串以及创建的队列的名称 。
- 安装 Azure SDK for Java。 如果使用 Eclipse,则可安装 Azure Toolkit for Eclipse,其中包含用于 Java 的 Azure SDK。 然后,可将用于 Java 的 Azure 库添加到项目。 如果使用 IntelliJ,请参阅安装 Azure Toolkit for IntelliJ。
向队列发送消息
在本部分中,你将创建一个 Java 控制台项目,并添加代码以将消息发送到之前创建的队列。
创建 Java 控制台项目
使用 Eclipse 或所选工具创建 Java 项目。
配置应用程序以使用服务总线
添加对 Azure Core 和 Azure 服务总线库的引用。
如果使用的是 Eclipse 并创建了 Java 控制台应用程序,请将 Java 项目转换为 Maven:在“包资源管理器”窗口中右键单击该项目,然后选择“配置”->“转换为 Maven 项目”。 然后,将依赖项添加到这两个库,如以下示例所示。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.sbusquickstarts</groupId>
<artifactId>sbustopicqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>15</release>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.2</version>
</dependency>
</dependencies>
</project>
添加将消息发送到队列的代码
将以下
import
语句添加到 Java 文件的主题中。import com.azure.messaging.servicebus.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;
在类中,定义用于保存连接字符串和队列名称的变量,如下所示:
static String connectionString = "<NAMESPACE CONNECTION STRING>"; static String queueName = "<QUEUE NAME>";
将
<NAMESPACE CONNECTION STRING>
替换为服务总线命名空间的连接字符串。 并将<QUEUE NAME>
替换为该队列的名称。在类中添加一个名为
sendMessage
的方法,以向队列发送一条消息。static void sendMessage() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }
在类中添加一个名为
createMessages
的方法,以创建消息列表。 通常,可以从应用程序的不同部分获得这些消息。 在这里,我们将创建一个示例消息列表。static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
添加一个名为
sendMessageBatch
方法的方法,以将消息发送到你创建的队列。 此方法为队列创建ServiceBusSenderClient
调用createMessages
方法来获取消息列表,准备一个或多个批处理,并将批处理发送到队列。
static void sendMessageBatch()
{
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.queueName(queueName)
.buildClient();
// Creates an ServiceBusMessageBatch where the ServiceBus.
ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
// create a list of messages
List<ServiceBusMessage> listOfMessages = createMessages();
// We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
// the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
// messages are sent.
for (ServiceBusMessage message : listOfMessages) {
if (messageBatch.tryAddMessage(message)) {
continue;
}
// The batch is full, so we create a new batch and send the batch.
senderClient.sendMessages(messageBatch);
System.out.println("Sent a batch of messages to the queue: " + queueName);
// create a new batch
messageBatch = senderClient.createMessageBatch();
// Add that message that we couldn't before.
if (!messageBatch.tryAddMessage(message)) {
System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
}
}
if (messageBatch.getCount() > 0) {
senderClient.sendMessages(messageBatch);
System.out.println("Sent a batch of messages to the queue: " + queueName);
}
//close the client
senderClient.close();
}
从队列接收消息
在本部分中,你将添加从队列检索消息的代码。
添加名为
receiveMessages
的方法,以从队列检索消息。 此方法通过指定用于处理消息的处理程序和用于处理错误的另一个处理程序来为队列创建ServiceBusProcessorClient
。 然后,它将启动处理器,等待几秒钟,输出接收的消息,然后停止和关闭处理器。重要
将代码中
QueueTest::processMessage
中的QueueTest
替换为你的类的名称。// handles received messages static void receiveMessages() throws InterruptedException { CountDownLatch countdownLatch = new CountDownLatch(1); // create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(QueueTest::processMessage) .processError(context -> processError(context, countdownLatch)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
添加
processMessage
方法以处理从服务总线订阅接收的消息。private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
添加
processError
方法以处理错误消息。private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
更新
main
方法以调用sendMessage
、sendMessageBatch
和receiveMessages
方法,并引发InterruptedException
。public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
运行应用
运行应用程序时,会在控制台窗口中看到以下消息。
Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
Stopping and closing the processor
在 Azure 门户中的服务总线命名空间的“概述”页上,可看到传入和传出消息计数 。 可能需要等待一分钟左右,然后刷新页面才会看到最新值。
在此“概述”页上选择队列,导航到“服务总线队列”页面 。 还可在此页上看到传入和传出消息计数 。 还可看到其他信息,如队列的当前大小、最大大小和活动消息计数等 。
后续步骤
请参阅以下文档和示例: