在 Durable Functions 中处理外部事件 (Azure Functions)
本文内容
业务流程协调程序函数能够等待和侦听外部事件。 Durable Functions 的此功能对于处理人机交互或其他外部触发器通常比较有用。
注意
外部事件是单向的异步操作。 它们不适用于发送事件的客户端需要来自业务流程协调程序函数的同步响应的情况。
等待事件
业务流程触发器绑定 的“wait-for-external-event”API 使业务流程协调程序函数可异步等待和侦听外部客户端传递的事件。 侦听业务流程协调程序函数声明了事件的“名称”和它期望收到的“数据形态”。
[FunctionName("BudgetApproval")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
bool approved = await context.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}
注意
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const approved = yield context.df.waitForExternalEvent("Approval");
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
approved = context.wait_for_external_event('Approval')
if approved:
# approval granted - do the approved action
else:
# approval denied - send a notification
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$approved = Start-DurableExternalEventListener -EventName "Approval"
if ($approved) {
# approval granted - do the approved action
} else {
# approval denied - send a notification
}
@FunctionName("WaitForExternalEvent")
public void waitForExternalEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
}
前面的示例侦听特定单个事件,并在收到该事件时执行操作。
可以同时侦听多个事件,像以下示例中一样,以下示例等待三个可能的事件通知之一。
[FunctionName("Select")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var event1 = context.WaitForExternalEvent<float>("Event1");
var event2 = context.WaitForExternalEvent<bool>("Event2");
var event3 = context.WaitForExternalEvent<int>("Event3");
var winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
}
注意
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const event1 = context.df.waitForExternalEvent("Event1");
const event2 = context.df.waitForExternalEvent("Event2");
const event3 = context.df.waitForExternalEvent("Event3");
const winner = yield context.df.Task.any([event1, event2, event3]);
if (winner === event1) {
// ...
} else if (winner === event2) {
// ...
} else if (winner === event3) {
// ...
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
event1 = context.wait_for_external_event('Event1')
event2 = context.wait_for_external_event('Event2')
event3 = context.wait_for_external_event('Event3')
winner = context.task_any([event1, event2, event3])
if winner == event1:
# ...
elif winner == event2:
# ...
elif winner == event3:
# ...
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$event1 = Start-DurableExternalEventListener -EventName "Event1" -NoWait
$event2 = Start-DurableExternalEventListener -EventName "Event2" -NoWait
$event3 = Start-DurableExternalEventListener -EventName "Event3" -NoWait
$winner = Wait-DurableTask -Task @($event1, $event2, $event3) -Any
if ($winner -eq $event1) {
# ...
} else if ($winner -eq $event2) {
# ...
} else if ($winner -eq $event3) {
# ...
}
@FunctionName("Select")
public void selectOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
Task<Void> event1 = ctx.waitForExternalEvent("Event1");
Task<Void> event2 = ctx.waitForExternalEvent("Event2");
Task<Void> event3 = ctx.waitForExternalEvent("Event3");
Task<?> winner = ctx.anyOf(event1, event2, event3).await();
if (winner == event1) {
// ...
} else if (winner == event2) {
// ...
} else if (winner == event3) {
// ...
}
}
前面的示例侦听多个事件中的“任何一个”。 还可以等待“所有”事件。
[FunctionName("NewBuildingPermit")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string applicationId = context.GetInput<string>();
var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
var gate2 = context.WaitForExternalEvent("FireDeptApproval");
var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}
注意
前面的代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 DurableOrchestrationContext
而不是 IDurableOrchestrationContext
。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本 一文。
在 .NET 中,如果事件负载无法转换为预期类型 T
,将会引发异常。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const applicationId = context.df.getInput();
const gate1 = context.df.waitForExternalEvent("CityPlanningApproval");
const gate2 = context.df.waitForExternalEvent("FireDeptApproval");
const gate3 = context.df.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
yield context.df.Task.all([gate1, gate2, gate3]);
yield context.df.callActivity("IssueBuildingPermit", applicationId);
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
application_id = context.get_input()
gate1 = context.wait_for_external_event('CityPlanningApproval')
gate2 = context.wait_for_external_event('FireDeptApproval')
gate3 = context.wait_for_external_event('BuildingDeptApproval')
yield context.task_all([gate1, gate2, gate3])
yield context.call_activity('IssueBuildingPermit', application_id)
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$applicationId = $Context.Input
$gate1 = Start-DurableExternalEventListener -EventName "CityPlanningApproval" -NoWait
$gate2 = Start-DurableExternalEventListener -EventName "FireDeptApproval" -NoWait
$gate3 = Start-DurableExternalEventListener -EventName "BuildingDeptApproval" -NoWait
Wait-DurableTask -Task @($gate1, $gate2, $gate3)
Invoke-ActivityFunction -FunctionName 'IssueBuildingPermit' -Input $applicationId
@FunctionName("NewBuildingPermit")
public void newBuildingPermit(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String applicationId = ctx.getInput(String.class);
Task<Void> gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
Task<Void> gate2 = ctx.waitForExternalEvent("FireDeptApproval");
Task<Void> gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
ctx.allOf(List.of(gate1, gate2, gate3)).await();
ctx.callActivity("IssueBuildingPermit", applicationId).await();
}
“wait-for-external-event”API 无限等待某些输入。 在等待时,可以安全地卸载函数应用。 对于此业务流程实例,如果某个事件到达,则会自动唤醒并立即处理该事件。
注意
如果函数应用使用消耗计划,当业务流程协调程序函数等待外部事件任务时,无论它等待多久,都不会产生账单费用。
与活动函数一样,外部事件具有至少一次传送保证 。 这意味着,在某些条件下(例如重启、缩放、崩溃等),应用程序可能会收到重复的同一外部事件。 因此,我们建议外部事件包含某种 ID,以便在业务流程协调程序中手动取消其重复。
发送事件
可使用由业务流程客户端 绑定定义的“raise-event”API 将外部事件发送到业务流程。 还可以使用内置引发事件 HTTP API 将外部事件发送到业务流程。
引发的事件包括实例 ID、eventName 和 eventData 等参数。 业务流程协调程序函数使用“wait-for-external-event” API 处理这些事件。 在发送端和接收端,eventName 必须匹配才能处理事件。 事件数据还必须是 JSON 可序列化的。
在内部,“raise-event”机制将正在等待的业务流程协调程序函数选取的消息排入队列。 如果实例没有在等待指定的事件名,则将事件消息添加到内存中队列 。 如果业务流程实例稍后开始侦听该事件名称, 它将检查队列中的事件消息。
注意
如果没有具有指定实例 ID 的业务流程实例,则丢弃事件消息。
下面是一个示例队列触发的函数,它将“Approval”事件发送到一个业务流程协调程序函数实例。 业务流程实例 ID 来自队列消息的正文。
[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
await client.RaiseEventAsync(instanceId, "Approval", true);
}
注意
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient
属性而不是 DurableClient
属性,并且必须使用 DurableOrchestrationClient
参数类型而不是 IDurableOrchestrationClient
。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本 一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
await client.raiseEvent(instanceId, "Approval", true);
};
import azure.functions as func
import azure.durable_functions as df
async def main(instance_id:str, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
await client.raise_event(instance_id, 'Approval', True)
param($instanceId)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "Approval"
@FunctionName("ApprovalQueueProcessor")
public void approvalQueueProcessor(
@QueueTrigger(name = "instanceID", queueName = "approval-queue") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
durableContext.getClient().raiseEvent(instanceID, "Approval", true);
}
在内部,“raise-event”API 将正在等待的业务流程协调程序函数选取的消息排入队列。 如果实例没有在等待指定的事件名,则将事件消息添加到内存缓冲区。 如果业务流程实例稍后开始侦听该事件名称,它将检查事件消息的缓冲区,并触发正在等待它的任务。
注意
如果没有具有指定实例 ID 的业务流程实例,则丢弃事件消息。
HTTP
以下是向业务流程实例引发“批准”事件的 HTTP 请求的示例。
POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json
"true"
在本例中,实例 ID 硬编码为 MyInstanceId。
后续步骤