将 Java 消息服务 (JMS) 用于 Azure 服务总线和 AMQP 1.0Use the Java Message Service (JMS) with Azure Service Bus and AMQP 1.0

本文说明了如何通过采用常用 Java 消息服务 (JMS) API 标准的 Java 应用程序使用 Azure 服务总线消息传送功能(队列和发布/订阅主题)。This article explains how to use Azure Service Bus messaging features (queues and publish/subscribe topics) from Java applications using the popular Java Message Service (JMS) API standard. 此处的随附文章解释如何使用 Azure 服务总线 .NET API 来执行相同操作。There is a companion article that explains how to do the same using the Azure Service Bus .NET API. 使用 AMQP 1.0,可以同时使用以下两个指南来了解跨平台消息。You can use these two guides together to learn about cross-platform messaging using AMQP 1.0.

高级消息队列协议 (AMQP) 1.0 是一个高效、可靠的线级消息传送协议,可用于构建可靠的跨平台消息传送应用程序。The Advanced Message Queuing Protocol (AMQP) 1.0 is an efficient, reliable, wire-level messaging protocol that you can use to build robust, cross-platform messaging applications.

Azure 服务总线支持 AMQP 1.0,这意味着,可以通过一系列使用有效二进制协议的平台来使用队列和发布/订阅中转消息传送功能。Support for AMQP 1.0 in Azure Service Bus means that you can use the queuing and publish/subscribe brokered messaging features from a range of platforms using an efficient binary protocol. 此外,还可以生成由结合使用多个语言、框架和操作系统构建的组件组成的应用程序。Furthermore, you can build applications comprised of components built using a mix of languages, frameworks, and operating systems.

服务总线入门Get started with Service Bus

此指南假定已有包含名为 basicqueue 的队列的服务总线命名空间。This guide assumes that you already have a Service Bus namespace containing a queue named basicqueue. 如果没有,则可以使用 Azure 经典门户创建命名空间和队列If you don't, then you can create the namespace and queue using the Azure portal. 有关如何创建服务总线命名空间和队列的详细信息,请参阅服务总线队列入门For more information about how to create Service Bus namespaces and queues, see Get started with Service Bus queues.

Note

分区队列和主题也支持 AMQP。Partitioned queues and topics also support AMQP. 有关详细信息,请参阅分区消息传送实体For more information, see Partitioned messaging entities.

下载 AMQP 1.0 JMS 客户端库Downloading the AMQP 1.0 JMS client library

有关 Apache Qpid JMS AMQP 1.0 客户端库最新版本的下载地址的信息,请访问 https://qpid.apache.org/download.htmlFor information about where to download the latest version of the Apache Qpid JMS AMQP 1.0 client library, visit https://qpid.apache.org/download.html.

使用 Service Bus 构建和运行 JMS 应用程序时必须将以下 4 个 JAR 文件从 Apache Qpid JMS AMQP 1.0 分发存档添加到 Java CLASSPATH:You must add the following four JAR files from the Apache Qpid JMS AMQP 1.0 distribution archive to the Java CLASSPATH when building and running JMS applications with Service Bus:

  • geronimo-jms_1.1_spec-1.0.jargeronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jarqpid-jms-client-[version].jar

Note

JMS JAR 名称和版本可能已更改。JMS JAR names and versions may have changed. 有关详细信息,请参阅 Qpid JMS - AMQP 1.0For details, see Qpid JMS - AMQP 1.0.

为 Java 应用程序编码Coding Java applications

Java 命名和目录接口 (JNDI)Java Naming and Directory Interface (JNDI)

JMS 使用 Java 命名和目录接口 (JNDI) 创建逻辑名称和物理名称之间的分隔。JMS uses the Java Naming and Directory Interface (JNDI) to create a separation between logical names and physical names. 使用 JNDI 解析以下两种类型的 JMS 对象:ConnectionFactory 和 Destination。Two types of JMS objects are resolved using JNDI: ConnectionFactory and Destination. JNDI 使用一个提供程序模型,可以在其中插入不同目录服务来处理名称解析任务。JNDI uses a provider model into which you can plug different directory services to handle name resolution duties. Apache Qpid JMS AMQP 1.0 库附带一个使用以下格式的属性文件配置的、基于属性文件的简单 JNDI 提供程序。The Apache Qpid JMS AMQP 1.0 library comes with a simple property file-based JNDI Provider that is configured using a properties file of the following format:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.chinacloudapi.cn

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

设置 JNDI 上下文和配置 ConnectionFactorySetup JNDI context and Configure the ConnectionFactory

Azure 门户“主连接字符串” 下的 “共享访问策略”中提供了可引用的 ConnectionString The ConnectionString referenced in the one available in the 'Shared Access Policies' in the Azure Portal under Primary Connection String

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

配置制造者和使用者目标队列Configure Producer and Consumer Destination Queues

用于在 Qpid 属性文件 JNDI 提供程序中定义目标的条目的格式如下:The entry used to define a destination in the Qpid properties file JNDI provider is of the following format:

创建制造者目标队列 -To create the destination queue for the Producer -

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create Producer
MessageProducer producer = session.createProducer(queue);

创建使用者目标队列 -To create a destination queue for the Consumer -

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create Consumer
MessageConsumer consumer = session.createConsumer(queue);

编写 JMS 应用程序Write the JMS application

将 JMS 用于服务总线时不需要特殊的 API 或选项。There are no special APIs or options required when using JMS with Service Bus. 但是,有一些限制,我们会在后面说明。However, there are a few restrictions that will be covered later. 与使用任何 JMS 应用程序一样,若要解析 ConnectionFactory 和目标,首先要做的事情是配置 JNDI 环境。As with any JMS application, the first thing required is configuration of the JNDI environment, to be able to resolve a ConnectionFactory and destinations.

配置 JNDI InitialContextConfigure the JNDI InitialContext

JNDI 环境是通过将配置信息的哈希表传入到 javax.naming.InitialContext 类的构造函数中来配置的。The JNDI environment is configured by passing a hashtable of configuration information into the constructor of the javax.naming.InitialContext class. 哈希表中的两个必需元素是初始上下文工厂的类名称和提供程序 URL。The two required elements in the hashtable are the class name of the Initial Context Factory and the Provider URL. 以下代码演示了如何配置 JNDI 环境以将基于 Qpid 属性文件的 JNDI 提供程序用于名为 servicebus.properties的属性文件。The following code shows how to configure the JNDI environment to use the Qpid properties file based JNDI Provider with a properties file named servicebus.properties.

// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

使用服务总线队列的简单 JMS 应用程序A simple JMS application using a Service Bus queue

以下示例程序将 JMS TextMessages 发送到 JNDI 逻辑名称为 QUEUE 的 Service Bus 队列,并接收返回的消息。The following example program sends JMS TextMessages to a Service Bus queue with the JNDI logical name of QUEUE, and receives the messages back.

可以从 Azure 服务总线示例 JMS 队列快速启动中访问所有源代码和配置信息You can all access all the source code and configuration information from the Azure Service Bus Samples JMS Queue Quick Start

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS Queue producer into
 * an Azure Service Bus Queue, and receive them with a JMS message consumer.
 * JMS Queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string builder is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

        // set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // we create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter
        {
            // Create Connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            // Create Session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create Connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            connection.start();
            // Create Session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // receives message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}   

运行应用程序Run the application

传递共享访问策略中的“连接字符串”,以运行应用程序。Pass the Connection String from the Shared Access Policies to run the application. 以下是通过运行应用程序的表单输出:Below is the output of the form by running the Application:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP 处置和服务总线操作映射AMQP disposition and Service Bus operation mapping

以下是将 AMQP 处置转换为服务总线操作的方法:Here is how an AMQP disposition translates to a Service Bus operation:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS 主题与服务总线主题JMS Topics vs. Service Bus Topics

通过 Java 消息服务 (JMS) API 使用 Azure 服务总线主题和订阅可以提供基本的发送和接收功能。Using Azure Service Bus topics and subscriptions through the Java Message Service (JMS) API provides basic send and receive capabilities. 从其他使用 JMS 兼容 API 的消息代理处移植应用程序时,这是一种很方便的选择,即使服务总线主题不同于 JMS 主题且需要一些调整。It's a convenient choice when porting applications from other message brokers with JMS-compliant APIs, even though Service Bus topics differ from JMS Topics and require a few adjustments.

Azure 服务总线主题将消息路由到已命名的、共享的、持久的订阅中,这些订阅通过 Azure 资源管理接口、Azure 命令行工具或 Azure 门户进行管理。Azure Service Bus topics route messages into named, shared, durable subscriptions that are managed through the Azure Resource Management interface, the Azure command line tools, or through the Azure portal. 每个订阅允许使用最多 2000 条选择规则,每条规则可能有一个筛选器条件以及一项适用于 SQL 筛选器的元数据转换操作。Each subscription allows for up to 2000 selection rules, each of which may have a filter condition and, for SQL filters, also a metadata transformation action. 每次出现筛选器条件匹配的情况时,系统就会选择将要复制到订阅中的输入消息。Each filter condition match selects the input message to be copied into tehj subscription.

从订阅接收消息与从队列接收消息是相同的。Receiving messages from subscriptions is identical receiving messages from queues. 每个订阅都有一个关联的死信队列,并且可以将消息自动转发给其他队列或主题。Each subscription has an associated dead-letter queue as well as the ability to automatically forward messages to another queue or topics.

JMS 主题允许客户端动态创建非持久的和持久的订阅者,这样就可以选择性地允许通过消息选择器来筛选消息。JMS Topics allow clients to dynamically create nondurable and durable subscribers that optionally allow filtering messages with message selectors. 服务总线不支持这些非共享的实体。These unshared entities are not supported by Service Bus. 但是,服务总线的 SQL 筛选器规则语法非常类似于 JMS 支持的消息选择器语法。The SQL filter rule syntax for Service Bus is, however, very similar to the message selector syntax supported by JMS.

如此示例所示,JMS 主题发布者端兼容服务总线,但动态订阅者则不兼容。The JMS Topic publisher side is compatible with Service Bus, as shown in this sample, but dynamic subscribers are not. 不支持将下述与拓扑相关的 JMS API 与服务总线配合使用。The following topology-related JMS APIs are not supported with Service Bus.

不受支持的功能和限制Unsupported features and restrictions

在将 JMS over AMQP 1.0 用于 Service Bus 时存在以下限制,即:The following restrictions exist when using JMS over AMQP 1.0 with Service Bus, namely:

  • 每个会话只允许一个 MessageProducer 或 MessageConsumer。 Only one MessageProducer or MessageConsumer is allowed per Session. 如果需要在应用程序中创建多个 MessageProducers 或 MessageConsumers,请分别对其创建专用会话。 If you need to create multiple MessageProducers or MessageConsumers in an application, create a dedicated Session for each of them.
  • 当前不支持易失性主题订阅。Volatile topic subscriptions aren't currently supported.
  • 当前不支持 MessageSelectorsMessageSelectors aren't currently supported.
  • 不支持分布式事务(但支持事务处理会话)。Distributed transactions aren't supported (but transacted sessions are supported).

此外,Azure 服务总线将控制平面从数据平面拆分了出来,因此,不支持多个 JMS 的动态拓扑函数:Additionally, Azure Service Bus splits the control plane from the data plane and therefore does not support several of JMS's dynamic topology functions:

不支持的方法Unsupported method 替换为Replace with
createDurableSubscribercreateDurableSubscriber 创建移植消息选择器的主题订阅create a Topic subscription porting the message selector
createDurableConsumercreateDurableConsumer 创建移植消息选择器的主题订阅create a Topic subscription porting the message selector
createSharedConsumercreateSharedConsumer 服务总线主题始终可共享,请参阅上述内容Service Bus topics are always shareable, see above
createSharedDurableConsumercreateSharedDurableConsumer 服务总线主题始终可共享,请参阅上述内容Service Bus topics are always shareable, see above
createTemporaryTopiccreateTemporaryTopic 通过管理 API/工具/门户创建主题(AutoDeleteOnIdle 被设置为过期期间)create a topic via management API/tools/portal with AutoDeleteOnIdle set to an expiration period
createTopiccreateTopic 通过管理 API/工具/门户创建主题create a topic via management API/tools/portal
unsubscribeunsubscribe 删除主题管理 API/工具/门户delete the topic management API/tools/portal
createBrowsercreateBrowser 不受支持。unsupported. 使用服务总线 API 的 Peek() 功能Use the Peek() functionality of the Service Bus API
createQueuecreateQueue 通过管理 API/工具/门户创建队列create a queue via management API/tools/portal
createTemporaryQueuecreateTemporaryQueue 通过管理 API/工具/门户创建队列(AutoDeleteOnIdle 被设置为过期期间)create a queue via management API/tools/portal with AutoDeleteOnIdle set to an expiration period
receiveNoWaitreceiveNoWait 利用服务总线 SDK 提供的 receive() 方法并指定非常低或为零的超时utilize the receive() method provided by the Service Bus SDK and specify a very low or zero timeout

摘要Summary

本操作方法指南演示了如何通过使用常用 JMS API 和 AMQP 1.0 通过 Java 使用 Service Bus 中转消息传送功能(队列和发布/订阅主题)。This how-to guide showed how to use Service Bus brokered messaging features (queues and publish/subscribe topics) from Java using the popular JMS API and AMQP 1.0.

也可以通过其他语言(包括 .NET、C、Python 和 PHP)使用 Service Bus AMQP 1.0。You can also use Service Bus AMQP 1.0 from other languages, including .NET, C, Python, and PHP. 使用这些不同语言构建的组件可以使用服务总线中的 AMQP 1.0 支持可靠且完全无损地交换消息。Components built using these different languages can exchange messages reliably and at full fidelity using the AMQP 1.0 support in Service Bus.

后续步骤Next steps