如何使用 Azure WebJobs SDK 进行事件驱动的后台处理

本文提供有关如何为 Azure WebJobs SDK 编写代码的指导。 除非另有说明,否则本文档适用于版本 2.x 和 3.x。 3.x 中引入的主要更改是使用 .NET Core 而不是 .NET Framework。

Note

Azure Functions 是基于 WebJobs SDK 构建的,本文链接到某些主题的 Azure Functions 文档。 注意 Functions 与 WebJobs SDK 之间的以下差异:

  • Azure Functions 版本 1.x 对应于 WebJobs SDK 版本 2.x,Azure Functions 2.x 对应于 WebJobs SDK 3.x。 源代码存储库遵循 WebJobs SDK 编号方案,许多存储库具有 v2.x 分支,其中的主分支当前包含 3.x 代码。
  • Azure Functions C# 类库的示例代码类似于 WebJobs SDK 代码,不过,在 WebJobs SDK 项目中,无需指定 FunctionName 特性。
  • 某些绑定类型(例如 HTTP、Webhook 以及基于 HTTP 的事件网格)只在 Functions 中受支持。

先决条件

本文假设你已阅读 WebJobs SDK 入门

JobHost

JobHost 对象是函数的运行时容器:它侦听触发器并调用函数。 在代码中创建 JobHost,并编写代码来自定义其行为。

这是直接使用 WebJobs SDK 与通过 Azure Functions 间接使用它的主要差别。 在 Azure Functions 中,服务控制 JobHost,你无法通过编写代码来定义它。 Azure Functions 允许通过 host.json 文件中的设置自定义主机行为。 这些设置是字符串而不是代码,限制可执行的自定义类型。

JobHost 连接字符串

在本地运行时,WebJobs SDK 在 local.settings.json 中查找存储和服务总线连接字符串;在 Azure 中运行时,它会在 WebJob 的环境中查找这些字符串。 如果想要对这些连接字符串使用自己的名称,或将其存储在其他位置,可以在代码中进行设置,如下所示:

static void Main(string[] args)
{
    var _storageConn = ConfigurationManager
        .ConnectionStrings["MyStorageConnection"].ConnectionString;

    var _dashboardConn = ConfigurationManager
        .ConnectionStrings["MyDashboardConnection"].ConnectionString;

    JobHostConfiguration config = new JobHostConfiguration();
    config.StorageConnectionString = _storageConn;
    config.DashboardConnectionString = _dashboardConn;
    JobHost host = new JobHost(config);
    host.RunAndBlock();
}

JobHost 开发设置

JobHostConfiguration 类包含 UseDevelopmentSettings 方法,调用该方法可以提高本地开发的效率。 下面是此方法更改的某些设置:

属性 开发设置
Tracing.ConsoleLevel TraceLevel.Verbose:最大化日志输出。
Queues.MaxPollingInterval 使用较小的值可确保立即触发队列方法。
Singleton.ListenerLockPeriod 使用 15 秒值有助于实现快速迭代开发。

以下示例演示如何使用开发设置。 若要使 config.IsDevelopment 在本地运行时返回 true,请设置名为 AzureWebJobsEnv、值为 Development 的本地环境变量。

static void Main()
{
    config = new JobHostConfiguration();

    if (config.IsDevelopment)
    {
        config.UseDevelopmentSettings();
    }

    var host = new JobHost(config);
    host.RunAndBlock();
}

JobHost ServicePointManager 设置

.NET Framework 包含名为 ServicePointManager.DefaultConnectionLimit 的 API,该 API 控制主机的并发连接数。 我们建议在启动 WebJobs 主机之前,在默认值 2 的基础上增大此值。

使用 HttpClient 从某个函数发出的所有传出 HTTP 请求都会流经 ServicePointManager。 达到 DefaultConnectionLimit 后,ServicePointManager 会开始将请求排队,然后再发送请求。 假设 DefaultConnectionLimit 设置为 2,并且代码发出了 1,000 个 HTTP 请求。 最初,实际只允许 2 个请求传入 OS。 其他 998 个请求将会排队,直到有可用的空间。 这意味着 HttpClient 可能会超时,因为它认为已发出请求,但是,OS 从未将此请求发送到目标服务器。 因此,可能会出现看似不合理的行为:本地 HttpClient 花费了 10 秒来完成请求,但服务在 200 毫秒内就返回了每个请求。

ASP.NET 应用程序的默认值是 Int32.MaxValue,这可能非常适合在“基本”或更高级别应用服务计划中运行的 WebJobs。 WebJobs 通常需要 Always On 设置,该设置仅受“基本”和更高级别应用服务计划的支持。

如果 WebJob 在“免费”或“共享”应用服务计划中运行,则应用程序会受到应用服务沙盒的限制:当前的连接限制为 300 个。 如果在 ServicePointManager 中指定无限制的连接数,则很有可能会达到沙盒连接阈值,并且站点将会关闭。 在这种情况下,将 DefaultConnectionLimit 设置为更小的值(例如 50 或 100)可以防止此问题发生,同时仍可保持足够的吞吐量。

必须在发出任何 HTTP 请求之前配置该设置。 为此,WebJobs 主机不应自动尝试调整该设置;在主机启动之前可能已发生 HTTP 请求,因而可能导致意外的行为。 最佳的做法是在初始化 JobHost 之前,立即在 Main 方法中设置值,如以下示例中所示

static void Main(string[] args)
{
    // Set this immediately so that it is used by all requests.
    ServicePointManager.DefaultConnectionLimit = Int32.MaxValue;

    var host = new JobHost();
    host.RunAndBlock();
}

触发器

函数必须是公共方法,并且必须包含一个触发器特性或 NoAutomaticTrigger 特性。

自动触发器

自动触发器调用函数来响应事件。 有关示例,请参阅入门文章中的“队列触发器”。

手动触发器

若要手动触发某个函数,请使用 NoAutomaticTrigger 特性,如以下示例中所示:

static void Main(string[] args)
{
    JobHost host = new JobHost();
    host.Call(typeof(Program).GetMethod("CreateQueueMessage"), new { value = "Hello world!" });
}
[NoAutomaticTrigger]
public static void CreateQueueMessage(
    TextWriter logger,
    string value,
    [Queue("outputqueue")] out string message)
{
    message = value;
    logger.WriteLine("Creating queue message: ", message);
}

输入和输出绑定

通过输入绑定能够以声明方式将 Azure 或第三方服务中的数据提供给代码使用。 输出绑定提供更新数据的方式。 入门文章中演示了输入和输出绑定的示例。

通过将属性应用于方法返回值,可以对输出绑定使用方法返回值。 请参阅 Azure Functions 触发器和绑定一文中的示例。

绑定类型

以下触发器和绑定类型包含在 Microsoft.Azure.WebJobs 包中:

  • Blob 存储
  • 队列存储
  • 表存储

若要使用其他触发器和绑定类型,请安装包含这些类型的 NuGet 包,并对 JobHostConfiguration 对象调用 Use<binding> 方法。 例如,若要使用 Timer 触发器,请安装 Microsoft.Azure.WebJobs.Extensions 并在 Main 方法中调用 UseTimers,如以下示例所示:

static void Main()
{
    config = new JobHostConfiguration();
    config.UseTimers();
    var host = new JobHost(config);
    host.RunAndBlock();
}

可以在特定绑定类型的 Azure Functions 参考文章的“包”部分中找到要为该绑定类型安装的包。 异常是 Files 触发器和绑定(适用于本地文件系统),不受 Azure Functions 的支持。 若要使用 Files 绑定,请安装 Microsoft.Azure.WebJobs.Extensions 并调用 UseFiles

UseCore

前面所述的 Microsoft.Azure.WebJobs.Extensions 包还提供了一个可以通过调用 UseCore 方法注册的特殊绑定类型。 使用此绑定可以在函数签名中定义 ExecutionContext 参数。 通过上下文对象可以访问调用 ID,使用该 ID 可以关联给定函数调用生成的所有日志。 下面是一个示例:

class Program
{
    static void Main()
    {
        config = new JobHostConfiguration();
        config.UseCore();
        var host = new JobHost(config);
        host.RunAndBlock();
    }
}
public class Functions
{
    public static void ProcessQueueMessage([QueueTrigger("queue")] string message,
        ExecutionContext executionContext,
        ILogger logger)
    {
        logger.LogInformation($"{message}\n{executionContext.InvocationId}");
    }
}

绑定配置

许多触发器和绑定类型允许配置其行为,配置方法是在传入到 JobHost 的配置对象中设置属性。

队列触发器配置

Azure Functions host.json 参考中介绍了可为存储队列触发器配置的设置。 以下示例演示了如何在 WebJobs SDK 项目中设置这些属性:

static void Main(string[] args)
{
    JobHostConfiguration config = new JobHostConfiguration();
    config.Queues.BatchSize = 8;
    config.Queues.NewBatchThreshold = 4;
    config.Queues.MaxDequeueCount = 4;
    config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(15);
    JobHost host = new JobHost(config);
    host.RunAndBlock();
}

其他绑定的配置

某些触发器和绑定类型定义其自身的自定义配置类型。 例如,File 触发器允许指定要监视的根路径:

static void Main()
{
    config = new JobHostConfiguration();
    var filesConfig = new FilesConfiguration
    {
        RootPath = @"c:\data\import"
    };
    config.UseFiles(filesConfig);
    var host = new JobHost(config);
    host.RunAndBlock();
}

绑定表达式

在特性构造函数参数中,可以使用解析为来自各种源的值的表达式。 例如,在以下代码中,BlobTrigger 特性的路径创建名为 filename 表达式。 用于输出绑定时,filename 解析为触发 Blob 的名称。

public static void CreateThumbnail(
    [BlobTrigger("sample-images/{filename}")] Stream image,
    [Blob("sample-images-sm/{filename}", FileAccess.Write)] Stream imageSmall,
    string filename,
    ILogger logger)
{
    logger.Info($"Blob trigger processing: {filename}");
    // ...
}

有关绑定表达式的详细信息,请参阅 Azure Functions 文档中的绑定表达式和模式

自定义绑定表达式

有时,你想要在代码中指定队列名称、Blob 名称、容器或表名称,而不是进行硬编码。 例如,可能要在配置文件或环境变量中指定 QueueTrigger 特性的队列名称。

为此,可以向 JobHostConfiguration 对象传入 NameResolver 对象。 在触发器或绑定特性构造函数参数中包含占位符,NameResolver 代码将提供用于取代这些占位符的实际值。 占位符的标识方式是以百分号 (%) 将其括住,如以下示例中所示:

public static void WriteLog([QueueTrigger("%logqueue%")] string logMessage)
{
    Console.WriteLine(logMessage);
}

此代码允许在测试环境中使用名为 logqueuetest 的队列,并在生产环境中使用名为 logqueueprod 的队列。 在 appSettings 集合中指定条目名称,而不是硬编码的队列名称。

如果未提供自定义值,则使用默认设置 NameResolver。 默认设置从应用设置或环境变量中获取值。

NameResolver 类从 appSettings 获取队列名称,如以下示例中所示:

public class CustomNameResolver : INameResolver
{
    public string Resolve(string name)
    {
        return ConfigurationManager.AppSettings[name].ToString();
    }
}

NameResolver 类传入 JobHost 对象,如以下示例中所示:

 static void Main(string[] args)
{
    JobHostConfiguration config = new JobHostConfiguration();
    config.NameResolver = new CustomNameResolver();
    JobHost host = new JobHost(config);
    host.RunAndBlock();
}

Azure Functions 实现 INameResolver 以从应用设置中获取值,如以下示例中所示。 直接使用 WebJobs SDK 时,可以编写一个自定义实现,用于从偏好的任何源获取占位符替代值。

在运行时绑定

如果需要在使用 QueueBlobTable 等绑定特性之前在函数中执行某项操作,则可以使用 IBinder 接口。

下面的示例采用一个输入队列消息,并在输出队列中创建具有相同内容的新消息。 输出队列名称由函数正文中的代码设置。

public static void CreateQueueMessage(
    [QueueTrigger("inputqueue")] string queueMessage,
    IBinder binder)
{
    string outputQueueName = "outputqueue" + DateTime.Now.Month.ToString();
    QueueAttribute queueAttribute = new QueueAttribute(outputQueueName);
    CloudQueue outputQueue = binder.Bind<CloudQueue>(queueAttribute);
    outputQueue.AddMessage(new CloudQueueMessage(queueMessage));
}

有关详细信息,请参阅 Azure Functions 文档中的运行时绑定

绑定参考信息

Azure Functions 文档中提供了有关每个绑定类型的参考信息。 每篇绑定参考文章中以存储队列为例介绍了以下信息:

  • - 要安装哪个包才能在 WebJobs SDK 项目中包含绑定支持。
  • 示例 - C# 类库示例适用于 WebJobs SDK;只需忽略 FunctionName 特性。
  • 特性 - 用于绑定类型的特性。
  • 配置 - 特性属性和构造函数参数的解释。
  • 用法 - 可绑定到哪些类型,以及有关绑定工作原理的信息。 例如:轮询算法、有害队列处理。

有关绑定参考文章的列表,请参阅 Azure Functions 触发器和绑定一文中的“支持的绑定”。 在该列表中,HTTP、Webhook 和事件网格绑定仅受 Azure Functions 的支持,而不受 WebJobs SDK 的支持。

Disable 特性

Disable 特性用于控制是否可以触发某个函数。

在以下示例中,如果应用设置 Disable_TestJob 使用值“1”或“True”(区分大小写),则函数不会运行。 在这种情况下,运行时将创建日志消息“函数 'Functions.TestJob' 已禁用”。

[Disable("Disable_TestJob")]
public static void TestJob([QueueTrigger("testqueue2")] string message)
{
    Console.WriteLine("Function with Disable attribute executed!");
}

在 Azure 门户中更改应用设置值时,会导致 WebJob 重启并选取新设置。

可以在参数、方法或类级别声明该特性。 设置名称还可以包含绑定表达式。

Timeout 特性

如果某个函数在指定的时间段内未完成,则 Timeout 特性会导致该函数被取消。 以下示例中的函数将运行一天,而不会超时。 指定超时后,该函数将在 15 秒后被取消。

[Timeout("00:00:15")]
public static async Task TimeoutJob(
    [QueueTrigger("testqueue2")] string message,
    CancellationToken token,
    TextWriter log)
{
    await log.WriteLineAsync("Job starting");
    await Task.Delay(TimeSpan.FromDays(1), token);
    await log.WriteLineAsync("Job completed");
}

可以在类或方法级别应用 Timeout 特征,并可以使用 JobHostConfiguration.FunctionTimeout 指定全局超时。 类或方法级别的超时替代全局超时。

Singleton 特性

使用 Singleton 特性可确保即使有多个主机 Web 应用的实例,也只有一个函数实例运行。 实现分布式锁定即可做到这一点。

在以下示例中,在任意给定时间只会运行 ProcessImage 函数的单个实例:

[Singleton]
public static async Task ProcessImage([BlobTrigger("images")] Stream image)
{
     // Process the image
}

SingletonMode.Listener

某些触发器为并发管理提供内置支持:

  • QueueTrigger - 将 JobHostConfiguration.Queues.BatchSize 设置为 1。
  • ServiceBusTrigger - 将 ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls 设置为 1。
  • FileTrigger - 将 FileProcessor.MaxDegreeOfParallelism 设置为 1。

可以使用这些设置来确保函数在单个实例上作为单一实例运行。 若要确保在 Web 应用横向扩展到多个实例时只运行函数的单个实例,请对该函数应用侦听器级别的单一实例锁 ([Singleton(Mode = SingletonMode.Listener)])。 启动 JobHost 时获取侦听器锁。 如果三个横向扩展的实例全部同时启动,只有其中的一个实例获取该锁,并且只有一个侦听器启动。

范围值

可以在单一实例中指定范围表达式/值,以确保该范围内的所有函数执行都将被串行化。 以这种方式实现更细化的锁定可以为函数提供一定程度的并行度,同时根据你的需求串行化其他调用。 例如,在以下示例中,范围表达式将绑定到传入消息的 Region 值。 如果队列分别在区域“East”、“East”和“West”中包含 3 条消息,则区域为“East”的消息将连续执行,而区域为“West”的消息将与这些消息并行执行。

[Singleton("{Region}")]
public static async Task ProcessWorkItem([QueueTrigger("workitems")] WorkItem workItem)
{
     // Process the work item
}

public class WorkItem
{
     public int ID { get; set; }
     public string Region { get; set; }
     public int Category { get; set; }
     public string Description { get; set; }
}

SingletonScope.Host

锁的默认范围为 SingletonScope.Function,这意味着,锁范围(Blob 租约路径)已绑定到完全限定的函数名称。 若要跨函数锁定,请指定 SingletonScope.Host,并使用在不想要同时运行的所有函数中相同的范围 ID 名称。 在以下示例中,每次只会运行 AddItemRemoveItem 的一个实例:

[Singleton("ItemsLock", SingletonScope.Host)]
public static void AddItem([QueueTrigger("add-item")] string message)
{
     // Perform the add operation
}

[Singleton("ItemsLock", SingletonScope.Host)]
public static void RemoveItem([QueueTrigger("remove-item")] string message)
{
     // Perform the remove operation
}

查看租约 Blob

WebJobs SDK 在幕后使用 Azure Blob 租约来实现分布式锁定。 可以在 AzureWebJobsStorage 存储帐户的 azure-webjobs-host 容器中的路径“locks”下面找到单一实例使用的租约 Blob。 例如,前面演示的第一个 ProcessImage 示例的租约 Blob 路径可能是 locks/061851c758f04938a4426aa9ab3869c0/WebJobs.Functions.ProcessImage。 所有路径包含 JobHost ID,在本例中为 061851c758f04938a4426aa9ab3869c0。

异步函数

有关如何编写异步函数代码的信息,请参阅有关异步函数的 Azure Functions 文档。

取消令牌

有关如何处理取消令牌的信息,请参阅有关取消令牌和正常关闭的 Azure Functions 文档。

多个实例

如果 Web 应用在多个实例上运行,则会有一个连续的 WebJob 在每个实例上运行,并侦听触发器和调用函数。 各种触发器绑定旨在以协作方式有效分担各个实例上的工作,以便横向扩展到多个实例后可以处理更多的负载。

队列和 Blob 触发器自动阻止函数多次处理队列消息或 Blob;函数不需要是幂等的。

计时器触发器会自动确保只会运行计时器的一个实例,因此,在给定的计划时间,不会运行多个函数实例。

如果要确保即使有多个主机 Web 应用的实例,也只有一个函数实例运行,可以使用 Singleton 特性。

筛选器

通过函数筛选器(预览版)可以使用自己的逻辑自定义 WebJobs 执行管道。 筛选器类似于 ASP.NET Core 筛选器。 可将其实现为应用到函数或类的声明性特性。 有关详细信息,请参阅函数筛选器

日志记录和监视

我们建议使用针对 ASP.NET 开发的日志记录框架;入门文章中介绍了其用法。

日志筛选

ILogger 实例创建的每个日志都包含关联的 CategoryLevelLogLevel 是一个枚举,整数代码指示相对重要性:

LogLevel 代码
跟踪 0
调试 1
信息 2
警告 3
错误 4
关键 5
6

可以根据特定的 LogLevel 独立筛选每个类别。 例如,你可能想要查看有关 Blob 触发器处理的所有日志,但对于其他任何操作,只想查看 Error 和更高级别的日志。

为了方便指定筛选规则,WebJobs SDK 提供了 LogCategoryFilter,可将其传入许多现有的日志记录提供程序,包括控制台。

LogCategoryFilter 包含初始值为 InformationDefault 属性,这意味着,将会记录级别为 InformationWarningErrorCritical 的所有消息,但会筛选掉级别为 DebugTrace 的所有消息。

使用 CategoryLevels 属性可以指定特定类别的日志级别,以便能够微调日志记录输出。 如果在 CategoryLevels 字典中未找到任何匹配项,筛选器在决定是否筛选消息时会回退到 Default 值。

以下示例构造的筛选器默认会筛选 Warning 级别的所有日志。 FunctionHost.Results 类别在 Error 级别筛选。 LogCategoryFilter 将当前类别与所有已注册的 CategoryLevels 进行比较,并选择最长匹配项。 这意味着,为 Host.Triggers 注册的 Debug 级别将匹配 Host.Triggers.QueueHost.Triggers.Blob。 这样,便可以控制更广泛的类别,而无需添加每个类别。

var filter = new LogCategoryFilter();
filter.DefaultLevel = LogLevel.Warning;
filter.CategoryLevels[LogCategories.Function] = LogLevel.Error;
filter.CategoryLevels[LogCategories.Results] = LogLevel.Error;
filter.CategoryLevels["Host.Triggers"] = LogLevel.Debug;

config.LoggerFactory = new LoggerFactory()
    .AddApplicationInsights(instrumentationKey, filter.Filter)
    .AddConsole(filter.Filter);

后续步骤

本指南提供的代码片段演示了如何处理 WebJobs SDK 的常见使用方案。 有关完整示例,请参阅 azure-webjobs-sdk-samples