快速入门:使用 Go 将事件发送到事件中心或从其接收事件

Azure 事件中心是一个大数据流式处理平台和事件引入服务,每秒能够接收和处理数百万个事件。 事件中心可以处理和存储分布式软件和设备生成的事件、数据或遥测。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到数据中心的数据。 有关事件中心的详细概述,请参阅事件中心概述事件中心功能

本快速入门介绍了如何编写 Go 应用程序来将事件发送到事件中心或从其接收事件。

注意

本快速入门基于网址为 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs 的 GitHub 上的示例。 发送事件部分基于 example_producing_events_test.go 示例,接收事件部分基于 example_processor_test.go 示例。 本快速入门的代码已简化,删除了所有详细注释,因此请查看示例以了解更多详细信息和说明。

先决条件

若要完成本快速入门,需要具备以下先决条件:

  • 已本地安装 Go。 若有必要,请按照以下说明操作
  • 有效的 Azure 帐户。 如果没有 Azure 订阅,请在开始前创建一个试用版订阅
  • 创建事件中心命名空间和事件中心。 使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 要创建命名空间和事件中心,请按照此文中的步骤操作。

发送事件

本部分展示了如何创建 Go 应用程序来将事件发送到事件中心。

安装 Go 包

获取事件中心的 Go 包,如以下示例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

用于向事件中心发送事件的代码

下面是用于向事件中心发送事件的代码。 代码中的主要步骤包括:

  1. 使用事件中心命名空间的连接字符串和事件中心名称创建事件中心生成者客户端。
  2. 创建批处理对象并将示例事件添加到批处理。
  3. 向事件中心发送事件批。

重要

NAMESPACE CONNECTION STRING 替换为事件中心命名空间的连接字符串,并将 EVENT HUB NAME 替换为示例代码中的事件中心名称。

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

目前请勿运行应用程序。 首先需要运行接收方应用,然后再运行发送方应用。

接收事件

创建存储帐户和容器

使用 Azure 存储容器在接收器之间共享分区租用和事件中的检查点等状态。 可以使用 Go SDK 创建存储帐户和容器,但也可以按照关于 Azure 存储帐户中的说明进行创建。

使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:

  • 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
  • 请勿将容器用于任何其他用途,也不要将存储帐户用于任何其他用途。
  • 存储帐户应位于部署的应用程序所在的同一区域中。 如果应用程序位于本地,请尝试选择最近的区域。

在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。

  • 分层命名空间
  • Blob 软删除
  • 版本控制

Go 包

若要接收消息,请获取事件中心的 Go 包,如以下示例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

用于从事件中心接收事件的代码

下面是用于从事件中心接收事件的代码。 代码中的主要步骤包括:

  1. 检查一个检查点存储对象,该对象表示由事件中心用于检查点操作的 Azure Blob 存储。
  2. 使用事件中心命名空间的连接字符串和事件中心名称创建事件中心使用者客户端。
  3. 使用客户端对象和检查点存储对象创建事件处理程序。 处理程序可接收和处理事件。
  4. 对于事件中心的每个分区,请创建一个分区客户端,将 processEvents 用作处理事件的函数。
  5. 运行所有分区客户端以接收和处理事件。

重要

将以下占位符值替换为实际值:

  • AZURE STORAGE CONNECTION STRING 替换为 Azure 存储帐户的连接字符串
  • BLOB CONTAINER NAME 替换为已在存储帐户中创建的 blob 容器的名称
  • NAMESPACE CONNECTION STRING 替换为事件中心命名空间的连接字符串
  • EVENT HUB NAME 替换为示例代码中的事件中心名称。
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

运行接收方和发送方应用

  1. 先运行接收方应用。

  2. 运行发送方应用。

  3. 等待一分钟,然后就会在接收方窗口中看到以下输出。

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

后续步骤

查看位于 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs 网站的 GitHub 上的示例。