快速入门:使用 Go 将事件发送到事件中心或从其接收事件Quickstart: Send events to or receive events from Event Hubs using Go

Azure 事件中心是一个大数据流式处理平台和事件引入服务,每秒能够接收和处理数百万个事件。Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. 事件中心可以处理和存储分布式软件和设备生成的事件、数据或遥测。Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到数据中心的数据。Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. 有关事件中心的详细概述,请参阅事件中心概述事件中心功能For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

本教程介绍了如何编写 Go 应用程序来将事件发送到事件中心或从其接收事件。This tutorial describes how to write Go applications to send events to or receive events from an event hub.

备注

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. 或者,可以按照本教程中的步骤创建自己的解决方案。Alternatively, you can follow the steps in this tutorial to create your own.

先决条件Prerequisites

若要完成本教程,需要满足以下先决条件:To complete this tutorial, you need the following prerequisites:

  • 已本地安装 Go。Go installed locally. 若有必要,请按照以下说明操作Follow these instructions if necessary.
  • 有效的 Azure 帐户。An active Azure account. 如果没有 Azure 订阅,请在开始前创建一个[试用帐户][]。If you don't have an Azure subscription, create a [trial account][] before you begin.
  • 创建事件中心命名空间和事件中心Create an Event Hubs namespace and an event hub. 使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。Use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article.

发送事件Send events

本部分展示了如何创建 Go 应用程序来将事件发送到事件中心。This section shows you how to create a Go application to send events to an event hub.

安装 Go 包Install Go package

使用 go getdep 获取事件中心的 Go 包。Get the Go package for Event Hubs with go get or dep. 例如:For example:

go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go

在代码文件中导入包Import packages in your code file

若要导入 Go 包,请使用下面的代码示例:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
)

创建服务主体Create service principal

按照使用 Azure CLI 2.0 创建 Azure 服务主体中的说明创建新的服务主体。Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. 使用以下名称将提供的凭据保存在环境中。Save the provided credentials in your environment with the following names. 同时预先配置 Azure SDK for Go 和事件中心包,查找以下变量名称:Both the Azure SDK for Go and the Event Hubs packages are preconfigured to look for these variable names:

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

现在,为使用这些凭据的事件中心客户端创建授权提供程序:Now, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

创建事件中心客户端Create Event Hubs client

以下代码创建事件中心客户端:The following code creates an Event Hubs client:

hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
    log.Fatalf("failed to get hub %s\n", err)
}

编写发送消息的代码Write code to send messages

在以下片段中,使用 (1) 从终端以交互方式发送消息或使用 (2) 在程序中发送消息:In the following snippet, use (1) to send messages interactively from a terminal, or (2) to send messages within your program:

// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
    fmt.Printf("Input a message to send: ")
    text, _ := reader.ReadString('\n')
    hub.Send(ctx, eventhubs.NewEventFromString(text))
}

// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))

附加信息Extras

获取事件中心内的分区 ID:Get the IDs of the partitions in your event hub:

info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
    log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)

运行应用程序将事件发送到事件中心。Run the application to send events to the event hub.

祝贺!Congratulations! 现在已向事件中心发送消息。You have now sent messages to an event hub.

接收事件Receive events

创建存储帐户和容器Create a Storage account and container

使用 Azure 存储容器在接收器之间共享分区租用和事件流中的检查点等状态。State such as leases on partitions and checkpoints in the event stream are shared between receivers using an Azure Storage container. 可以使用 Go SDK 创建存储帐户和容器,但也可以按照关于 Azure 存储帐户中的说明进行创建。You can create a storage account and container with the Go SDK, but you can also create one by following the instructions in About Azure storage accounts.

Go 示例存储库中和教程对应的示例中提供了使用 Go SDK 创建存储项目的示例。Samples for creating Storage artifacts with the Go SDK are available in the Go samples repo and in the sample corresponding to this tutorial.

Go 包Go packages

若要接收消息,请使用 go getdep 获取事件中心的 Go 包:To receive the messages, get the Go packages for Event Hubs with go get or dep:

go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest

在代码文件中导入包Import packages in your code file

若要导入 Go 包,请使用下面的代码示例:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
    eph "github.com/Azure/azure-event-hubs-go/eph"
    storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
    azure "github.com/Azure/go-autorest/autorest/azure"
)

创建服务主体Create service principal

按照使用 Azure CLI 2.0 创建 Azure 服务主体中的说明创建新的服务主体。Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. 使用以下名称将提供的凭据保存在环境中:预先配置了 Azure SDK for Go 和事件中心包,以便查找这些变量名称。Save the provided credentials in your environment with the following names: Both Azure SDK for Go and Event Hubs package are preconfigured to look for these variable names.

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

接下来,为使用这些凭据的事件中心客户端创建授权提供程序:Next, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

获取元数据结构Get metadata struct

使用 Azure Go SDK 获取 Azure 环境相关元数据的结构。Get a struct with metadata about your Azure environment using the Azure Go SDK. 稍后的操作将使用此结构来查找正确的终结点。Later operations use this struct to find correct endpoints.

azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
    log.Fatalf("could not get azure.Environment struct: %s\n", err)
}

创建凭据帮助程序Create credential helper

创建使用上述 Azure Active Directory (AAD) 凭据的凭据帮助程序,为存储创建共享访问签名 (SAS) 凭据。Create a credential helper that uses the previous Azure Active Directory (AAD) credentials to create a Shared Access Signature (SAS) credential for Storage. 最后一个参数指示构造函数使用与之前所用相同的环境变量:The last parameter tells this constructor to use the same environment variables as used previously:

cred, err := storageLeaser.NewAADSASCredential(
    subscriptionID,
    resourceGroupName,
    storageAccountName,
    storageContainerName,
    storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
    log.Fatalf("could not prepare a storage credential: %s\n", err)
}

创建检查指针和出租人Create a check pointer and a leaser

创建“出租人”,负责将分区租给特定接收器;创建检查指针,负责编写消息流的检查点,以便其他接收器可开始读取正确的偏移量 。Create a leaser, responsible for leasing a partition to a particular receiver, and a check pointer, responsible for writing checkpoints for the message stream so that other receivers can begin reading from the correct offset.

目前,提供使用相同存储容器的单个 StorageLeaserCheckpointer 来管理租用和检查点。Currently, a single StorageLeaserCheckpointer is available that uses the same Storage container to manage both leases and checkpoints. 除存储帐户和容器名称外,StorageLeaserCheckpointer 还需要上一步中创建的凭据和 Azure 结构,才能正确访问容器。In addition to the storage account and container names, the StorageLeaserCheckpointer needs the credential created in the previous step and the Azure environment struct to correctly access the container.

leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
    cred,
    storageAccountName,
    storageContainerName,
    azureEnv)
if err != nil {
    log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}

构造事件处理器主机Construct Event Processor Host

现已拥有构造 EventProcessorHost 所需的部分,如下所示。You now have the pieces needed to construct an EventProcessorHost, as follows. 如之前所述,同一 StorageLeaserCheckpointer 同时用作出租人和检查指针:The same StorageLeaserCheckpointer is used as both a leaser and check pointer, as described previously:

ctx := context.Background()
p, err := eph.New(
    ctx,
    nsName,
    hubName,
    tokenProvider,
    leaserCheckpointer,
    leaserCheckpointer)
if err != nil {
    log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())

创建处理程序Create handler

现在创建一个处理程序并将其注册到事件处理器主机。Now create a handler and register it with the Event Processor Host. 启动主机时,它将向传入消息应用此设置和其他任何指定的处理程序:When the host is started, it applies this and any other specified handlers to incoming messages:

handler := func(ctx context.Context, event *eventhubs.Event) error {
    fmt.Printf("received: %s\n", string(event.Data))
    return nil
}

// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
    log.Fatalf("failed to register handler: %s\n", err)
}

编写接收消息的代码Write code to receive messages

一切就绪后,便可使用 Start(context) 启动事件处理器主机,使其永久运行,或使用 StartNonBlocking(context) 使其仅在消息可用时运行。With everything set up, you can start the Event Processor Host with Start(context) to keep it permanently running, or with StartNonBlocking(context) to run only as long as messages are available.

本教程将如下启动并运行,请参阅 GitHub 示例,了解使用 StartNonBlocking 的示例:This tutorial starts and runs as follows; see the GitHub sample for an example using StartNonBlocking:

ctx := context.Background()
err = p.Start()
if err != nil {
    log.Fatalf("failed to start EPH: %s\n", err)
}

后续步骤Next steps

请阅读以下文章:Read the following articles:

试用帐户trial account.