消息复制任务和应用程序

消息复制和跨区域联合一文中所述,在服务总线实体对之间以及服务总线与其他消息源和目标之间的消息序列复制通常依赖于 Azure Functions。

Azure Functions 是一种可缩放且可靠的执行环境,用于配置和运行无服务器应用程序,包括消息复制和联合任务。

在本概述中,你将了解 Azure Functions 针对此类应用程序提供的内置功能、可为转换任务调整和修改的代码块,以及如何配置 Azure Functions 应用程序,使其与服务总线和其他 Azure 消息传送服务完美集成。 有关更多详细信息,本文将指向 Azure Functions 文档。

什么是复制任务?

复制任务是从源接收事件并将其转发到目标。 大多数复制任务会不作任何更改地转发事件,如果源协议和目标协议不同,最多会在元数据结构之间执行映射。

复制任务通常是无状态的,这意味着它们在按顺序或并行执行任务中不会共享状态或其他附带后果。 批处理和链接也是如此,它们都可在流的现有状态的基础上实现。

这使得复制任务不同于聚合任务,聚合任务通常是有状态的,属于分析框架和服务(如 Azure 流分析)的领域。

Azure Functions 中的复制应用程序和任务

在 Azure Functions 中,复制任务是使用一个从配置的源获取输入消息的触发器,以及一个将从源复制的消息转发到配置的目标的输出绑定来实现的。

复制任务通过与用于任何其他 Azure Functions 应用程序的相同部署方法部署到复制应用程序中。 可以将多个任务配置到同一个应用程序中。

使用 Azure Functions Premium,多个复制应用程序可以共享相同的基础资源池(称为应用服务计划)。 这意味着可以轻松地并置复制任务,例如将用 .NET 编写的复制任务与用 Java 编写的复制任务并置。 如果你想要利用仅适用于 Java 的特定库(例如 Apache Camel),并且这些库是特定集成路径的最佳选择,则这种方便性就非常重要,即使你一般偏向于使用其他语言和运行时执行其他复制任务,也是如此。

在任何可用的情况下,都应优先使用面向批处理的触发器,而不是传递单个事件或消息的触发器,并且应始终获取完整的事件或消息结构,而不是依赖于 Azure 函数的参数绑定表达式

函数的名称应反映正在连接的源和目标对,你应该在应用程序配置文件中的连接字符串或其他配置元素的引用前加上该名称。

数据和元数据映射

确定一对输入触发器和输出绑定后,在不同的事件类型或消息类型之间必须执行映射,除非触发器的类型和输出是相同的。

对于在事件中心和服务总线之间复制消息的简单复制任务,你无需自己编写代码,可以依靠提供了复制示例的实用工具库

重试策略

为了避免在可用性事件期间在复制函数的任何一侧丢失数据,需要配置重试策略以获得可靠性。 请参阅有关重试的 Azure Functions 文档,以配置重试策略。

样本存储库中的示例项目选择的策略设置配置了指数退避策略,重试间隔从 5 秒到 15 分钟,并无限重试,以避免数据丢失。

Azure Functions 重试策略的正式发布 (GA) 版本仅支持事件中心和计时器触发器。 已删除对其他触发器的预览支持。

设置复制应用程序主机

复制应用程序是一个或多个复制任务的执行主机。

它是一个 Azure Functions 应用程序,配置为按消耗计划或 Azure Functions 高级计划(推荐)运行。 所有复制应用程序都必须在系统分配或用户分配的托管标识下运行。

链接的 Azure 资源管理器 (ARM) 模板使用以下项创建和配置复制应用程序:

  • 用于跟踪复制进度和日志的 Azure 存储帐户。
  • 系统分配的托管标识。
  • 用于监视的 Azure Monitor 与 Application Insights 集成。

事件中心如果必须访问绑定到 Azure 虚拟网络 (VNet) 的事件中心,则必须使用 Azure Functions 高级计划并配置为附加到同一个 VNet,这也是可用选项之一。

部署 可视化
Azure Functions 消耗计划 部署到 Azure 可视化
Azure Functions 高级计划 部署到 Azure 可视化
VNet 的 Azure Functions 高级计划 部署到 Azure 可视化

示例

样本存储库包含多个在事件中心和/或服务总线实体之间复制事件的复制任务示例。

若要在事件中心之间复制事件数据,请配合使用事件中心触发器和事件中心输出绑定:

[FunctionName("telemetry")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static Task Telemetry(
    [EventHubTrigger("telemetry", ConsumerGroup = "$USER_FUNCTIONS_APP_NAME.telemetry", Connection = "telemetry-source-connection")] EventData[] input,
    [EventHub("telemetry-copy", Connection = "telemetry-target-connection")] EventHubClient outputClient,
    ILogger log)
{
    return EventHubReplicationTasks.ForwardToEventHub(input, outputClient, log);
}

若要在服务总线实体之间复制消息,请使用服务总线触发器和输出绑定:

[FunctionName("jobs-transfer")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static Task JobsTransfer(
    [ServiceBusTrigger("jobs-transfer", Connection = "jobs-transfer-source-connection")] Message[] input,
    [ServiceBus("jobs", Connection = "jobs-target-connection")] IAsyncCollector<Message> output,
    ILogger log)
{
    return ServiceBusReplicationTasks.ForwardToServiceBus(input, output, log);
}

利用帮助程序方法,可以轻松地在事件中心和服务总线之间进行复制:

Source 目标 入口点
事件中心 事件中心 Azure.Messaging.Replication.EventHubReplicationTasks.ForwardToEventHub
事件中心 服务总线 Azure.Messaging.Replication.EventHubReplicationTasks.ForwardToServiceBus
服务总线 事件中心 Azure.Messaging.Replication.ServiceBusReplicationTasks.ForwardToEventHub
服务总线 服务总线 Azure.Messaging.Replication.ServiceBusReplicationTasks.ForwardToServiceBus

监视

要了解如何监视复制应用,请参阅 Azure Functions 文档的“监视”部分

用于监视复制任务的一个特别有用的可视化工具是 Application Insights 应用程序映射,它根据捕获的监视信息自动生成,并允许浏览复制任务的源和目标传输的可靠性和性能。

若要获得即时诊断见解,可以使用实时指标门户工具,该工具可以提供低延迟的日志详细信息可视化效果。

后续步骤