Azure 服务总线 JMS 2.0 开发人员指南

本指南包含的详细信息可帮助你成功使用 Java 消息服务 (JMS) 2.0 API 与 Azure 服务总线进行通信。

作为 Java 开发人员,如果不熟悉 Azure 服务总线,请考虑阅读以下文章。

入门 概念

Java 消息服务 (JMS) 编程模型

Java 消息服务 API 编程模型如以下部分所示:

注意

Azure 服务总线高级层支持 JMS 1.1 和 JMS 2.0。

Azure 服务总线 - 标准层支持有限 JMS 1.1 功能。 有关详细信息,请参阅此文档

JMS - 构建基块

以下构建基块可用于与 JMS 应用程序进行通信。

注意

以下指南从 Oracle Java EE 6 Java 消息服务 (JMS) 教程改写而来

建议参考本教程,更好地了解 Java 消息服务 (JMS)。

连接工厂

客户端使用连接工厂对象与 JMS 提供程序连接。 连接工厂用于封装由管理员定义的一组连接配置参数。

每个连接工厂都是 ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory 接口的一个实例。

为了简化与 Azure 服务总线的连接,这些接口将分别通过 ServiceBusJmsConnectionFactoryServiceBusJmsQueueConnectionFactoryServiceBusJmsTopicConnectionFactory 来实现。

重要

利用 JMS 2.0 API 的 Java 应用程序可以使用连接字符串连接到Azure 服务总线,也可以使用 TokenCredential 来利用 Microsoft Entra 支持的身份验证。 使用 Microsoft Entra 支持的身份验证时,请确保根据需要为标识分配角色和权限

在 Azure 上创建系统分配的托管标识,并使用此标识创建 TokenCredential

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

然后可以使用以下参数实例化连接工厂。

  • 令牌凭据 - 表示能够提供 OAuth 令牌的凭据。
  • 主机 - Azure 服务总线高级层命名空间的主机名。
  • ServiceBusJmsConnectionFactorySettings 属性包,其中包含
    • connectionIdleTimeoutMS - 空闲连接超时(以毫秒为单位)。
    • traceFrames - 收集 AMQP 跟踪帧用于调试的布尔型标志。
    • 其他配置参数

可以按如下所示创建工厂。 令牌凭据和主机是必需参数,但其他属性是可选的。

String host = "<YourNamespaceName>.servicebus.chinacloudapi.cn";
ConnectionFactory factory = new ServiceBusJmsConnectionFactory(tokenCredential, host, null); 

JMS 目标

“目标”是客户端用来指定其生成的消息目标以及其使用的消息源的对象。

目标映射到 Azure 服务总线中的实体 - 队列(点到点方案中)和主题(发布-订阅方案中)。

连接

连接封装与 JMS 提供程序的虚拟连接。 对于 Azure 服务总线,它表示应用程序与 Azure 服务总线之间通过 AMQP 建立的有状态连接。

连接时从连接工厂创建的,如以下示例所示:

Connection connection = factory.createConnection();

会话

会话是一种用于生成和使用消息的单线程上下文。 它可以用来创建消息、消息生成方和使用者,但它还提供事务上下文以允许将发送和接收分组到一个原子工作单元中。

可以从连接对象创建会话,如以下示例所示:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

注意

JMS API 不支持从启用了消息传递会话的服务总线队列或主题接收消息。

会话模式

可以使用以下任一模式创建会话。

会话模式 行为
Session.AUTO_ACKNOWLEDGE 当会话从接收调用成功返回,或当会话调用以处理消息的消息侦听器成功返回时,会话会自动确认客户端接收到消息。
Session.CLIENT_ACKNOWLEDGE 客户端通过调用消息的确认方法来确认已使用的消息。
Session.DUPS_OK_ACKNOWLEDGE 此确认模式指示会话延迟确认消息送达。
Session.SESSION_TRANSACTED 此值可以作为参数传递给连接对象上的方法 createSession (int sessionMode),以指定会话应使用本地事务。

如果未指定会话模式,则默认选取“Session.AUTO_ACKNOWLEDGE”。

JMSContext

注意

JMSContext 定义为 JMS 2.0 规范的一部分。

JMSContext 合并连接对象和会话对象提供的功能。 可以从连接工厂对象创建它。

JMSContext context = connectionFactory.createContext();

JMSContext 模式

与会话对象一样,可以使用会话模式中提到的同一确认模式来创建 JMSContext。

JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);

如果未指定模式,则默认选取“JMSContext.AUTO_ACKNOWLEDGE”。

JMS 消息生成方

消息生成方是使用 JMSContext 或会话创建的对象,用于将消息发送到目标。

可以将它创建为独立对象,如以下示例所示:

JMSProducer producer = context.createProducer();

也可以在需要发送消息是在运行时创建。

context.createProducer().send(destination, message);

JMS 消息使用者

消息使用者是由 JMSContext 或会话创建的对象,用于接收发送到目标的消息。 可以按以下示例所示创建它:

JMSConsumer consumer = context.createConsumer(dest);

通过 receive() 方法进行同步接收

消息使用者通过 receive() 方法以同步方式从目标接收消息。

如果未指定参数/超时,或指定超时值为“0”,则除非消息到达或连接断开(取两者之中较早发生的情况),会无限期阻止使用者。

Message m = consumer.receive();
Message m = consumer.receive(0);

如果提供非零正参数,则计时器过期之前会一直阻止使用者。

Message m = consumer.receive(1000); // time out after one second.

使用 JMS 消息侦听器进行异步接收

消息侦听器是用于对目标上的消息进行异步处理的对象。 它会实现 MessageListener 接口,其中包含必须存在特定业务逻辑的 onMessage 方法。

必须使用 setMessageListener 方法将消息侦听器对象实例化并针对特定消息使用者进行注册。

Listener myListener = new Listener();
consumer.setMessageListener(myListener);

使用主题

JMS 消息使用者是根据目标创建的,而目标可能是一个队列或一个主题。

队列使用者只是客户端应用程序与 Azure 服务总线之间的会话(和连接)上下文中存在的客户端对象。

但主题使用者则包含 2 个部分 -

  • 会话(或 JMSContext)上下文中存在的客户端对象,
  • Azure 服务总线中的订阅实体。

订阅记录在此处,它可以是以下内容之一:

  • 共享持久订阅
  • 共享非持久订阅
  • 非共享持久订阅
  • 非共享非持久订阅

JMS 队列浏览器

JMS API 提供一个 QueueBrowser 对象,该对象允许应用程序浏览队列中的消息,并显示每条消息的标头值。

可以使用 JMSContext 创建队列浏览器,如以下示例所示:

QueueBrowser browser = context.createBrowser(queue);

注意

JMS API 不提供用于浏览主题的 API。

这是因为主题本身并不存储消息。 一旦将消息发送到主题,就会将其转发到相应的订阅。

JMS 消息选择器

接收应用程序可使用消息选择器来筛选接收到的消息。 借助消息选择器,接收应用程序会将消息筛选卸载到 JMS 提供程序(此时为 Azure 服务总线),而自身不会负责这项工作。

创建以下任一使用者时,可以使用选择器 -

  • 共享持久订阅
  • 非共享持久订阅
  • 共享非持久订阅
  • 非共享非持久订阅
  • 队列浏览器

AMQP 处置和服务总线操作映射

以下是将 AMQP 处置转换为服务总线操作的方法:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

总结

本开发人员指南展示了 Java 客户端应用程序如何使用 Java 消息服务 (JMS) 与 Azure 服务总线连接。

后续步骤

要详细了解 Azure 服务总线以及 Java 消息服务 (JMS) 实体,请查看以下文章: