Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Durable Functions 扩展引入了三个触发器绑定,用于控制协调器函数、实体函数和活动函数的执行。 它还引入了一种输出绑定,该绑定充当 “Durable Functions” 运行时的客户端。
本文讨论这四个绑定的使用并提供代码示例。
请确保在文章顶部选择 Durable Functions 开发语言。
Durable Functions 支持 Azure Functions 的两个版本的 Python 编程模型。 由于 Python v2 是推荐的版本,因此本文中的示例仅支持此版本。
先决条件
- Durable Functions SDK,即 Python 包索引 (PyPI) 中的包
azure-functions-durable,版本1.2.2或更高版本 - 扩展包 版本 4.x(或更高版本),已在 host.json 项目文件中设置
可以在适用于 Python 存储库的 Durable Functions SDK 中提供反馈和建议。
业务流程触发器
可以使用协调触发器开发 持久化协调器函数。 此触发器将在新的业务流程实例被调度时,以及现有的业务流程实例接收到事件时执行。 可以触发业务流程协调程序函数的事件示例包括持久计时器过期、活动函数响应和外部客户端引发的事件。
在 .NET 中开发函数时,可以使用 OrchestrationTriggerAttribute .NET 属性配置业务流程触发器。
对于Java,可以使用 @DurableOrchestrationTrigger 注释来配置业务流程触发器。
使用 Node.js 编程模型版本 4 开发函数时,请从app模块导入@azure/functions npm对象。 然后,直接在函数代码中调用 Durable Functions API 的 app.orchestration 方法。 此方法将您的协调器函数注册到 Durable Functions 框架中。
编写业务流程协调程序函数时,可以在 bindings 文件的数组中使用以下 JSON 对象定义业务流程触发器:
{
"name": "<name-of-input-parameter-in-function-signature>",
"orchestration": "<optional-name-of-orchestration>",
"type": "orchestrationTrigger",
"direction": "in"
}
该值 orchestration 是客户端想要启动协调函数新实例时必须使用的协调程序名称。 此属性是可选的。 如果未指定,则使用函数的名称。
使用 Python v2 编程模型时,可以直接在 Python 函数代码中使用 orchestration_trigger 修饰器来定义业务流程触发器。
在 v2 模型中,可以从 DFApp 实例访问 Durable Functions 触发器和绑定。 可以使用此子类 FunctionApp 导出特定于 Durable Functions 的修饰器。
在内部,此触发器绑定轮询配置的持久存储,以获取新的编排事件。 事件示例包括业务流程启动事件、持久计时器过期事件、活动函数响应事件和其他函数引发的外部事件。
触发器行为
以下是有关业务流程触发器的一些注意事项:
- 单线程处理:单个调度程序线程用于单个主机实例上的所有业务流程协调程序函数执行。 因此,请务必确保编排器函数代码高效且不执行任何 I/O 操作。 此外,请确保此线程不会执行任何异步工作,除非它在等待特定于 Durable Functions 的任务类型。
- 毒性消息处理:编排触发器中不支持毒性消息。
- 消息可见性:业务流程触发器消息已取消排队,并在可配置的持续时间内保持不可见。 只要函数应用正常运行,这些消息的可见性就会自动更新。
- 返回值:返回值序列化为 JSON,并保存到 Azure 表存储中的业务流程历史记录表。 业务流程客户端绑定可以查询这些值,后文会对此进行介绍。
Warning
业务流程协调程序函数不应使用业务流程触发器绑定以外的任何输入或输出绑定。 使用其他绑定可能会导致 Durable Task 扩展出现问题,因为这些绑定可能不符合单线程和 I/O 规则。 如果要使用其他绑定,请将它们添加到从协调器函数调用的活动函数。 有关业务流程协调程序函数的编码约束的详细信息,请参阅 Orchestrator 函数代码约束。
Warning
绝不应该声明业务流程协调程序函数 async。
触发器使用情况
业务流程触发器绑定同时支持输入和输出。 下面是有关输入和输出处理的一些说明:
- 输入:可以调用具有输入的编排触发器。 输入是通过上下文输入对象访问的。 所有输入都必须是 JSON 可序列化的。
- 输出:业务流程触发器支持输出和输入值。 函数的返回值用于分配输出值。 返回值必须是 JSON 可序列化的。
触发器示例
以下代码提供了基本 Hello World 业务流程协调程序函数的示例。 此示例业务流程协调程序不计划任何任务。
用于定义触发器的属性取决于是在 与 Functions 主机进程相同的进程中 运行 C# 函数,还是在 独立工作进程中运行 C# 函数。
[FunctionName("HelloWorld")]
public static string RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
string name = context.GetInput<string>();
return $"Hello {name}!";
}
注意
前面的代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 有关版本差异的详细信息,请参阅 Durable Functions 版本概述。
const { app } = require('@azure/functions');
const df = require('durable-functions');
df.app.orchestration('helloOrchestrator', function* (context) {
const name = context.df.getInput();
return `Hello ${name}`;
});
注意
当生成器函数退出时,库 durable-functions 将调用同步 context.done 方法。
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def my_orchestrator(context):
result = yield context.call_activity("Hello", "Tokyo")
return result
param($Context)
$InputData = $Context.Input
$InputData
@FunctionName("HelloWorldOrchestration")
public String helloWorldOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
return String.format("Hello %s!", ctx.getInput(String.class));
}
大多数编排器函数调用活动函数。 以下代码提供了一个 Hello World 示例,演示如何调用活动函数:
[FunctionName("HelloWorld")]
public static async Task<string> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string name = context.GetInput<string>();
string result = await context.CallActivityAsync<string>("SayHello", name);
return result;
}
注意
前面的代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 有关版本差异的详细信息,请参阅 Durable Functions 版本概述。
const { app } = require('@azure/functions');
const df = require('durable-functions');
const activityName = 'hello';
df.app.orchestration('helloOrchestrator', function* (context) {
const name = context.df.getInput();
const result = yield context.df.callActivity(activityName, name);
return result;
});
@FunctionName("HelloWorld")
public String helloWorldOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
String result = ctx.callActivity("SayHello", input, String.class).await();
return result;
}
活动触发器
可以使用活动触发器来开发被业务流程协调程序函数调用的活动函数。
使用 ActivityTriggerAttribute .NET 属性配置活动触发器。
使用 @DurableActivityTrigger 批注配置活动触发器。
若要注册活动函数,请从app模块导入@azure/functions npm对象。 然后,直接在函数代码中调用 Durable Functions API 的 app.activity 方法。
若要定义活动触发器,请在 bindings数组中使用以下 JSON 对象:
{
"name": "<name-of-input-parameter-in-function-signature>",
"activity": "<optional-name-of-activity>",
"type": "activityTrigger",
"direction": "in"
}
该值 activity 是活动的名称。 此值是协调器函数用于调用此活动函数的名称。 此属性是可选的。 如果未指定,则使用函数的名称。
可以直接在 Python 函数代码中使用 activity_trigger 修饰器来定义活动触发器。
在内部,此触发器绑定轮询已配置的持久存储以查找新的活动执行事件。
触发器行为
下面是有关活动触发器的一些说明:
- 线程:与编排触发器不同,活动触发器对线程或 I/O 操作没有任何限制。 可以像常规函数一样对待它们。
- 病毒消息处理:活动触发器中不支持有害消息。
- 消息可见性:活动触发器消息从队列中取出,并在可配置的持续时间内保持不可见。 只要函数应用正常运行,这些消息的可见性就会自动更新。
- 返回值:返回值序列化为 JSON,并保存到配置的持久存储。
触发器使用情况
类似于业务流程触发器,活动触发器绑定也同时支持输入和输出。 下面是有关输入和输出处理的一些说明:
- 输入:可以通过协调器函数的输入来调用活动触发器。 所有输入都必须是 JSON 可序列化的。
- 输出:活动函数支持输出值和输入值。 函数的返回值用于分配输出值,并且必须是 JSON 可序列化的。
-
Metadata:.NET 活动功能可以绑定到
string instanceId参数,以获取所调用编排的实例 ID。
触发器示例
以下代码提供了基本 Hello World 活动函数的示例。
[FunctionName("SayHello")]
public static string SayHello([ActivityTrigger] IDurableActivityContext helloContext)
{
string name = helloContext.GetInput<string>();
return $"Hello {name}!";
}
.NET ActivityTriggerAttribute 绑定的默认参数类型为 IDurableActivityContext (或 Durable Functions 1.x 的 DurableActivityContext )。 但是,.NET 活动触发器还支持直接绑定到 JSON 可序列化的类型(包括基元类型),因此还可以使用以下简化版本的函数:
[FunctionName("SayHello")]
public static string SayHello([ActivityTrigger] string name)
{
return $"Hello {name}!";
}
const { app } = require('@azure/functions');
const df = require('durable-functions');
const activityName = 'hello';
df.app.activity(activityName, {
handler: (input) => {
return `Hello, ${input}`;
},
});
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.activity_trigger(input_name="myInput")
def my_activity(myInput: str):
return "Hello " + myInput
param($name)
"Hello $name!"
@FunctionName("SayHello")
public String sayHello(@DurableActivityTrigger(name = "name") String name) {
return String.format("Hello %s!", name);
}
使用输入和输出绑定
除了活动触发器绑定,还可以使用常规输入和输出绑定。
例如,活动函数可以从协调器函数接收输入。 然后,活动函数可以将该输入作为消息发送到 Azure 事件中心。
const { app } = require('@azure/functions');
const df = require('durable-functions');
df.app.orchestration('helloOrchestrator', function* (context) {
const input = context.df.getInput();
yield context.df.callActivity('sendToEventHub', input);
return `Message sent: ${input}`;
});
const { EventHubProducerClient } = require("@azure/event-hubs");
const connectionString = process.env.EVENT_HUB_CONNECTION_STRING;
const eventHubName = process.env.EVENT_HUB_NAME;
df.app.activity("sendToEventHub", {
handler: async (message, context) => {
const producer = new EventHubProducerClient(connectionString, eventHubName);
try {
const batch = await producer.createBatch();
batch.tryAdd({ body: message });
await producer.sendBatch(batch);
context.log(`Message sent to Event Hubs: ${message}`);
} catch (err) {
context.log.error("Failed to send message to Event Hubs:", err);
throw err;
} finally {
await producer.close();
}
},
});
app.storageQueue('helloQueueStart', {
queueName: 'start-orchestration',
extraInputs: [df.input.durableClient()],
handler: async (message, context) => {
const client = df.getClient(context);
const orchestratorName = message.orchestratorName || 'helloOrchestrator';
const input = message.input || null;
const instanceId = await client.startNew(orchestratorName, { input });
context.log(`Started orchestration with ID = '${instanceId}'`);
},
});
业务流程客户端
可以使用业务流程客户端绑定编写与业务流程协调程序函数交互的函数。 这些函数通常称为 客户端函数。 例如,可以通过以下方式对业务流程实例进行操作:
- 启动它们。
- 查询其状态。
- 终止它们。
- 当它们正在运行时,向它们发送事件。
- 清除实例历史记录。
可以使用 DurableClientAttribute 属性(在 Durable Functions 1.x 中为 OrchestrationClientAttribute)绑定到编排客户端。
可以使用@DurableClientInput注解绑定业务流程客户端。
若要注册客户端函数,请从app模块导入@azure/functions npm对象。 然后,调用特定于触发器类型的 Durable Functions API 方法。 例如,对于 HTTP 触发器,调用 app.http 该方法。 对于队列触发器,调用该方法app.storageQueue。
若要定义持久客户端触发器,请在 bindings数组中使用以下 JSON 对象:
{
"name": "<name-of-input-parameter-in-function-signature>",
"taskHub": "<optional-name-of-task-hub>",
"connectionName": "<optional-name-of-connection-string-app-setting>",
"type": "orchestrationClient",
"direction": "in"
}
- 当多个函数应用共享同一存储帐户,但需要彼此隔离时,将使用
taskHub此属性。 如果未指定此属性,则使用 host.json 中的默认值。 此值必须与目标业务流程协调程序函数使用的值匹配。 -
connectionName值是包含存储帐户连接字符串的应用设置的名称。 此连接字符串所表示的存储帐户必须与目标业务流程协调程序函数使用的存储帐户相同。 如果未指定此属性,将使用函数应用的默认存储帐户连接字符串。
注意
在大多数情况下,建议省略这些属性,并依赖于默认行为。
可以直接在 Python 函数代码中使用 durable_client_input 修饰器来定义持久客户端触发器。
客户端使用情况
通常,您会绑定到 IDurableClient 的实现(在 Durable Functions 1.x 中为 DurableOrchestrationClient),这使您可以完全访问 Durable Functions 支持的所有编排客户端 API。
通常绑定到 DurableClientContext 类。
必须使用特定于语言的 SDK 来访问客户端对象。
以下代码提供了一个队列触发的函数示例,该示例启动 Hello World 编排。
[FunctionName("QueueStart")]
public static Task Run(
[QueueTrigger("durable-function-trigger")] string input,
[DurableClient] IDurableOrchestrationClient starter)
{
// Orchestration input comes from the queue message content.
return starter.StartNewAsync<string>("HelloWorld", input);
}
注意
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 属性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient。 有关版本差异的详细信息,请参阅 Durable Functions 版本概述。
const { app } = require('@azure/functions');
const df = require('durable-functions');
app.storageQueue('helloQueueStart', {
queueName: 'start-orchestration',
extraInputs: [df.input.durableClient()],
handler: async (message, context) => {
const client = df.getClient(context);
const orchestratorName = message.orchestratorName || 'helloOrchestrator';
const input = message.input || null;
const instanceId = await client.startNew(orchestratorName, { input });
context.log(`Started orchestration with ID = '${instanceId}' from queue message.`);
},
});
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.queue_trigger(
arg_name="msg",
queue_name="start-orchestration",
connection="AzureWebJobsStorage"
)
@myApp.durable_client_input(client_name="client")
async def client_function(msg: func.QueueMessage, client: df.DurableOrchestrationClient):
input_data = msg.get_body().decode("utf-8")
await client.start_new("my_orchestrator", None, input_data)
return None
function.json
{
"bindings": [
{
"name": "InputData",
"type": "queueTrigger",
"queueName": "durable-function-trigger",
"direction": "in"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
run.ps1
param([string]$InputData, $TriggerMetadata)
$InstanceId = Start-DurableOrchestration -FunctionName 'HelloWorld' -Input $InputData
@FunctionName("QueueStart")
public void queueStart(
@QueueTrigger(name = "input", queueName = "durable-function-trigger", connection = "Storage") String input,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
// Orchestration input comes from the queue message content.
durableContext.getClient().scheduleNewOrchestrationInstance("HelloWorld", input);
}
有关启动实例的详细信息,请参阅 在 Azure 中的 Durable Functions 中管理实例。
实体触发器
可以使用实体触发器来开发 实体函数。 此触发器支持处理特定实体实例的事件。
注意
实体触发器从 Durable Functions 2.x 开始可用。
在内部,此触发器绑定轮询已配置的持久存储以查找需执行的新实体操作。
使用 EntityTriggerAttribute .NET 属性配置实体触发器。
若要注册实体触发器,请从app模块导入@azure/functions npm对象。 然后,直接在函数代码中调用 Durable Functions API 的 app.entity 方法。
const df = require('durable-functions');
df.app.entity('counter', (context) => {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case 'add':
context.df.setState(currentValue + context.df.getInput());
break;
case 'reset':
context.df.setState(0);
break;
case 'get':
context.df.return(currentValue);
break;
}
});
注意
Java 尚不支持实体触发器。
注意
PowerShell 尚不支持实体触发器。
可以直接在 Python 函数代码中使用 entity_trigger 修饰器来定义实体触发器。
触发器行为
以下是有关实体触发器的一些注意事项:
- 单线程处理:单个调度程序线程用于处理特定实体的工作。 如果多个消息同时发送到单个实体,则一次处理一个操作。
- 有害消息处理:实体触发器中不支持有害消息。
- 消息可见性:实体触发器消息被移出队列,并在一个可配置的时长内保持不可见。 只要函数应用正常运行,这些消息的可见性就会自动更新。
- 返回值:实体函数不支持返回值。 可以使用特定的 API 来保存状态或将值传递回编排。
执行过程中对实体所做的任何状态更改都会在执行完成后自动保留。
有关定义和与实体触发器交互的详细信息和示例,请参阅 Entity 函数。
实体客户端
可以使用实体客户端绑定来异步触发 实体函数。 这些函数有时称为 客户端函数。
可以在 .NET 类库函数中使用 DurableClientAttribute .NET 属性绑定到实体客户端。
注意
还可以使用 [DurableClientAttribute] 绑定到 编排客户端。
与其注册实体客户端,不如使用 signalEntity 或 callEntity 从任何已注册的函数调用实体触发器方法。
在队列触发的函数中,可以使用
client.signalEntity。const { app } = require('@azure/functions'); const df = require('durable-functions'); app.storageQueue('helloQueueStart', { queueName: 'start-orchestration', extraInputs: [df.input.durableClient()], handler: async (message, context) => { const client = df.getClient(context); const entityId = new df.EntityId('counter', 'myCounter'); await client.signalEntity(entityId, 'add', 5); }, });在编排程序函数中,你可以使用
context.df.callEntity:const { app } = require('@azure/functions'); const df = require('durable-functions'); df.app.orchestration('entityCaller', function* (context) { const entityId = new df.EntityId('counter', 'myCounter'); yield context.df.callEntity(entityId, 'add', 5); yield context.df.callEntity(entityId, 'add', 5); const result = yield context.df.callEntity(entityId, 'get'); return result; });
可以直接在 Python 函数代码中使用 durable_client_input 修饰器来定义实体客户端。
注意
Java 尚不支持实体客户端。
注意
PowerShell 尚不支持实体客户端。
有关作为客户端与实体交互的详细信息和示例,请参阅 Access 实体。