向/从 Azure 服务总线队列发送/接收消息 (Go)

本教程介绍如何使用 Go 编程语言向 Azure 服务总线队列发送消息和从 Azure 服务总线队列接收消息。

Azure 服务总线是一个完全托管的企业消息代理,其中包含消息队列和发布/订阅功能。 服务总线用于将应用程序和服务彼此分离,提供分布式、可靠和高性能的消息传输。

Azure SDK for Go 的 azservicebus 包支持使用 Go 编程语言向/从 Azure 服务总线发送/接收消息。

完成本教程后,你将能够:将单个消息或批量消息发送到队列、接收消息和将未处理的消息加入死信队列。

先决条件

创建示例应用

首先,创建新的 Go 模块。

  1. 为名为 service-bus-go-how-to-use-queues 的模块创建一个新目录。

  2. azservicebus 目录中,初始化模块并安装所需包。

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. 创建名为 main.go 的新文件。

进行身份验证并创建客户端

main.go 文件中,创建一个名为 GetClient 的新函数并添加以下代码:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.chinacloudapi.cn
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

GetClient 函数返回使用 Azure 服务总线命名空间和凭据创建的新 azservicebus.Client 对象。 该命名空间由 AZURE_SERVICEBUS_HOSTNAME 环境变量提供。 并且该凭据使用 azidentity.NewDefaultAzureCredential 函数创建。

对于本地开发,DefaultAzureCredential 使用来自 Azure CLI 的访问令牌(可以通过运行 az login 命令创建)来向 Azure 进行身份验证。

提示

若要使用连接字符串进行身份验证,请使用 NewClientFromConnectionString 函数。

向队列发送消息

main.go 文件中,创建一个名为 SendMessage 的新函数并添加以下代码:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage 采用两个参数:消息字符串和 azservicebus.Client 对象。 然后,它会创建一个新的 azservicebus.Sender 对象,并将消息发送到队列。 若要发送批量消息,请将 SendMessageBatch 函数添加到 main.go 文件。

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch 采用两个参数:消息切片和 azservicebus.Client 对象。 然后,它会创建一个新的 azservicebus.Sender 对象,并将消息发送到队列。

从队列接收消息

将消息发送到队列后,可以使用 azservicebus.Receiver 类型进行接收。 若要从队列接收消息,请将 GetMessage 函数添加到 main.go 文件。

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage 使用 azservicebus.Client 对象并创建新的 azservicebus.Receiver 对象。 然后,它会从队列接收消息。 Receiver.ReceiveMessages 函数采用两个参数:上下文和要接收的消息数。 Receiver.ReceiveMessages 函数返回 azservicebus.ReceivedMessage 对象的切片。

接下来,for 循环会循环访问消息并打印消息正文。 然后调用 CompleteMessage 函数完成消息,将其从队列中移除。

超过长度限制、发送到无效队列或未成功处理的消息可以发送到死信队列。 若要将消息发送到死信队列,请将 SendDeadLetterMessage 函数添加到 main.go 文件。

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage 采用 azservicebus.Client 对象和 azservicebus.ReceivedMessage 对象。 然后它将消息发送到死信队列。 该函数采用两个参数:上下文和 azservicebus.DeadLetterOptions 对象。 如果消息无法发送到死信队列,Receiver.DeadLetterMessage 函数将返回错误。

若要从死信队列接收消息,请将 ReceiveDeadLetterMessage 函数添加到 main.go 文件。

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage 采用 azservicebus.Client 对象并创建包含死信队列选项的新的 azservicebus.Receiver 对象。 然后它从死信队列接收消息。 然后该函数从死信队列接收一条消息。 接着它打印该消息的死信原因和描述。

代码示例

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.chinacloudapi.cn
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

运行代码

在运行代码前,请创建名为 AZURE_SERVICEBUS_HOSTNAME 的环境变量。 将环境变量的值设置为服务总线命名空间。

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

接下来,运行以下 go run 命令来运行应用:

go run main.go

后续步骤

有关详细信息,请查看以下链接: