Durable Functions (Azure Functions) 的绑定

Durable Functions 扩展引入了三个触发器绑定,用于控制业务流程协调程序、实体和活动函数的执行。 它还引入了输出绑定,充当 Durable Functions 运行时的客户端。

确保在文章顶部选择 Durable Functions 开发语言。

重要

本文支持用于 Durable Functions 的 Python v1 和 Python v2 编程模型。

Python v2 编程模型

Python v2 编程模型支持 Durable Functions。 要使用 v2 模型,必须安装 Durable Functions SDK,即 PyPI 包 azure-functions-durable、版本 1.2.2 或更高版本。 还必须检查 host.json,以确保应用引用 Extension Bundles 版本 4.x,以便将 v2 模型与 Durable Functions 配合使用。

可以在 Durable Functions SDK for Python 存储库中提供反馈和建议。

业务流程触发器

业务流程触发器可用于创作持久业务流程协调程序函数。 安排新业务流程实例且现有业务流程实例接收事件时,便会执行此触发器。 可触发业务流程协调程序函数的事件示例包括持久计时器到期、活动函数响应,以及由外部客户端引发的事件。

在 .NET 中创作函数时,使用 OrchestrationTriggerAttribute .NET 属性配置业务流程触发器。

对于 Java,@DurableOrchestrationTrigger 注释用于配置业务流程触发器。

编写业务流程协调程序函数时,业务流程触发器由 function.json 文件的 bindings 数组中的以下 JSON 对象定义:

{
    "name": "<Name of input parameter in function signature>",
    "orchestration": "<Optional - name of the orchestration>",
    "type": "orchestrationTrigger",
    "direction": "in"
}
  • orchestration 是客户端想要启动此业务流程协调程序函数的新实例时必须使用的业务流程名称。 此属性是可选的。 如果未指定,则使用该函数的名称。

Azure Functions 支持两种 Python 编程模型。 定义业务流程触发器的方式取决于选择的编程模型。

Python v2 编程模型允许直接在 Python 函数代码中使用 orchestration_trigger 修饰器来定义业务流程触发器。

在 v2 模型中,Durable Functions 触发器和绑定是从 DFApp 的实例访问的,它是 FunctionApp 的子类,还能导出 Durable Functions 特定的修饰器。

在内部,此触发器绑定轮询已配置的持久存储以查找新的业务流程事件,例如业务流程启动事件、持久计时器到期事件、活动函数响应事件,以及其他函数引发的外部事件。

触发器行为

以下是有关业务流程触发器的一些注意事项:

  • 单线程 - 单个调度程序线程用于单个主机实例上的所有业务流程协调程序函数执行 。 为此,必须确保业务流程协调程序函数代码有效,且不执行任何 I/O 操作。 还必须确保此线程不执行任何异步工作,等待特定于 Durable Functions 的任务类型除外。
  • 有害消息处理 - 业务流程触发器中不支持有害消息。
  • 消息可见性 - 业务流程触发器消息会取消排队并在可配置的持续时间内保持可见 。 只要函数应用正常运行,这些消息的可见性就会自动更新。
  • 返回值 - 返回值序列化为 JSON,并持久保存到 Azure 表存储中的业务流程历史记录表 。 业务流程客户端绑定可以查询这些值,后文会对此进行介绍。

警告

业务流程协调程序函数不得使用业务流程触发器绑定之外的任何输入或输出绑定。 这样做有可能导致 Durable Task 扩展出现问题,因为这些绑定可能不遵从单线程处理和 I/O 规则。 若要使用其他绑定,请将它们添加到从业务流程协调程序函数调用的活动函数。 有关业务流程协调程序函数的代码限制的详细信息,请参阅业务流程协调程序函数代码约束文档。

警告

绝不应该声明业务流程协调程序函数 async

触发器使用情况

业务流程触发器绑定同时支持输入和输出。 下面是有关输入和输出处理的一些须知事项:

  • 输入 - 可以使用通过上下文输入对象获取的输入来调用业务流程触发器。 所有输入必须是 JSON 可序列化类型。
  • 输出 - 业务流程触发器支持输出值以及输入 。 函数的返回值用于分配输出值,且必须是 JSON 可序列化的。

触发器示例

以下示例代码演示了最简单的“Hello World”业务流程协调程序函数的形式。 请注意,此示例业务流程协调程序实际上不会计划任何任务。

用于定义触发器的特定属性取决于是在进程内还是在隔离的工作进程中运行 C# 函数。

[FunctionName("HelloWorld")]
public static string Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string name = context.GetInput<string>();
    return $"Hello {name}!";
}

注意

前面的代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const name = context.df.getInput();
    return `Hello ${name}!`;
});

注意

生成器函数退出时,durable-functions 库负责调用同步 context.done 方法。

import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.orchestration_trigger(context_name="context")
def my_orchestrator(context):
    result = yield context.call_activity("Hello", "Tokyo")
    return result
param($Context)

$InputData = $Context.Input
$InputData
@FunctionName("HelloWorldOrchestration")
public String helloWorldOrchestration(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    return String.format("Hello %s!", ctx.getInput(String.class));
}

大多数业务流程协调程序函数会调用活动函数,因此下面的“Hello World”示例演示了如何调用活动函数:

[FunctionName("HelloWorld")]
public static async Task<string> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string name = context.GetInput<string>();
    string result = await context.CallActivityAsync<string>("SayHello", name);
    return result;
}

注意

前面的代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const name = context.df.getInput();
    const result = yield context.df.callActivity("SayHello", name);
    return result;
});
@FunctionName("HelloWorld")
public String helloWorldOrchestration(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    String result = ctx.callActivity("SayHello", input, String.class).await();
    return result;
}

活动触发器

使用活动触发器可以创作业务流程协调程序函数调用的函数(称为活动函数)。

活动触发器使用 ActvityTriggerAttribute .NET 属性进行配置。

使用 @DurableActivityTrigger 注释配置活动触发器。

活动触发器由 function.jsonbindings 数组中的以下 JSON 对象定义:

{
    "name": "<Name of input parameter in function signature>",
    "activity": "<Optional - name of the activity>",
    "type": "activityTrigger",
    "direction": "in"
}
  • activity 是活动的名称。 此值是业务流程协调程序函数用来调用此活动函数的名称。 此属性是可选的。 如果未指定,则使用该函数的名称。

定义活动触发器的方式取决于选择的编程模型。

在 Python 函数代码中直接使用 activity_trigger 修饰器。

在内部,此触发器绑定轮询已配置的持久存储以查找新的活动执行事件。

触发器行为

以下是有关活动触发器的一些注意事项:

  • 线程处理 - 与业务流程触发器不同,活动触发器没有关于线程处理或 I/O 的任何限制 。 可以将它们视为常规功能。
  • 有害消息处理 - 活动触发器不支持有害消息。
  • 消息可见性 - 活动触发器消息会取消排队并在可配置的持续时间内保持可见 。 只要函数应用正常运行,这些消息的可见性就会自动更新。
  • 返回值 - 返回值序列化为 JSON,并持久保存到已配置的持久存储。

触发器使用情况

类似于业务流程触发器,活动触发器绑定也同时支持输入和输出。 下面是有关输入和输出处理的一些须知事项:

  • 输入 - 可以使用来自业务流程协调程序函数的输入调用活动触发器。 所有输入必须是 JSON 可序列化类型。
  • 输出 - 活动函数支持输出值以及输入。 函数的返回值用于分配输出值,且必须是 JSON 可序列化的。
  • 元数据 - .NET 活动函数可以绑定到 string instanceId 参数,以获取调用业务流程的实例 ID。

触发器示例

以下示例代码演示了简单的 SayHello 活动函数可能呈现的形式。

[FunctionName("SayHello")]
public static string SayHello([ActivityTrigger] IDurableActivityContext helloContext)
{
    string name = helloContext.GetInput<string>();
    return $"Hello {name}!";
}

.NET ActivityTriggerAttribute 绑定的默认参数类型是 IDurableActivityContext(对于 Durable Functions v1,则是 DurableActivityContext)。 但是,.NET 活动触发器还支持直接绑定到 JSON 可序列化类型(包括基元类型),因此相同的函数可以简化为如下所示:

[FunctionName("SayHello")]
public static string SayHello([ActivityTrigger] string name)
{
    return $"Hello {name}!";
}
module.exports = async function(context) {
    return `Hello ${context.bindings.name}!`;
};

JavaScript 绑定可以还作为附加参数进行传入,因此,同一函数可以简化如下:

module.exports = async function(context, name) {
    return `Hello ${name}!`;
};
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.activity_trigger(input_name="myInput")
def my_activity(myInput: str):
    return "Hello " + myInput
param($name)

"Hello $name!"
@FunctionName("SayHello")
public String sayHello(@DurableActivityTrigger(name = "name") String name) {
    return String.format("Hello %s!", name);
}

使用输入和输出绑定

除了活动触发器绑定以外,还可以使用普通的输入和输出绑定。

例如,可将输入提取到活动绑定,并使用“事件中心”输出绑定向事件中心发送消息:

{
  "bindings": [
    {
      "name": "message",
      "type": "activityTrigger",
      "direction": "in"
    },
    {
      "type": "eventHub",
      "name": "outputEventHubMessage",
      "connection": "EventhubConnectionSetting",
      "eventHubName": "eh_messages",
      "direction": "out"
  }
  ]
}
module.exports = async function (context) {
    context.bindings.outputEventHubMessage = context.bindings.message;
};

业务流程客户端

通过业务流程客户端绑定,可以编写与业务流程协调程序函数进行交互的函数。 这些函数通常称为客户端函数。 例如,可以通过以下方式对业务流程实例进行操作:

  • 启动它们。
  • 查询它们的状态。
  • 终止它们。
  • 当它们正在运行时,向它们发送事件。
  • 清除实例历史记录。

可以使用 DurableClientAttribute 属性(在 Durable Functions v1.x 中,则是 OrchestrationClientAttribute)绑定到业务流程客户端。

可以使用 @DurableClientInput 注释绑定到业务流程客户端。

持久客户端触发器由 function.jsonbindings 数组中的以下 JSON 对象定义:

{
    "name": "<Name of input parameter in function signature>",
    "taskHub": "<Optional - name of the task hub>",
    "connectionName": "<Optional - name of the connection string app setting>",
    "type": "orchestrationClient",
    "direction": "in"
}
  • taskHub - 用于多个函数应用共享同一存储帐户但需要彼此独立的方案。 如果未指定,则使用 host.json 中的默认值。 此值必须与目标业务流程协调程序函数所使用的值匹配。
  • connectionName - 包含存储帐户连接字符串的应用设置的名称。 此连接字符串表示的存储帐户必须与目标业务流程协调程序函数所用的存储帐户相同。 如果未指定,则使用函数应用的默认存储帐户连接字符串。

注意

在大多数情况下,建议忽略这些属性,并依靠默认行为。

定义持久客户端触发器的方式取决于选择的编程模型。

在 Python 函数代码中直接使用 durable_client_input 修饰器。

客户端使用情况

通常会绑定到 IDurableClient(在 Durable Functions v1.x 中,则是 DurableOrchestrationClient),它可提供对 Durable Functions 支持的所有业务流程客户端 API 的完全访问权限。

通常绑定到 DurableClientContext 类。

必须使用特定于语言的 SDK 才能访问客户端对象。

下面的示例是启动“HelloWorld”业务流程的队列触发型函数。

[FunctionName("QueueStart")]
public static Task Run(
    [QueueTrigger("durable-function-trigger")] string input,
    [DurableClient] IDurableOrchestrationClient starter)
{
    // Orchestration input comes from the queue message content.
    return starter.StartNewAsync<string>("HelloWorld", input);
}

注意

前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 属性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。

function.json

{
  "bindings": [
    {
      "name": "input",
      "type": "queueTrigger",
      "queueName": "durable-function-trigger",
      "direction": "in"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

index.js

const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    return instanceId = await client.startNew("HelloWorld", undefined, context.bindings.input);
};

run.ps1

param([string] $input, $TriggerMetadata)

$InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -Input $input
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def durable_trigger(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response

function.json

{
  "bindings": [
    {
      "name": "input",
      "type": "queueTrigger",
      "queueName": "durable-function-trigger",
      "direction": "in"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

run.ps1

param([string]$InputData, $TriggerMetadata)

$InstanceId = Start-DurableOrchestration -FunctionName 'HelloWorld' -Input $InputData
@FunctionName("QueueStart")
public void queueStart(
        @QueueTrigger(name = "input", queueName = "durable-function-trigger", connection = "Storage") String input,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
    // Orchestration input comes from the queue message content.
    durableContext.getClient().scheduleNewOrchestrationInstance("HelloWorld", input);
}

有关启动实例的更多详细信息,请参阅实例管理

实体触发器

使用实体触发器可以创作实体函数。 此触发器支持处理特定实体实例的事件。

注意

从 Durable Functions 2.x 开始提供实体触发器。

在内部,此触发器绑定轮询已配置的持久存储以查找需执行的新实体操作。

实体触发器是使用 EntityTriggerAttribute .NET 属性配置的。

实体触发器由 function.jsonbindings 数组中的以下 JSON 对象定义:

{
    "name": "<Name of input parameter in function signature>",
    "entityName": "<Optional - name of the entity>",
    "type": "entityTrigger",
    "direction": "in"
}

默认情况下,实体的名称就是函数的名称。

注意

Java 还不支持实体触发器。

定义实体触发器的方式取决于选择的编程模型。

在 Python 函数代码中直接使用 entity_trigger 修饰器。

触发器行为

以下是有关实体触发器的一些注意事项:

  • 单线程:使用单个调度程序线程来处理特定实体的操作。 如果将多个消息同时发送到单个实体,将会逐个处理操作。
  • 有害消息处理 - 实体触发器中不支持有害消息。
  • 消息可见性 - 实体触发器消息会取消排队并在可配置的持续时间内保持可见。 只要函数应用正常运行,这些消息的可见性就会自动更新。
  • 返回值 - 实体函数不支持返回值。 可以使用特定的 API 来保存状态,或者将值传回到业务流程。

在执行实体期间对实体所做的任何状态更改将在执行完成后自动保留。

有关定义实体触发器并与之交互的更多信息和示例,请参阅持久实体文档。

实体客户端

使用实体客户端绑定可以异步触发实体函数。 这些函数有时称为客户端函数

可以使用 .NET 类库函数中的 DurableClientAttribute .NET 属性绑定到实体客户端。

注意

还可以使用 [DurableClientAttribute] 绑定到业务流程客户端

实体客户端由 function.jsonbindings 数组中的以下 JSON 对象定义:

{
    "name": "<Name of input parameter in function signature>",
    "taskHub": "<Optional - name of the task hub>",
    "connectionName": "<Optional - name of the connection string app setting>",
    "type": "durableClient",
    "direction": "in"
}
  • taskHub - 用于多个函数应用共享同一存储帐户但需要彼此独立的方案。 如果未指定,则使用 host.json 中的默认值。 此值必须与目标实体函数所使用的值匹配。
  • connectionName - 包含存储帐户连接字符串的应用设置的名称。 此连接字符串表示的存储帐户必须与目标实体函数所用的存储帐户相同。 如果未指定,则使用函数应用的默认存储帐户连接字符串。

注意

在大多数情况下,建议忽略可选属性,并依赖默认行为。

定义实体客户端的方式取决于选择的编程模型。

在 Python 函数代码中直接使用 durable_client_input 修饰器。

注意

Java 还不支持实体客户端。

有关以客户端形式与实体交互的更多信息和示例,请参阅持久实体文档。

host.json 设置

Durable Functions 的配置设置。

注意

Azure Functions 运行时的所有版本均支持 Durable Functions 的所有主要版本。 但是,根据 Azure Functions 运行时的版本和使用的 Durable Functions 扩展版本,host json 配置的架构略有不同。 以下示例可与 Azure Functions 2.0 和3.0 一起使用。 在这两个示例中,如果使用 Azure Functions 1.0,则可用设置是相同的,但 host.json 的“durableTask”部分应位于 host. json 配置的根目录中,而不是作为“extension”下的字段。

{
 "extensions": {
  "durableTask": {
    "hubName": "MyTaskHub",
    "storageProvider": {
      "connectionStringName": "AzureWebJobsStorage",
      "controlQueueBatchSize": 32,
      "controlQueueBufferThreshold": 256,
      "controlQueueVisibilityTimeout": "00:05:00",
      "maxQueuePollingInterval": "00:00:30",
      "partitionCount": 4,
      "trackingStoreConnectionStringName": "TrackingStorage",
      "trackingStoreNamePrefix": "DurableTask",
      "useLegacyPartitionManagement": true,
      "workItemQueueVisibilityTimeout": "00:05:00",
    },
    "tracing": {
      "traceInputsAndOutputs": false,
      "traceReplayEvents": false,
    },
    "notifications": {
      "eventGrid": {
        "topicEndpoint": "https://topic_name.chinanorth2-1.eventgrid.chinacloudapi.cn/api/events",
        "keySettingName": "EventGridKey",
        "publishRetryCount": 3,
        "publishRetryInterval": "00:00:30",
        "publishEventTypes": [
          "Started",
          "Pending",
          "Failed",
          "Terminated"
        ]
      }
    },
    "maxConcurrentActivityFunctions": 10,
    "maxConcurrentOrchestratorFunctions": 10,
    "extendedSessionsEnabled": false,
    "extendedSessionIdleTimeoutInSeconds": 30,
    "useAppLease": true,
    "useGracefulShutdown": false,
    "maxEntityOperationBatchSize": 50
  }
 }
}

任务中心名称必须以字母开头且只能包含字母和数字。 如果未指定,则函数应用的默认任务中心名称为 TestHubName。 有关详细信息,请参阅任务中心

属性 默认 说明
hubName TestHubName(如果使用 Durable Functions 1.x,则为 DurableFunctionsHub) 可以使用备用任务中心名称将多个 Durable Functions 应用程序彼此隔离,即使这些应用程序使用同一存储后端。
controlQueueBatchSize 32 要从控制队列中一次性拉取的消息数。
controlQueueBufferThreshold 适用于 Python 的消耗计划:32
适用于 JavaScript 和 C# 的消耗计划:128
专用/高级计划:256
一次可以在内存中缓冲的控制队列消息数,此时调度程序将等待,然后再将任何其他消息出队。
partitionCount 4 控制队列的分区计数。 可以是 1 到 16 之间的正整数。
controlQueueVisibilityTimeout 5 分钟 已取消排队的控制队列消息的可见性超时。
workItemQueueVisibilityTimeout 5 分钟 已取消排队的工作项队列消息的可见性超时。
maxConcurrentActivityFunctions 消耗计划:10
专用/高级计划:当前计算机上的处理器数的 10 倍
可以在单个主机实例上并发处理的活动函数的最大数目。
maxConcurrentOrchestratorFunctions 消耗计划:5
专用/高级计划:当前计算机上的处理器数的 10 倍
可以在单个主机实例上并发处理的业务流程协调程序函数的最大数目。
maxQueuePollingInterval 30 秒 最大的控制和工作项队列轮询时间间隔,采用 hh:mm:ss 格式。 值越高,可能导致的消息处理延迟也越高。 值越低,可能导致的存储成本会越高,因为存储事务数增高。
connectionName(2.7.0 及更高版本)
connectionStringName (2.x)
azureStorageConnectionStringName (1.x)
AzureWebJobsStorage 应用设置或设置集合的名称,用于指定如何连接到基础 Azure 存储资源。 如果提供的是单个应用设置,它应该是 Azure 存储连接字符串。
trackingStoreConnectionName(2.7.0 及更高版本)
trackingStoreConnectionStringName
应用设置或设置集合的名称,用于指定如何连接到“历史记录”表和“实例”表。 如果提供的是单个应用设置,它应该是 Azure 存储连接字符串。 如果未指定,则使用 connectionStringName (Durable 2.x) 或 azureStorageConnectionStringName (Durable 1.x) 连接。
trackingStoreNamePrefix 指定 trackingStoreConnectionStringName 时用于“历史记录”和“实例”表的前缀。 如果未设置,则默认前缀值为 DurableTask。 如果 trackingStoreConnectionStringName 未指定,则“历史记录”和“实例”表会使用 hubName 值作为其前缀,trackingStoreNamePrefix 的任何设置都会被忽略。
traceInputsAndOutputs false 一个指示是否跟踪函数调用的输入和输出的值。 跟踪函数执行事件时的默认行为是在函数调用的序列化输入和输出中包括字节数。 此行为提供的有关输入和输出情况的信息是最少的,不会导致日志膨胀,也不会无意中将敏感信息公开。 将此属性设置为 true 会导致默认函数日志记录将函数输入和输出的整个内容都记录下来。
traceReplayEvents false 一个值,该值指示是否将业务流程重播事件写入到 Application Insights。
eventGridTopicEndpoint Azure 事件网格自定义主题终结点的 URL。 设置此属性后,业务流程生命周期通知事件就会发布到此终结点。 此属性支持应用设置解析。
eventGridKeySettingName 应用设置的名称,该设置包含的密钥用于在 EventGridTopicEndpoint 上通过 Azure 事件网格自定义主题进行身份验证。
eventGridPublishRetryCount 0 发布到事件网格主题失败时要重试的次数。
eventGridPublishRetryInterval 5 分钟 事件网格发布重试间隔(采用 hh:mm:ss 格式)。
eventGridPublishEventTypes 要发布到事件网格的事件类型列表。 如果未指定,则将发布所有事件类型。 允许的值包括 StartedCompletedFailedTerminated
useAppLease 如果设置为 true,应用将要求在处理任务中心消息之前获取应用级别 blob 租约。 有关详细信息,请参阅灾难恢复和地理分布文档。 从 v2.3.0 开始可用。
useLegacyPartitionManagement false 设置为 false 时,使用分区管理算法来减少横向扩展时重复执行函数的可能性。从 v2.3.0 开始可用。
useGracefulShutdown false (预览)启用正常关闭以减少主机关闭导致进程内函数执行失败的机会。
maxEntityOperationBatchSize(2.6.1) 消耗计划:50
专用/高级计划:5000
批处理形式处理的最大实体操作数。 如果设置为 1,则禁用批处理,并且每个操作消息由单独的函数调用处理。

许多此类设置用于优化性能。 有关详细信息,请参阅性能和规模

后续步骤