快速入门:使用 Azure CLI 创建服务总线队列Quickstart: Use the Azure CLI to create a Service Bus queue

本快速入门介绍了如何使用 Azure CLI 和服务总线 Java 库通过服务总线来发送和接收消息。This quickstart describes how to send and receive messages with Service Bus by using the Azure CLI and the Service Bus Java library. 最后,如果对更多的技术细节感兴趣,可以阅读说明,了解示例代码的重要元素。Finally, if you're interested in more technical details, you can read an explanation of the key elements of the sample code.

什么是 Service Bus 队列?What are Service Bus queues?

服务总线队列支持中转消息传送通信模型。Service Bus queues support a brokered messaging communication model. 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。When using queues, components of a distributed application do not communicate directly with each other; instead they exchange messages via a queue, which acts as an intermediary (broker). 消息创建方(发送方)将消息传送到队列,并继续对其进行处理。A message producer (sender) hands off a message to the queue and then continues its processing. 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。Asynchronously, a message consumer (receiver) pulls the message from the queue and processes it. 创建方不必等待使用方的答复即可继续处理并发送更多消息。The producer does not have to wait for a reply from the consumer in order to continue to process and send further messages. 队列为一个或多个竞争使用方提供先入先出 (FIFO) 消息传递方式。Queues offer First In, First Out (FIFO) message delivery to one or more competing consumers. 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。That is, messages are typically received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:Service Bus queues are a general-purpose technology that can be used for a wide variety of scenarios:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。Communication between web and worker roles in a multi-tier Azure application.
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。Communication between on-premises apps and Azure-hosted apps in a hybrid solution.
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。Communication between components of a distributed application running on-premises in different organizations or departments of an organization.

利用队列,可以更轻松地缩放应用程序,并增强体系结构的弹性。Using queues enables you to scale your applications more easily, and enable more resiliency to your architecture.

先决条件Prerequisites

如果没有 Azure 订阅,可在开始前创建一个试用帐户If you don't have an Azure subscription, you can create a trial account before you begin.

# Create a resource group
resourceGroupName="myResourceGroup"

az group create --name $resourceGroupName --location chinaeast

# Create a Service Bus messaging namespace with a unique name
namespaceName=myNameSpace$RANDOM
az servicebus namespace create --resource-group $resourceGroupName --name $namespaceName --location 'China East'

# Create a Service Bus queue
az servicebus queue create --resource-group $resourceGroupName --namespace-name $namespaceName --name BasicQueue

# Get the connection string for the namespace
connectionString=$(az servicebus namespace authorization-rule keys list --resource-group $resourceGroupName --namespace-name $namespaceName --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)

运行最后一个命令后,将所选的连接字符串和队列名称复制并粘贴到一个临时位置,例如记事本。After the last command runs, copy and paste the connection string, and the queue name you selected, to a temporary location such as Notepad. 在下一步中将要使用它。You will need it in the next step.

发送和接收消息Send and receive messages

创建命名空间和队列并且拥有所需的凭据后,便可以发送和接收消息。After you've created the namespace and queue, and you have the necessary credentials, you are ready to send and receive messages. 可以在此 GitHub 示例文件夹中检查代码。You can examine the code in this GitHub sample folder.

  1. 通过发出以下命令,在计算机上克隆服务总线 GitHub 存储库Clone the Service Bus GitHub repository on your computer by issuing the following command:

    git clone https://github.com/Azure/azure-service-bus.git
    
  2. 将当前目录更改为示例文件夹,使用正斜杠作为分隔符:Change your current directory to the sample folder, using forward slashes as path separators:

    cd azure-service-bus/samples/Java/azure-servicebus/QueuesGettingStarted
    
  3. 发出以下命令来生成应用程序:Issue the following command to build the application:

    mvn clean package -DskipTests
    
  4. 要运行该程序,请在将连接字符串替换为前面复制的值后发出以下命令:To run the program, issue the following command after replacing the connection string with the value you copied earlier:

    java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "<SERVICE BUS NAMESPACE CONNECTION STRING>" 
    
  5. 观察 10 条消息发送到队列。Observe 10 messages being sent to the queue. 消息的顺序是不确定的,但可以看到消息发送,然后可以看到确认和被接收,此外还可以看到有效负载数据:Ordering of messages is not guaranteed, but you can see the messages sent, then acknowledged and received, along with the payload data:

    Message sending: Id = 0
    Message sending: Id = 1
    Message sending: Id = 2
    Message sending: Id = 3
    Message sending: Id = 4
    Message sending: Id = 5
    Message sending: Id = 6
    Message sending: Id = 7
    Message sending: Id = 8
    Message sending: Id = 9
            Message acknowledged: Id = 9
            Message acknowledged: Id = 3
                                    Message received:
                                                    MessageId = 9,
                                                    SequenceNumber = 54324670505156609,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:20.972Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:20.972Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Nikolaus, name = Kopernikus ]
    
            Message acknowledged: Id = 2
            Message acknowledged: Id = 5
            Message acknowledged: Id = 1
            Message acknowledged: Id = 8
            Message acknowledged: Id = 7
            Message acknowledged: Id = 0
            Message acknowledged: Id = 6
            Message acknowledged: Id = 4
                                    Message received:
                                                    MessageId = 3,
                                                    SequenceNumber = 58828270132527105,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:20.972Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:20.972Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Steven, name = Hawking ]
    
                                    Message received:
                                                    MessageId = 2,
                                                    SequenceNumber = 9288674231451649,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.012Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.012Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Marie, name = Curie ]
    
                                    Message received:
                                                    MessageId = 1,
                                                    SequenceNumber = 22799473113563137,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.025Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.025Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Werner, name = Heisenberg ]
    
                                    Message received:
                                                    MessageId = 8,
                                                    SequenceNumber = 67835469387268097,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.028Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.028Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Johannes, name = Kepler ]
    
                                    Message received:
                                                    MessageId = 7,
                                                    SequenceNumber = 4785074604081153,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.020Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.020Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Galileo, name = Galilei ]
    
                                    Message received:
                                                    MessageId = 5,
                                                    SequenceNumber = 13792273858822145,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.027Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.027Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Niels, name = Bohr ]
    
                                    Message received:
                                                    MessageId = 0,
                                                    SequenceNumber = 18295873486192641,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.021Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.021Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Albert, name = Einstein ]
    
                                    Message received:
                                                    MessageId = 6,
                                                    SequenceNumber = 281474976710657,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:21.019Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:21.019Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Michael, name = Faraday ]
    
                                    Message received:
                                                    MessageId = 4,
                                                    SequenceNumber = 63331869759897601,
                                                    EnqueuedTimeUtc = 2019-02-25T18:15:20.964Z,
                                                    ExpiresAtUtc = 2019-02-25T18:17:20.964Z,
                                                    ContentType = "application/json",
                                                    Content: [ firstName = Isaac, name = Newton ]
    

清理资源Clean up resources

在 Azure PowerShell 中,运行以下命令来删除资源组、命名空间和所有相关资源:In the Azure PowerShell, run the following command to remove the resource group, namespace, and all related resources:

az group delete --resource-group myResourceGroup

了解示例代码Understand the sample code

此部分详述了示例代码的重要节。This section contains more details about key sections of the sample code. 可以浏览此处的 GitHub 存储库提供的代码。You can browse the code, located in the GitHub repository here.

获取连接字符串Get connection string

runApp 方法将连接字符串值从参数读取到程序。The runApp method reads the connection string value from the arguments to the program.

public static void main(String[] args) {

    System.exit(runApp(args, (connectionString) -> {
        QueuesGettingStarted app = new QueuesGettingStarted();
        try {
            app.run(connectionString);
            return 0;
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 1;
        }
    }));
}

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }

创建用于发送和接收操作的队列客户端Create queue clients to send and receive

为了发送和接收消息,run() 方法创建队列客户端实例,这些实例是根据连接字符串和队列名称构建的。To send and receive messages, the run() method creates queue client instances, which are constructed from the connection string and the queue name. 以下代码创建两个队列客户端,一个用于发送,一个用于接收:This code creates two queue clients, one each for sending and receiving:

public void run(String connectionString) throws Exception {

    // Create a QueueClient instance for receiving using the connection string builder
    // We set the receive mode to "PeekLock", meaning the message is delivered
    // under a lock and must be acknowledged ("completed") to be removed from the queue
    QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
    // We are using single thread executor as we are only processing one message at a time
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    this.registerReceiver(receiveClient, executorService);

    // Create a QueueClient instance for sending and then asynchronously send messages.
    // Close the sender once the send operation is complete.
    QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
    this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());

    // wait for ENTER or 10 seconds elapsing
    waitForEnter(10);

    // shut down receiver to close the receive loop
    receiveClient.close();
    executorService.shutdown();
}

构造和发送消息Construct and send messages

sendMessagesAsync() 方法创建一组(10 条)消息,并使用队列客户端以异步方式进行发送:The sendMessagesAsync() method creates a set of 10 messages and asynchronously sends them using the queue client:

CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
    List<HashMap<String, String>> data =
            GSON.fromJson(
                    "[" +
                            "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                            "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                            "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                            "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                            "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                            "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                            "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                            "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                            "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                            "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                            "]",
                    new TypeToken<List<HashMap<String, String>>>() {}.getType());

    List<CompletableFuture> tasks = new ArrayList<>();
    for (int i = 0; i < data.size(); i++) {
        final String messageId = Integer.toString(i);
        Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
        message.setContentType("application/json");
        message.setLabel("Scientist");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(2));
        System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
        tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                }));
    }
    return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}

接收消息Receive messages

registerReceiver() 方法注册 RegisterMessageHandler 回调,并设置某些消息处理程序选项:The registerReceiver() method registers the RegisterMessageHandler callback and also sets some message handler options:

void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {

    
    // register the RegisterMessageHandler callback with executor service
    queueClient.registerMessageHandler(new IMessageHandler() {
                                            // callback invoked when the message handler loop has obtained a message
                                            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                                                // receives message is passed to callback
                                                if (message.getLabel() != null &&
                                                        message.getContentType() != null &&
                                                        message.getLabel().contentEquals("Scientist") &&
                                                        message.getContentType().contentEquals("application/json")) {

                                                    byte[] body = message.getBody();
                                                    Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                                                    System.out.printf(
                                                            "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                                                                    "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                                                            message.getMessageId(),
                                                            message.getSequenceNumber(),
                                                            message.getEnqueuedTimeUtc(),
                                                            message.getExpiresAtUtc(),
                                                            message.getContentType(),
                                                            scientist != null ? scientist.get("firstName") : "",
                                                            scientist != null ? scientist.get("name") : "");
                                                }
                                                return CompletableFuture.completedFuture(null);
                                            }

                                            // callback invoked when the message handler has an exception to report
                                            public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                                                System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                                            }
                                        },
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, true, Duration.ofMinutes(1)),
            executorService);

}

Note

可以使用服务总线资源管理器管理服务总线资源。You can manage Service Bus resources with Service Bus Explorer. 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities in an easy manner. 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。The tool provides advanced features like import/export functionality or the ability to test topic, queues, subscriptions, relay services, notification hubs and events hubs.

后续步骤Next steps

本文介绍了如何创建一个服务总线命名空间并从队列发送和接收消息所需的其他资源。In this article, you created a Service Bus namespace and other resources required to send and receive messages from a queue. 若要详细了解如何编写收发消息的代码,请继续阅读教程的“发送和接收消息”部分。To learn more about writing code to send and receive messages, continue to the tutorials in the Send and receive messages section.