快速入门:使用 Go 将事件发送到事件中心或从其接收事件

Azure 事件中心是一个大数据流式处理平台和事件引入服务,每秒能够接收和处理数百万个事件。 事件中心可以处理和存储分布式软件和设备生成的事件、数据或遥测。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到数据中心的数据。 有关事件中心的详细概述,请参阅事件中心概述事件中心功能

本快速入门介绍如何编写用于向/从事件中心发送/接收事件的 Go 应用程序。

注意

本快速入门基于 GitHub 上的示例,位于 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs。 用于发送事件的代码基于 example_producing_events_test.go 示例,用于接收事件的代码基于 example_processor_test.go 示例。 本快速入门的代码已简化,所有详细注释均已删除,因此请查看示例以获取更多详细信息和说明。

先决条件

若要完成本快速入门,需要具备以下先决条件:

  • 已本地安装 Go。 若有必要,请按照以下说明操作
  • 有效的 Azure 帐户。 如果没有 Azure 订阅,请在开始前创建一个试用版订阅
  • 创建事件中心命名空间和事件中心。 使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 要创建命名空间和事件中心,请按照此文中的步骤操作。

发送事件

本部分展示了如何创建 Go 应用程序来将事件发送到事件中心。

安装 Go 包

获取事件中心的 Go 包,如以下示例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

用于将事件发送到事件中心的代码

下面是用于将事件发送到事件中心的代码。 代码中的主要步骤包括:

  1. 使用事件中心命名空间和事件中心名称的连接字符串创建事件中心生成者客户端。
  2. 创建批处理对象并将示例事件添加到批。
  3. 向事件发送事件批。

重要

NAMESPACE CONNECTION STRING 替换为事件中心命名空间的连接字符串,将 EVENT HUB NAME 替换成示例代码中的事件中心名称。

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)
	}

	// send the batch of events to the event hub
	producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

暂时不要运行应用程序。 首先需要运行接收方应用,然后运行发送方应用。

接收事件

创建存储帐户和容器

使用 Azure 存储容器在接收器之间共享分区租用和事件流中的检查点等状态。 可以使用 Go SDK 创建存储帐户和容器,但也可以按照关于 Azure 存储帐户中的说明进行创建。

Go 包

若要接收消息,请获取事件中心的 Go 包,如以下示例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

用于从事件中心接收事件的代码

下面是用于从事件中心接收事件的代码。 代码中的主要步骤包括:

  1. 检查检查点存储对象,该对象表示事件中心用于检查点的 Azure Blob 存储。
  2. 使用事件中心命名空间和事件中心名称的连接字符串创建事件中心使用者客户端。
  3. 使用客户端对象和检查点存储对象创建事件处理器。 处理器接收和处理事件。
  4. 对于事件中心中的每个分区,创建一个分区客户端,将 processEvents 作为处理事件的函数。
  5. 运行所有分区客户端以接收和处理事件。

重要

将以下占位符值替换为实际值:

  • AZURE STORAGE CONNECTION STRING 替换为 Azure 存储帐户的连接字符串
  • BLOB CONTAINER NAME 替换为在存储帐户中创建的 Blob 容器的名称
  • NAMESPACE CONNECTION STRING 替换为事件中心命名空间的连接字符串
  • EVENT HUB NAME 替换为示例代码中的事件中心名称。
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStoreFromConnectionString("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

运行接收方和发送方应用

  1. 首先运行接收方应用。

  2. 运行发送方应用。

  3. 等待一分钟,可在接收方窗口中看到以下输出。

     Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

后续步骤

查看 GitHub 上的示例,位于 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs