将 Java 消息服务用于 Azure 服务总线和 AMQP 1.0

警告

本文适用于对 Java 消息服务 (JMS) 1.1 API 的有限支持,仅针对 Azure 服务总线标准层。

对 Java 消息服务 2.0 API 的支持只在 Azure 服务总线高级层(预览版)提供。 建议使用该层级。

本文说明了如何使用采用常用 JMS API 标准的 Java 应用程序中的服务总线消息传送功能。 这些消息传送功能包括队列和发布/订阅主题。 随附文章解释如何使用 Azure 服务总线 .NET API 来执行相同操作。 可以结合这两篇文章来了解如何使用高级消息队列协议 (AMQP) 1.0 进行跨平台的消息传送。

AMQP 1.0 是一个高效、可靠的线级消息传送协议,可用于构建强大、跨平台的消息传送应用程序。

服务总线支持 AMQP 1.0,这意味着,可以通过一系列使用有效二进制协议的平台来使用队列和发布/订阅中转消息传送功能。 此外,还可以生成由结合使用多个语言、框架和操作系统构建的组件组成的应用程序。

服务总线入门

本文假定已拥有包含名为 basicqueue 的队列的服务总线命名空间。 如果没有,则可以使用 Azure 经典门户创建命名空间和队列。 有关如何创建服务总线命名空间和队列的详细信息,请参阅服务总线队列入门

备注

分区队列和主题也支持 AMQP。 有关详细信息,请参阅分区消息实体针对服务总线分区队列和主题的 AMQP 1.0 支持

下载 AMQP 1.0 JMS 客户端库

有关从哪里下载 Apache Qpid JMS AMQP 1.0 客户端库的最新版本的信息,请访问 Apache Qpid 下载网站

使用服务总线构建和运行 JMS 应用程序时,必须将以下 JAR 文件从 Apache Qpid JMS AMQP 1.0 分发存档添加到 Java CLASSPATH 环境变量:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

备注

JMS JAR 名称和版本可能已更改。 有关详细信息,请参阅 Qpid JMS AMQP 1.0

代码 Java 应用程序

Java 命名和目录接口

JMS 使用 Java 命名和目录接口 (JNDI) 创建逻辑名称和物理名称之间的分隔。 使用 JNDI 解析以下两种类型的 JMS 对象:ConnectionFactory 和 Destination 。 JNDI 使用一个提供程序模型,可以在其中插入不同目录服务来处理名称解析任务。 Apache Qpid JMS AMQP 1.0 库附带一个使用以下格式的属性文件配置的、基于属性文件的简单 JNDI 提供程序:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.chinacloudapi.cn

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

设置 JNDI 上下文和配置 ConnectionFactory 对象

Azure 门户“主连接字符串”下的“共享访问策略”中提供了可引用的连接字符串。

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

配置制造者和使用者目标队列

用于在 Qpid 属性文件 JNDI 提供程序中定义目标的项的格式如下。

创建制造者目标队列:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

创建使用者目标队列:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

编写 JMS 应用程序

将 JMS 用于服务总线时不需要特殊的 API 或选项。 但是,有一些限制,我们会在后面说明。 与使用任何 JMS 应用程序一样,若要解析 ConnectionFactory 对象和目标,首先要做的事情是配置 JNDI 环境。

配置 JNDI InitialContext 对象

JNDI 环境是通过将配置信息的哈希表传入到 javax.naming.InitialContext 类的构造函数中来配置的。 哈希表中的两个必需元素是初始上下文工厂的类名称和提供程序 URL。 以下代码演示了如何配置 JNDI 环境以将基于 Qpid 属性文件的 JNDI 提供程序用于名为 servicebus.properties 的属性文件。

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

使用服务总线队列的简单 JMS 应用程序

以下示例程序将 JMS 短信发送到 JNDI 逻辑名称为 QUEUE 的 Service Bus 队列,并接收返回的消息。

可以从 Azure 服务总线示例 JMS 队列快速入门中访问所有源代码和配置信息。

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string builder is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

运行应用程序

传递共享访问策略中的“连接字符串”,以运行应用程序。 运行应用程序会生成以下输出:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP 处置和服务总线操作映射

以下是将 AMQP 处置转换为服务总线操作的方法:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS 主题与服务总线主题

通过 JMS API 使用服务总线主题和订阅可以提供基本的发送和接收功能。 从其他使用 JMS 兼容 API 的消息代理处移植应用程序时,这是一种很方便的选择,即使服务总线主题不同于 JMS 主题且需要一些调整。

服务总线主题将消息路由到已命名的、共享的、持久的订阅中,这些订阅通过 Azure 资源管理接口、Azure 命令行工具或 Azure 门户进行管理。 每个订阅允许使用最多 2000 条选择规则,每条规则可能有一个筛选器条件以及一项适用于 SQL 筛选器的元数据转换操作。 每次出现筛选器条件匹配的情况时,系统就会选择将要复制到订阅中的输入消息。

从订阅接收消息与从队列接收消息是相同的。 每个订阅都有一个关联的死信队列,并且可以将消息自动转发给其他队列或主题。

JMS 主题允许客户端动态创建非持久的和持久的订阅者,这样就可以选择性地允许通过消息选择器来筛选消息。 服务总线不支持这些非共享的实体。 服务总线的 SQL 筛选器规则语法类似于 JMS 支持的消息选择器语法。

如此示例所示,JMS 主题发布者端兼容服务总线,但动态订阅者则不兼容。 不支持将下述与拓扑相关的 JMS API 与服务总线配合使用。

不受支持的功能和限制

通过 AMQP 1.0 将 JMS 用于服务总线时存在以下限制:

  • 每个会话只允许一个 MessageProducer 或 MessageConsumer 对象 。 如果需要在应用程序中创建多个 MessageProducer 或 MessageConsumer 对象,请分别对其创建专用会话 。
  • 当前不支持易失性主题订阅。
  • 当前不支持 MessageSelector 对象。
  • 不支持分布式事务,但支持事务处理会话。

服务总线将控制平面从数据平面拆分了出来,因此,不支持多个 JMS 的动态拓扑函数。

不支持的方法 替换为
createDurableSubscriber 创建移植消息选择器的主题订阅。
createDurableConsumer 创建移植消息选择器的主题订阅。
createSharedConsumer 服务总线主题始终可共享。 请参阅“JMS 主题和服务总线主题”部分。
createSharedDurableConsumer 服务总线主题始终可共享。 请参阅“JMS 主题和服务总线主题”部分。
createTemporaryTopic 通过管理 API/工具/门户创建主题(AutoDeleteOnIdle 被设置为过期期间)。
createTopic 通过管理 API/工具/门户创建主题。
unsubscribe 删除主题管理 API/工具/门户。
createBrowser 不支持。 使用服务总线 API 的 Peek() 功能。
createQueue 通过管理 API/工具/门户创建队列。
createTemporaryQueue 通过管理 API/工具/门户创建队列(AutoDeleteOnIdle 被设置为过期期间)。
receiveNoWait 使用服务总线 SDK 提供的 receive() 方法并指定非常低或为零的超时。

总结

本文演示了如何通过使用常用 JMS API 和 AMQP 1.0 通过 Java 使用服务总线中转消息传送功能(例如队列和发布/订阅主题)。

也可以通过其他语言(例如 .NET、C、Python 和 PHP)使用服务总线 AMQP 1.0。 使用这些不同语言构建的组件可以使用服务总线中的 AMQP 1.0 支持可靠且完全无损地交换消息。

后续步骤