向/从 Azure 服务总线队列发送/接收消息 (Go)
本教程介绍如何使用 Go 编程语言向 Azure 服务总线队列发送消息和从 Azure 服务总线队列接收消息。
Azure 服务总线是一个完全托管的企业消息代理,其中包含消息队列和发布/订阅功能。 服务总线用于将应用程序和服务彼此分离,提供分布式、可靠和高性能的消息传输。
Azure SDK for Go 的 azservicebus 包支持使用 Go 编程语言向/从 Azure 服务总线发送/接收消息。
完成本教程后,你将能够:将单个消息或批量消息发送到队列、接收消息和将未处理的消息加入死信队列。
先决条件
- Azure 订阅。 可激活你的 Visual Studio 或 MSDN 订阅者权益,或者注册试用版订阅。
- 如果没有可使用的队列,请遵循使用 Azure 门户创建服务总线队列一文来创建队列。
- Go 1.18 或更高版本
创建示例应用
首先,创建新的 Go 模块。
为名为
service-bus-go-how-to-use-queues
的模块创建一个新目录。在
azservicebus
目录中,初始化模块并安装所需包。go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
创建名为
main.go
的新文件。
进行身份验证并创建客户端
在 main.go
文件中,创建一个名为 GetClient
的新函数并添加以下代码:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.chinacloudapi.cn
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
GetClient
函数返回使用 Azure 服务总线命名空间和凭据创建的新 azservicebus.Client
对象。 该命名空间由 AZURE_SERVICEBUS_HOSTNAME
环境变量提供。 并且该凭据使用 azidentity.NewDefaultAzureCredential
函数创建。
对于本地开发,DefaultAzureCredential
使用来自 Azure CLI 的访问令牌(可以通过运行 az login
命令创建)来向 Azure 进行身份验证。
提示
若要使用连接字符串进行身份验证,请使用 NewClientFromConnectionString 函数。
向队列发送消息
在 main.go
文件中,创建一个名为 SendMessage
的新函数并添加以下代码:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
采用两个参数:消息字符串和 azservicebus.Client
对象。 然后,它会创建一个新的 azservicebus.Sender
对象,并将消息发送到队列。 若要发送批量消息,请将 SendMessageBatch
函数添加到 main.go
文件。
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
采用两个参数:消息切片和 azservicebus.Client
对象。 然后,它会创建一个新的 azservicebus.Sender
对象,并将消息发送到队列。
从队列接收消息
将消息发送到队列后,可以使用 azservicebus.Receiver
类型进行接收。 若要从队列接收消息,请将 GetMessage
函数添加到 main.go
文件。
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
使用 azservicebus.Client
对象并创建新的 azservicebus.Receiver
对象。 然后,它会从队列接收消息。 Receiver.ReceiveMessages
函数采用两个参数:上下文和要接收的消息数。 Receiver.ReceiveMessages
函数返回 azservicebus.ReceivedMessage
对象的切片。
接下来,for
循环会循环访问消息并打印消息正文。 然后调用 CompleteMessage
函数完成消息,将其从队列中移除。
超过长度限制、发送到无效队列或未成功处理的消息可以发送到死信队列。 若要将消息发送到死信队列,请将 SendDeadLetterMessage
函数添加到 main.go
文件。
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
采用 azservicebus.Client
对象和 azservicebus.ReceivedMessage
对象。 然后它将消息发送到死信队列。 该函数采用两个参数:上下文和 azservicebus.DeadLetterOptions
对象。 如果消息无法发送到死信队列,Receiver.DeadLetterMessage
函数将返回错误。
若要从死信队列接收消息,请将 ReceiveDeadLetterMessage
函数添加到 main.go
文件。
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
采用 azservicebus.Client
对象并创建包含死信队列选项的新的 azservicebus.Receiver
对象。 然后它从死信队列接收消息。 然后该函数从死信队列接收一条消息。 接着它打印该消息的死信原因和描述。
代码示例
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.chinacloudapi.cn
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
运行代码
在运行代码前,请创建名为 AZURE_SERVICEBUS_HOSTNAME
的环境变量。 将环境变量的值设置为服务总线命名空间。
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
接下来,运行以下 go run
命令来运行应用:
go run main.go
后续步骤
有关详细信息,请查看以下链接: