本指南提供了详细信息,可帮助你使用 Java 消息服务 2.0 API 与Azure 服务总线通信。
作为 Java 开发人员,如果你不熟悉 Azure 服务总线,请考虑阅读以下文章。
| 入门指南 | 概念 |
|---|---|
Java消息服务 (JMS) 编程模型
以下各节介绍了Java消息服务 API 编程模型:
注释
Azure 服务总线高级层 支持 JMS 1.1 和 JMS 2.0。
Azure 服务总线 - 标准 层支持有限的 JMS 1.1 功能。 有关详细信息,请参阅 本文档。
JMS - 基础构件
使用以下模块与 JMS 应用程序通信。
注释
本指南改编自
若要更好地了解Java消息服务(JMS),请参阅本教程。
连接工厂
注释
该 azure-servicebus-jms 库有两种变体:com.azure:azure-servicebus-jms(2.0.0+ 版本)适用于 雅加达 EE(jakarta.jms.*)和 com.microsoft.azure:azure-servicebus-jms(1.0.x 版本)适用于 Java EE(javax.jms.*)。 有关如何选择正确构件的指南,请参阅 Jakarta EE 和 javax 支持。
客户端使用连接工厂对象连接到 JMS 提供程序。 连接工厂封装管理员定义的一组连接配置参数。
每个连接工厂都是ConnectionFactory、QueueConnectionFactory或TopicConnectionFactory接口的一个实例。
为了简化连接到Azure 服务总线,这些接口分别通过 ServiceBusJmsConnectionFactory、ServiceBusJmsQueueConnectionFactory 或 ServiceBusJmsTopicConnectionFactory 来实现。
重要
使用JMS 2.0 API的Java应用程序可以通过连接字符串或使用TokenCredential来利用由Microsoft Entra支持的身份验证,以连接至Azure服务总线。 使用Microsoft Entra支持的身份验证时,请确保根据需要分配角色和权限。
在 Azure 上创建 系统分配的托管标识 ,并使用此标识创建一个 TokenCredential。
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
可以使用以下参数实例化连接工厂:
- 令牌凭据 - 表示能够提供 OAuth 令牌的凭据。
- 主机 - Azure 服务总线高级层命名空间的主机名。
- ServiceBusJmsConnectionFactorySettings 属性包,其中包含:
-
connectionIdleTimeoutMS- 空闲连接超时(以毫秒为单位)。 -
traceFrames- 用于收集用于调试的 AMQP 跟踪帧的布尔标志。 - 其他配置参数。
-
创建工厂,如以下示例所示。 令牌凭据和主机是必需参数,但其他属性是可选的。
String host = "<YourNamespaceName>.servicebus.chinacloudapi.cn";
ConnectionFactory factory = new ServiceBusJmsConnectionFactory(tokenCredential, host, null);
JMS 目标
目标是客户端用来指定它生成的消息的目标以及它使用的消息源的对象。
目标映射到 Azure 服务总线中的实体 - 队列(点到点方案中)和主题(发布-订阅方案中)。
连接
连接封装与 JMS 提供程序的虚拟连接。 在使用 Azure 服务总线时,它表示应用程序与 Azure 服务总线之间通过 AMQP 的有状态连接。
从连接工厂创建连接,如以下示例所示:
Connection connection = factory.createConnection();
会议
会话是用于生成和使用消息的单线程上下文。 使用它创建消息、消息生成者和使用者。 它还提供事务上下文,以将发送和接收分组为一个原子工作单元。
从连接对象创建会话,如以下示例所示:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
注释
JMS API 不支持从启用了消息会话的服务总线队列或主题接收消息。
会话模式
使用以下任一模式创建会话。
| 会话模式 | 行为 |
|---|---|
| Session.AUTO_ACKNOWLEDGE | 当会话成功从接收调用返回或当会话调用的消息侦听器成功处理并返回时,会话会自动确认客户端收到消息。 |
| Session.CLIENT_ACKNOWLEDGE | 客户端通过调用消息的确认方法来确认已使用的消息。 |
| Session.DUPS_OK_ACKNOWLEDGE | 此确认模式指示会话延迟确认消息送达。 |
| Session.SESSION_TRANSACTED | 将此值作为参数传递给 Connection 对象上的方法 createSession(int sessionMode) ,以指定会话应使用本地事务。 |
如果未指定会话模式,则默认 为Session.AUTO_ACKNOWLEDGE。
JMSContext
注释
JMSContext 定义为 JMS 2.0 规范的一部分。
JMSContext 结合了连接和会话对象提供的功能。 从连接工厂对象创建它。
JMSContext context = connectionFactory.createContext();
JMSContext 模式
与 Session 对象一样,可以使用 与会话模式中提到的相同确认模式创建 JMSContext。
JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
如果未指定模式,则默认 为JMSContext.AUTO_ACKNOWLEDGE。
JMS 消息生成方
消息生成者是使用 JMSContext 或会话创建的对象。 使用它将消息发送到目标。
可以将其创建为独立对象,如以下示例所示:
JMSProducer producer = context.createProducer();
或者,如果需要发送消息,可以在运行时创建它。
context.createProducer().send(destination, message);
JMS 消息使用者
消息使用者是 JMSContext 或会话创建的对象。 使用它接收发送到目标的消息。 按以下示例所示创建它:
JMSConsumer consumer = context.createConsumer(dest);
通过 receive() 方法同步接收
消息消费者通过receive() 方法提供一种从目标接收消息的同步方式。
如果未指定参数或超时时间,或者指定超时时间为 0,那么消费者会一直等待,直到消息到达或者连接中断(以较早发生者为准)。
Message m = consumer.receive();
Message m = consumer.receive(0);
提供非零正参数时,消费者会阻塞,直至该计时器过期。
Message m = consumer.receive(1000); // time out after one second.
使用 JMS 消息侦听器进行异步接收
消息侦听器是一个对象,用于对目标上的消息进行异步处理。 它实现 MessageListener 接口,该接口包含必须存在特定业务逻辑的 onMessage 方法。
您必须实例化一个消息侦听器对象,并使用 setMessageListener 方法将其注册到特定的消息消费者。
Listener myListener = new Listener();
consumer.setMessageListener(myListener);
使用主题
你可以针对一个目的地创建JMS 消息使用者,该目的地可以是一个队列或一个主题。
队列上的使用者只是客户端对象,它们位于客户端应用程序与Azure 服务总线之间的会话(和连接)上下文中。
消费者在讨论主题时,往往会涉及两个方面 -
- 存在于会话(或 JMSContext)上下文中的 客户端对象,
- Azure 服务总线中的订阅实体。
订阅的文档可以在 此处 查阅,并且可以是以下类型之一:
- 共享持久订阅
- 共享非持久订阅
- 非共享持久订阅
- 非共享、非持久的订阅
JMS 队列浏览器
JMS API 提供一个 QueueBrowser 对象,应用程序可以使用该对象浏览队列中的消息并显示每个消息的标头值。
可以使用 JMSContext 创建队列浏览器,如以下示例所示:
QueueBrowser browser = context.createBrowser(queue);
注释
JMS API 不提供用于浏览主题的 API。
存在此限制,因为主题本身不会存储消息。 将消息发送到主题后,它会立即转发到相应的订阅。
JMS 消息选择器
接收应用程序可以使用消息选择器来筛选接收的消息。 通过使用消息选择器,接收应用程序会将筛选消息的工作卸载到 JMS 提供程序(在本例中,Azure 服务总线),而不是自行承担该责任。
创建以下任一使用者时,可以使用选择器:
- 共享持久订阅
- 非共享持久订阅
- 共享的非持久订阅
- 未共享的非持久订阅
- 队列使用者
- 队列浏览器
注释
服务总线选择器不支持 LIKE 和 BETWEEN SQL 关键字。
定时消息(延迟发送)
JMS 2.0 支持使用setDeliveryDelay方法在MessageProducer或JMSProducer上调度消息以供将来传递。 设置此属性时,服务总线接受消息,但仅在延迟期过后才对使用者可见。
MessageProducer producer = session.createProducer(queue);
// Schedule a message for delivery 30 seconds from now
producer.setDeliveryDelay(30000);
producer.send(session.createTextMessage("Scheduled message"));
有关完整的工作示例,请参阅 azure-servicebus-jms-samples 存储库中的 QueueScheduledSend.java。
连接工厂选择和系统弹性
ServiceBusJmsConnectionFactory在 Spring Boot 或其他管理 JMS 连接的框架中使用时,请为发送者和侦听器选择正确的连接工厂包装器,以确保可靠的操作。
建议配置
| 角色 | 连接工厂 | 为什么 |
|---|---|---|
发件人 (JmsTemplate) |
CachingConnectionFactory 包装 ServiceBusJmsConnectionFactory |
JmsTemplate 默认情况下,创建并关闭每个发送的连接。
CachingConnectionFactory 保持单一的 AMQP 连接并缓存会话,从而避免频繁的连接变化,这样可以防止在高负载下耗尽中转站资源。 |
侦听器 (@JmsListener, DefaultMessageListenerContainer) |
原始 ServiceBusJmsConnectionFactory (未封装) |
每个侦听器容器都拥有一个具有独立生命周期的 AMQP 连接。 如果连接失败(令牌过期、网关升级、网络故障),则仅该侦听器受到影响,Spring 会自动重新创建连接。 |
侦听器应避免的事项
Warning
切勿与侦听器容器一起使用 SingleConnectionFactory 。 它强制所有侦听器共享单个 JMS 连接。 如果出于任何原因中断了该连接,则所有侦听器会同时失去连接,并且无法独立恢复。 使用原始 ServiceBusJmsConnectionFactory 容器,以便每个侦听器容器管理自己的连接。
CachingConnectionFactory 在侦听器容器上也可能会导致问题,因为被缓存的会话可能引用过时的底层连接。 对于侦听器,原始工厂可确保每个容器可以独立创建一个新的连接。
Spring Cloud Azure 默认值
如果使用 spring-cloud-azure-starter-servicebus-jms(版本 6.2.0+),启动器默认会应用此工厂分离功能:
spring.jms.servicebus.pool.enabled |
spring.jms.cache.enabled |
发送方工厂 | 侦听器工厂 |
|---|---|---|---|
| (未设置) | (未设置) | CachingConnectionFactory |
ServiceBusJmsConnectionFactory |
| (未设置) | true |
CachingConnectionFactory |
CachingConnectionFactory |
| (未设置) | false |
ServiceBusJmsConnectionFactory |
ServiceBusJmsConnectionFactory |
true |
(未设置) | JmsPoolConnectionFactory |
JmsPoolConnectionFactory |
在旧版本(6.2.0 之前),发送方和侦听器默认都使用 ServiceBusJmsConnectionFactory ,这会导致发送方为每个发送创建新的连接。
添加异常侦听器
如果没有异常侦听器,连接断开时完全没有提示。 向发送方和侦听器工厂添加一个 jakarta.jms.ExceptionListener 以提高可观测性。
connection.setExceptionListener(exception -> {
log.error("JMS connection error: {}", exception.getMessage(), exception);
});
在 Spring Boot 中,设置CachingConnectionFactory的异常侦听器(用于发送方)和DefaultJmsListenerContainerFactory的异常侦听器(用于侦听器)。
有关显示所有这些模式的完整工作示例,请参阅 azure-servicebus-jms-samples 存储库中的 Spring Boot JMS 复原示例。
死信队列
Azure 服务总线中的每个队列和主题订阅都有关联的死信队列。 系统会自动将无法传送或处理的消息转移到死信队列 (DLQ)。 例如,当消息超过最大传递计数或其生存时间(TTL)过期时,系统将消息移动到 DLQ。
重要
若要将 TTL 过期的消息移动到 DLQ,请为队列或订阅启用消息过期时的死信功能。 如果没有此设置,系统会以无提示方式放弃过期的消息。 有关配置,请参阅 为队列或订阅启用死信。
在 JMS 中,将 DLQ 作为单独的目标,通过构造完整路径并使用该路径创建 JmsQueue 来访问。 不需要特殊 API。
队列 DLQ 路径格式:
<queue-name>/$deadletterqueue
主题订阅 DLQ 路径格式:
<topic-name>/Subscriptions/<subscription-name>/$deadletterqueue
示例 - 从队列的死信队列消费:
import org.apache.qpid.jms.JmsQueue;
// Construct the DLQ path for a queue named "orders"
String dlqPath = "orders/$deadletterqueue";
JmsQueue dlqDestination = new JmsQueue(dlqPath);
// Create a consumer on the DLQ and receive messages
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
Message message = dlqConsumer.receive(5000);
死信消息包括元数据属性,用于描述消息死信的原因:
| 财产 | Description |
|---|---|
DeadLetterReason |
消息是死信的原因(例如, TTLExpiredException 或 MaxDeliveryCountExceeded)。 |
DeadLetterErrorDescription |
死信原因的人类可读说明。 |
通过使用 message.getStringProperty() 读取这些属性:
String reason = message.getStringProperty("DeadLetterReason");
String description = message.getStringProperty("DeadLetterErrorDescription");
AMQP 处置和服务总线操作映射
以下是将 AMQP 处置转换为服务总线操作的方法:
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
概要
本开发人员指南展示了使用 Java 消息服务 (JMS) 的 Java 客户端应用程序如何连接到 Azure 服务总线。
后续步骤
有关Azure 服务总线和Java 消息服务(JMS)实体的详细信息,请参阅以下文章:*在Azure 服务总线上选择使用JMS还是原生SDK