将 Java 消息服务用于 Azure 服务总线和 AMQP 1.0Use the Java Message Service with Azure Service Bus and AMQP 1.0

警告

本文适用于对 Java 消息服务 (JMS) 1.1 API 的有限支持,仅针对 Azure 服务总线标准层。This article caters to limited support for the Java Message Service (JMS) 1.1 API and exists for the Azure Service Bus standard tier only.

对 Java 消息服务 2.0 API 的支持只在 Azure 服务总线高级层(预览版)提供。Full support for the Java Message Service 2.0 API is available only on the Azure Service Bus premium tier in preview. 建议使用该层级。We recommend that you use this tier.

本文说明了如何使用采用常用 JMS API 标准的 Java 应用程序中的服务总线消息传送功能。This article explains how to use Service Bus messaging features from Java applications by using the popular JMS API standard. 这些消息传送功能包括队列和发布/订阅主题。These messaging features include queues and publishing or subscribing to topics. 随附文章解释如何使用 Azure 服务总线 .NET API 来执行相同操作。A companion article explains how to do the same by using the Azure Service Bus .NET API. 可以结合这两篇文章来了解如何使用高级消息队列协议 (AMQP) 1.0 进行跨平台的消息传送。You can use these two articles together to learn about cross-platform messaging using the Advanced Message Queuing Protocol (AMQP) 1.0.

AMQP 1.0 是一个高效、可靠的线级消息传送协议,可用于构建强大、跨平台的消息传送应用程序。AMQP 1.0 is an efficient, reliable, wire-level messaging protocol that you can use to build robust, cross-platform messaging applications.

服务总线支持 AMQP 1.0,这意味着,可以通过一系列使用有效二进制协议的平台来使用队列和发布/订阅中转消息传送功能。Support for AMQP 1.0 in Service Bus means that you can use the queuing and publish or subscribe brokered messaging features from a range of platforms by using an efficient binary protocol. 此外,还可以生成由结合使用多个语言、框架和操作系统构建的组件组成的应用程序。You also can build applications composed of components built by using a mix of languages, frameworks, and operating systems.

服务总线入门Get started with Service Bus

本文假定已拥有包含名为 basicqueue 的队列的服务总线命名空间。This article assumes that you already have a Service Bus namespace that contains a queue named basicqueue. 如果没有,则可以使用 Azure 经典门户创建命名空间和队列If you don't, you can create the namespace and queue by using the Azure portal. 有关如何创建服务总线命名空间和队列的详细信息,请参阅服务总线队列入门For more information about how to create Service Bus namespaces and queues, see Get started with Service Bus queues.

备注

分区队列和主题也支持 AMQP。Partitioned queues and topics also support AMQP. 有关详细信息,请参阅分区消息实体针对服务总线分区队列和主题的 AMQP 1.0 支持For more information, see Partitioned messaging entities and AMQP 1.0 support for Service Bus partitioned queues and topics.

下载 AMQP 1.0 JMS 客户端库Download the AMQP 1.0 JMS client library

有关从哪里下载 Apache Qpid JMS AMQP 1.0 客户端库的最新版本的信息,请访问 Apache Qpid 下载网站For information about where to download the latest version of the Apache Qpid JMS AMQP 1.0 client library, see the Apache Qpid download site.

使用服务总线构建和运行 JMS 应用程序时,必须将以下 JAR 文件从 Apache Qpid JMS AMQP 1.0 分发存档添加到 Java CLASSPATH 环境变量:You must add the following JAR files from the Apache Qpid JMS AMQP 1.0 distribution archive to the Java CLASSPATH environment variable when you build and run JMS applications with Service Bus:

  • geronimo-jms_1.1_spec-1.0.jargeronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jarqpid-jms-client-[version].jar

备注

JMS JAR 名称和版本可能已更改。JMS JAR names and versions might have changed. 有关详细信息,请参阅 Qpid JMS AMQP 1.0For more information, see Qpid JMS AMQP 1.0.

代码 Java 应用程序Code Java applications

Java 命名和目录接口Java Naming and Directory Interface

JMS 使用 Java 命名和目录接口 (JNDI) 创建逻辑名称和物理名称之间的分隔。JMS uses the Java Naming and Directory Interface (JNDI) to create a separation between logical names and physical names. 使用 JNDI 解析以下两种类型的 JMS 对象:ConnectionFactory 和 Destination 。Two types of JMS objects are resolved by using JNDI: ConnectionFactory and Destination . JNDI 使用一个提供程序模型,可以在其中插入不同目录服务来处理名称解析任务。JNDI uses a provider model into which you can plug different directory services to handle name resolution duties. Apache Qpid JMS AMQP 1.0 库附带一个使用以下格式的属性文件配置的、基于属性文件的简单 JNDI 提供程序:The Apache Qpid JMS AMQP 1.0 library comes with a simple property file-based JNDI provider that's configured by using a properties file of the following format:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.chinacloudapi.cn

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

设置 JNDI 上下文和配置 ConnectionFactory 对象Set up JNDI context and configure the ConnectionFactory object

Azure 门户“主连接字符串”下的“共享访问策略”中提供了可引用的连接字符串。The connection string referenced is the one available in the Shared Access Policies in the Azure portal under Primary Connection String .

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

配置制造者和使用者目标队列Configure producer and consumer destination queues

用于在 Qpid 属性文件 JNDI 提供程序中定义目标的项的格式如下。The entry used to define a destination in the Qpid properties file JNDI provider is of the following format.

创建制造者目标队列:To create a destination queue for the producer:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

创建使用者目标队列:To create a destination queue for the consumer:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

编写 JMS 应用程序Write the JMS application

将 JMS 用于服务总线时不需要特殊的 API 或选项。No special APIs or options are required when you use JMS with Service Bus. 但是,有一些限制,我们会在后面说明。There are a few restrictions that will be covered later. 与使用任何 JMS 应用程序一样,若要解析 ConnectionFactory 对象和目标,首先要做的事情是配置 JNDI 环境。As with any JMS application, the first thing required is configuration of the JNDI environment to be able to resolve a ConnectionFactory object and destinations.

配置 JNDI InitialContext 对象Configure the JNDI InitialContext object

JNDI 环境是通过将配置信息的哈希表传入到 javax.naming.InitialContext 类的构造函数中来配置的。The JNDI environment is configured by passing a hash table of configuration information into the constructor of the javax.naming.InitialContext class. 哈希表中的两个必需元素是初始上下文工厂的类名称和提供程序 URL。The two required elements in the hash table are the class name of the Initial Context Factory and the provider URL. 以下代码演示了如何配置 JNDI 环境以将基于 Qpid 属性文件的 JNDI 提供程序用于名为 servicebus.properties 的属性文件。The following code shows how to configure the JNDI environment to use the Qpid properties file-based JNDI provider with a properties file named servicebus.properties .

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

使用服务总线队列的简单 JMS 应用程序A simple JMS application that uses a Service Bus queue

以下示例程序将 JMS 短信发送到 JNDI 逻辑名称为 QUEUE 的 Service Bus 队列,并接收返回的消息。The following example program sends JMS text messages to a Service Bus queue with the JNDI logical name of QUEUE and receives the messages back.

可以从 Azure 服务总线示例 JMS 队列快速入门中访问所有源代码和配置信息。You can access all the source code and configuration information from the Azure Service Bus samples JMS queue quickstart.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string builder is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

运行应用程序Run the application

传递共享访问策略中的“连接字符串”,以运行应用程序。Pass the Connection String from the Shared Access Policies to run the application. 运行应用程序会生成以下输出:The following output is of the form running the application:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP 处置和服务总线操作映射AMQP disposition and Service Bus operation mapping

以下是将 AMQP 处置转换为服务总线操作的方法:Here's how an AMQP disposition translates to a Service Bus operation:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS 主题与服务总线主题JMS topics vs. Service Bus topics

通过 JMS API 使用服务总线主题和订阅可以提供基本的发送和接收功能。Using Service Bus topics and subscriptions through the JMS API provides basic send and receive capabilities. 从其他使用 JMS 兼容 API 的消息代理处移植应用程序时,这是一种很方便的选择,即使服务总线主题不同于 JMS 主题且需要一些调整。It's a convenient choice when you port applications from other message brokers with JMS-compliant APIs, even though Service Bus topics differ from JMS topics and require a few adjustments.

服务总线主题将消息路由到已命名的、共享的、持久的订阅中,这些订阅通过 Azure 资源管理接口、Azure 命令行工具或 Azure 门户进行管理。Service Bus topics route messages into named, shared, and durable subscriptions that are managed through the Azure Resource Management interface, the Azure command-line tools, or the Azure portal. 每个订阅允许使用最多 2000 条选择规则,每条规则可能有一个筛选器条件以及一项适用于 SQL 筛选器的元数据转换操作。Each subscription allows for up to 2,000 selection rules, each of which might have a filter condition and, for SQL filters, also a metadata transformation action. 每次出现筛选器条件匹配的情况时,系统就会选择将要复制到订阅中的输入消息。Each filter condition match selects the input message to be copied into the subscription.

从订阅接收消息与从队列接收消息是相同的。Receiving messages from subscriptions is identical to receiving messages from queues. 每个订阅都有一个关联的死信队列,并且可以将消息自动转发给其他队列或主题。Each subscription has an associated dead-letter queue and the ability to automatically forward messages to another queue or topics.

JMS 主题允许客户端动态创建非持久的和持久的订阅者,这样就可以选择性地允许通过消息选择器来筛选消息。JMS topics allow clients to dynamically create nondurable and durable subscribers that optionally allow filtering messages with message selectors. 服务总线不支持这些非共享的实体。These unshared entities aren't supported by Service Bus. 服务总线的 SQL 筛选器规则语法类似于 JMS 支持的消息选择器语法。The SQL filter rule syntax for Service Bus is similar to the message selector syntax supported by JMS.

如此示例所示,JMS 主题发布者端兼容服务总线,但动态订阅者则不兼容。The JMS topic publisher side is compatible with Service Bus, as shown in this sample, but dynamic subscribers aren't. 不支持将下述与拓扑相关的 JMS API 与服务总线配合使用。The following topology-related JMS APIs aren't supported with Service Bus.

不受支持的功能和限制Unsupported features and restrictions

通过 AMQP 1.0 将 JMS 用于服务总线时存在以下限制:The following restrictions exist when you use JMS over AMQP 1.0 with Service Bus, namely:

  • 每个会话只允许一个 MessageProducer 或 MessageConsumer 对象 。Only one MessageProducer or MessageConsumer object is allowed per session. 如果需要在应用程序中创建多个 MessageProducer 或 MessageConsumer 对象,请分别对其创建专用会话 。If you need to create multiple MessageProducer or MessageConsumer objects in an application, create a dedicated session for each of them.
  • 当前不支持易失性主题订阅。Volatile topic subscriptions aren't currently supported.
  • 当前不支持 MessageSelector 对象。MessageSelector objects aren't currently supported.
  • 不支持分布式事务,但支持事务处理会话。Distributed transactions aren't supported, but transacted sessions are supported.

服务总线将控制平面从数据平面拆分了出来,因此,不支持多个 JMS 的动态拓扑函数。Service Bus splits the control plane from the data plane, so it doesn't support several of JMS's dynamic topology functions.

不支持的方法Unsupported method 替换为Replace with
createDurableSubscribercreateDurableSubscriber 创建移植消息选择器的主题订阅。Create a topic subscription that ports the message selector.
createDurableConsumercreateDurableConsumer 创建移植消息选择器的主题订阅。Create a topic subscription that ports the message selector.
createSharedConsumercreateSharedConsumer 服务总线主题始终可共享。Service Bus topics are always shareable. 请参阅“JMS 主题和服务总线主题”部分。See the section "JMS topics vs. Service Bus topics."
createSharedDurableConsumercreateSharedDurableConsumer 服务总线主题始终可共享。Service Bus topics are always shareable. 请参阅“JMS 主题和服务总线主题”部分。See the section "JMS topics vs. Service Bus topics."
createTemporaryTopiccreateTemporaryTopic 通过管理 API/工具/门户创建主题(AutoDeleteOnIdle 被设置为过期期间)。Create a topic via the management API, tools, or the portal with AutoDeleteOnIdle set to an expiration period.
createTopiccreateTopic 通过管理 API/工具/门户创建主题。Create a topic via the management API, tools, or the portal.
unsubscribeunsubscribe 删除主题管理 API/工具/门户。Delete the topic management API, tools, or portal.
createBrowsercreateBrowser 不支持。Unsupported. 使用服务总线 API 的 Peek() 功能。Use the Peek() functionality of the Service Bus API.
createQueuecreateQueue 通过管理 API/工具/门户创建队列。Create a queue via the management API, tools, or the portal.
createTemporaryQueuecreateTemporaryQueue 通过管理 API/工具/门户创建队列(AutoDeleteOnIdle 被设置为过期期间)。Create a queue via the management API, tools, or the portal with AutoDeleteOnIdle set to an expiration period.
receiveNoWaitreceiveNoWait 使用服务总线 SDK 提供的 receive() 方法并指定非常低或为零的超时。Use the receive() method provided by the Service Bus SDK and specify a very low or zero timeout.

总结Summary

本文演示了如何通过使用常用 JMS API 和 AMQP 1.0 通过 Java 使用服务总线中转消息传送功能(例如队列和发布/订阅主题)。This article showed you how to use Service Bus brokered messaging features, such as queues and publish or subscribe topics, from Java by using the popular JMS API and AMQP 1.0.

也可以通过其他语言(例如 .NET、C、Python 和 PHP)使用服务总线 AMQP 1.0。You can also use Service Bus AMQP 1.0 from other languages, such as .NET, C, Python, and PHP. 使用这些不同语言构建的组件可以使用服务总线中的 AMQP 1.0 支持可靠且完全无损地交换消息。Components built by using these different languages can exchange messages reliably and at full fidelity by using the AMQP 1.0 support in Service Bus.

后续步骤Next steps