业务流程是长时间运行的有状态工作流,可以使用内置管理 API 启动、查询、暂停、恢复和结束。 在 Durable Functions 中,orchestration 客户端绑定公开这些 API。 在Durable Task SDK 中,可以通过DurableTaskClient类来实现这些操作。 本文涵盖了这两个平台支持的所有实例管理操作。
启动实例
业务流程客户端上的 start-new (或 schedule-new)方法启动新的业务流程实例。 在内部,此方法将消息写入已配置的后端(例如 Durable Task Scheduler 或 Azure Storage),然后返回。 此消息以异步方式触发具有指定名称的编排的启动。
下面是用于启动新编排实例的参数:
-
名称:要调度的协调器函数的名称。
-
输入:应作为输入传递给业务流程协调程序函数的任何 JSON 可序列化数据。
-
InstanceId:(可选)实例的唯一 ID。 如果未指定此参数,该方法将使用随机 ID。
小窍门
尽可能为实例 ID 使用随机标识符。 当跨多个 VM 缩放业务流程协调程序函数时,随机实例 ID 有助于确保负载分布相等。 当 ID 来自外部源或在实现单例业务流程协调程序模式时,适合使用非随机实例 ID。
-
Name:要计划的业务流程名称。
-
输入:应作为输入传递给业务流程的任何 JSON 可序列化数据。
-
InstanceId:(可选)实例的唯一 ID。 如果未指定此参数,该方法将使用随机 ID。
小窍门
尽可能为实例 ID 使用随机标识符。 当跨多个 VM 缩放业务流程时,随机实例 ID 有助于确保负载分布相等。 当 ID 来自外部源或在实现单例业务流程协调程序模式时,适合使用非随机实例 ID。
以下示例函数启动新的编排实例:
[FunctionName("HelloWorldQueueTrigger")]
public static async Task Run(
[QueueTrigger("start-queue")] string input,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
string instanceId = await starter.StartNewAsync("HelloWorld", input);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,请使用 OrchestrationClient 属性而不是 DurableClient 属性,并使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient。 有关版本差异的详细信息,请参阅 Durable Functions 版本。
除非另有指定,否则本页上的示例将 HTTP 触发器与以下 function.json一起使用。
function.json
{
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注释
此示例适用于 Durable Functions 2.x 版本。 在版本 1.x 中,使用 orchestrationClient 而不是 durableClient.
index.js
const df = require("durable-functions");
module.exports = async function(context, input) {
const client = df.getClient(context);
const instanceId = await client.startNew("HelloWorld", undefined, input);
context.log(`Started orchestration with ID = ${instanceId}.`);
};
除非另有指定,否则本页上的示例将 HTTP 触发器与以下 function.json一起使用。
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注释
此示例适用于 Durable Functions 2.x 版本。 在版本 1.x 中,使用 orchestrationClient 而不是 durableClient.
__init__.py
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new('HelloWorld', None, None)
logging.log(f"Started orchestration with ID = ${instance_id}.")
除非另有指定,否则本页上的示例将 HTTP 触发器与以下 function.json一起使用。
function.json
{
"bindings": [
{
"name": "Request",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "Response",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注释
此示例适用于 Durable Functions 2.x 版本。 在版本 1.x 中,使用 orchestrationClient 而不是 durableClient.
run.ps1
param($Request, $TriggerMetadata)
$InstanceId = Start-DurableOrchestration -FunctionName 'HelloWorld'
Write-Host "Started orchestration with ID = '$InstanceId'"
@FunctionName("HelloWorldQueueTrigger")
public void helloWorldQueueTrigger(
@QueueTrigger(name = "input", queueName = "start-queue", connection = "Storage") String input,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceID = client.scheduleNewOrchestrationInstance("HelloWorld");
context.getLogger().info("Scheduled orchestration with ID = " + instanceID);
}
若要等待业务流程协调程序在从函数返回之前启动,请使用 waitForInstanceStart() 该方法。
// wait up to 30 seconds for the scheduled orchestration to enter the "Running" state
client.waitForInstanceStart(instanceID, Duration.ofSeconds(30));
重要
目前,PowerShell Durable Task SDK 不可用。
以下代码演示如何使用 Durable Task SDK 启动新的业务流程实例:
using Microsoft.DurableTask.Client;
// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("HelloWorld", input);
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = await client.WaitForInstanceStartAsync(instanceId, timeout: TimeSpan.FromSeconds(30));
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(hello_world, input=input_data)
print(f"Started orchestration with ID = '{instance_id}'.")
# Optionally, wait for the orchestration to start
state = client.wait_for_orchestration_start(instance_id, timeout=30)
import com.microsoft.durabletask.DurableTaskClient;
// Schedule a new orchestration instance
String instanceId = client.scheduleNewOrchestrationInstance("HelloWorld", input);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = client.waitForInstanceStart(instanceId, Duration.ofSeconds(30), false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Schedule a new orchestration instance
const instanceId = await client.scheduleNewOrchestration("HelloWorld", input);
console.log(`Started orchestration with ID = '${instanceId}'.`);
// Optionally, wait for the orchestration to start
const state = await client.waitForOrchestrationStart(instanceId, false, 30);
查询实例
启动新的业务流程实例后,你最可能需要查询其运行时状态,以了解它们是正在运行、完成还是失败。
业务流程客户端上的 get-status 方法返回业务流程实例的状态。
它将 instanceId(必需)、showHistory(可选)、showHistoryOutput(可选)和 showInput(可选)作为参数。
-
showHistory:如果设置为 true,则响应包含执行历史记录。
-
showHistoryOutput:如果设置为 true,则执行历史记录包含活动输出。
-
showInput:如果设置为 false,则响应不包含函数的输入。 默认值是 true。
该方法返回具有以下属性的对象:
-
名称:调度器函数的名称。
-
InstanceId:业务流程的实例 ID(应与
instanceId 输入相同)。
-
CreatedTime:业务流程协调程序函数开始运行的时间。
-
LastUpdatedTime:业务流程上次执行检查点的时间。
-
输入:函数的输入作为 JSON 值。 如果
showInput 为 false,则不填充此字段。
-
CustomStatus:JSON 格式的自定义业务流程状态。
-
输出:函数的输出作为 JSON 值(如果函数完成)。 如果业务流程协调程序函数失败,则此属性包括失败详细信息。 如果业务流程协调程序函数已挂起或终止,则此属性包括挂起或终止的原因(如果有)。
-
RuntimeStatus:以下值之一:
-
待处理:实例已被调度,但尚未开始运行。
-
正在运行:实例正在运行。
-
已完成:实例正常完成。
-
ContinuedAsNew:实例使用新的历史记录重新启动自身。 此状态是暂时性状态。
-
失败:实例失败并出现错误。
-
已终止:实例突然停止。
-
已挂起:该实例已暂停,可以在以后恢复。
-
历史记录:业务流程的执行历史记录。 仅当
showHistory 设置为 true 时,此字段才会被填充。
-
showHistory:如果设置为 true,则响应包含执行历史记录。
-
showHistoryOutput:如果设置为 true,则执行历史记录包含活动输出。
- :如果设置为 ,则响应不包含编排的输入。 默认值是
true。
该方法返回具有以下属性的对象:
-
名称:编排的名称。
-
InstanceId:业务流程的实例 ID(应与
instanceId 输入相同)。
-
CreatedTime:业务流程开始运行的时间。
-
LastUpdatedTime:业务流程上次执行检查点的时间。
-
输入:编排流程的输入以 JSON 格式呈现。 如果
showInput 为 false,则不填充此字段。
-
CustomStatus:JSON 格式的自定义业务流程状态。
-
输出:编排的输出作为 JSON 格式值(如果编排完整完成)。 如果编排失败,此属性将包括失败详细信息。 如果编排暂停或终止,则此属性包括暂停或终止的原因(如果有)。
-
RuntimeStatus:以下值之一:
-
待处理:实例已被调度,但尚未开始运行。
-
正在运行:实例正在运行。
-
已完成:实例正常完成。
-
ContinuedAsNew:实例使用新的历史记录重新启动自身。 此状态是暂时性状态。
-
失败:实例失败并出现错误。
-
已终止:实例突然停止。
-
已挂起:该实例已暂停,可以在以后恢复。
-
历史记录:业务流程的执行历史记录。 仅当
showHistory 设置为 true 时,此字段才会被填充。
注释
只有当所有已计划任务完成Completed业务流程协调程序返回时,业务流程协调程序才会被标记为 。 换句话说,业务流程协调程序不足以达到其 return 语句,以便将其标记为 Completed。 在 WhenAny 被使用的情况下这一点尤为重要;这些协调器通常在所有计划任务执行之前会执行return。
如果实例不存在,此方法将 null 返回 (.NET 和 Java)、 undefined (JavaScript)或 None (Python)。
[FunctionName("GetStatus")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("check-status-queue")] string instanceId)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
// do something based on the current status.
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,请使用 OrchestrationClient 属性而不是 DurableClient 属性,并使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const status = await client.getStatus(instanceId);
// do something based on the current status.
// example: if status.runtimeStatus === df.OrchestrationRuntimeStatus.Running: ...
}
有关 function.json 配置,请参阅 “启动实例”。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
status = await client.get_status(instance_id)
# do something based on the current status
# example: if (existing_instance.runtime_status is df.OrchestrationRuntimeStatus.Running) { ...
param($Request, $TriggerMetadata)
# Get instanceid from body
$InstanceId = $Request.Body.InstanceId
$Status = Get-DurableStatus -InstanceId $InstanceId -ShowHistory -ShowHistoryOutput -ShowInput
Write-Host "Status: $($Status | ConvertTo-Json)"
# Do something based on status
# example: if ($Status.runtimeStatus -eq 'Running') { ... }
@FunctionName("GetStatus")
public void getStatus(
@QueueTrigger(name = "instanceID", queueName = "check-status-queue", connection = "Storage") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceID, false);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
switch (status) {
// do something based on the current status
}
}
}
using Microsoft.DurableTask.Client;
// Get the status of an orchestration instance
OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: true);
if (metadata != null)
{
OrchestrationRuntimeStatus status = metadata.RuntimeStatus;
// do something based on the current status
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Get the status of an orchestration instance
state = client.get_orchestration_state(instance_id, fetch_payloads=True)
if state is not None:
status = state.runtime_status
# do something based on the current status
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
// Get the status of an orchestration instance
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, true);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
// do something based on the current status
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Get the status of an orchestration instance
const state = await client.getOrchestrationState(instanceId, true);
if (state) {
const status = state.runtimeStatus;
// do something based on the current status
}
查询所有实例
可以使用语言 SDK 中的 API 来查询 任务中心内所有业务流程实例的状态。 此 “list-instances” 或 “get-status” API 返回表示与查询参数匹配的业务流程实例的对象列表。
[FunctionName("GetAllStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var noFilter = new OrchestrationStatusQueryCondition();
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
noFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
// Note: ListInstancesAsync only returns the first page of results.
// To request additional pages provide the result.ContinuationToken
// to the OrchestrationStatusQueryCondition's ContinuationToken property.
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instances = await client.getStatusAll();
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
有关 function.json 配置,请参阅 “启动实例”。
import logging
import json
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instances = await client.get_status_all()
for instance in instances:
logging.log(json.dumps(instance))
有关 function.json 配置,请参阅 “启动实例”。
@FunctionName("GetAllStatus")
public String getAllStatus(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery noFilter = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(noFilter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Query all orchestration instances
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(new OrchestrationQuery());
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine(instance.InstanceId);
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Query all orchestration instances
instances = client.list_orchestrations()
for instance in instances:
print(instance.instance_id)
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationStatusQueryResult;
// Query all orchestration instances
OrchestrationStatusQuery query = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(query);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Query all orchestration instances
const instances = client.getAllInstances();
for await (const instance of instances) {
console.log(instance.instanceId);
}
使用筛选器查询实例
如果不需要标准实例查询提供的所有信息,该怎么办? 例如,如果只想查找业务流程创建时间或业务流程运行时状态,该怎么办? 通过应用筛选器缩小查询范围。
[FunctionName("QueryStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
// Get the first 100 running or pending instances that were created between 7 and 1 days ago
var queryFilter = new OrchestrationStatusQueryCondition
{
RuntimeStatus = new[]
{
OrchestrationRuntimeStatus.Pending,
OrchestrationRuntimeStatus.Running,
},
CreatedTimeFrom = DateTime.UtcNow.Subtract(TimeSpan.FromDays(7)),
CreatedTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1)),
PageSize = 100,
};
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
queryFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const runtimeStatus = [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Running,
];
const instances = await client.getStatusBy(
new Date(2021, 3, 10, 10, 1, 0),
new Date(2021, 3, 10, 10, 23, 59),
runtimeStatus
);
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
请参阅 启动实例 了解 function.json 配置。
import logging
from datetime import datetime
import json
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
runtime_status = [OrchestrationRuntimeStatus.Completed, OrchestrationRuntimeStatus.Running]
instances = await client.get_status_by(
datetime(2021, 3, 10, 10, 1, 0),
datetime(2021, 3, 10, 10, 23, 59),
runtime_status
)
for instance in instances:
logging.log(json.dumps(instance))
@FunctionName("GetRunningInstances")
public String getRunningInstances(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Get running or pending instances created in the last 7 days
var query = new OrchestrationQuery
{
Statuses = new[] { OrchestrationRuntimeStatus.Running, OrchestrationRuntimeStatus.Pending },
CreatedFrom = DateTime.UtcNow.AddDays(-7),
CreatedTo = DateTime.UtcNow.AddDays(-1),
PageSize = 100
};
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(query);
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine($"{instance.InstanceId}: {instance.RuntimeStatus}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Get running or pending instances created in the last 7 days
instances = client.list_orchestrations(
created_time_from=datetime.utcnow() - timedelta(days=7),
created_time_to=datetime.utcnow() - timedelta(days=1),
runtime_status=['RUNNING', 'PENDING']
)
for instance in instances:
print(f"{instance.instance_id}: {instance.runtime_status}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Get running or pending instances created in the last 7 days
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId() + ": " + instance.getRuntimeStatus());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Get running or pending instances created in the last 7 days
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const instances = client.getAllInstances({
statuses: [OrchestrationStatus.RUNNING, OrchestrationStatus.PENDING],
createdFrom: sevenDaysAgo,
createdTo: oneDayAgo,
});
for await (const instance of instances) {
console.log(`${instance.instanceId}: ${instance.runtimeStatus}`);
}
终止实例
如果协调实例运行时间过长,或者出于任何原因需要在它完成之前停止它,则可以结束它。
终止 API 的两个参数是 实例 ID 和 原因 字符串,用于写入日志和实例状态。
[FunctionName("TerminateInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("terminate-queue")] string instanceId)
{
string reason = "Found a bug";
return client.TerminateAsync(instanceId, reason);
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Found a bug";
return client.terminate(instanceId, reason);
};
请参阅 启动实例 了解 function.json 配置。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
reason = "Found a bug"
return await client.terminate(instance_id, reason)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$Reason = 'Found a bug'
Stop-DurableOrchestration -InstanceId $InstanceId -Reason $Reason
@FunctionName("TerminateInstance")
public void terminateInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String reason = "Found a bug";
durableContext.getClient().terminate(instanceID, reason);
}
using Microsoft.DurableTask.Client;
string reason = "Found a bug";
await client.TerminateInstanceAsync(instanceId, reason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
reason = "Found a bug"
client.terminate_orchestration(instance_id, reason=reason)
import com.microsoft.durabletask.DurableTaskClient;
String reason = "Found a bug";
client.terminate(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Found a bug";
await client.terminateOrchestration(instanceId, reason);
终止的实例最终会转换为 Terminated 状态。 但这种转换不会立即发生。 相反,终止操作与该实例的其他操作一起排入 Task Hub 中。 可以使用 实例查询 API 了解终止实例何时实际达到 Terminated 状态。
注释
实例终止操作目前不会传播。 无论是否结束调用它们的协调实例,活动函数和子协调都会完成运行。
暂停和恢复实例
通过暂停业务流程,可以停止正在运行的业务流程。 与结束一个编排过程不同,稍后可以恢复挂起的协调器。
挂起 API 的两个参数是实例 ID 和原因字符串,这些参数会被写入日志和实例状态。
[FunctionName("SuspendResumeInstance")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("suspend-resume-queue")] string instanceId)
{
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeAsync(instanceId, resumeReason);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
// To suspend an orchestration
const suspendReason = "Need to pause workflow";
await client.suspend(instanceId, suspendReason);
// To resume an orchestration
const resumeReason = "Continue workflow";
await client.resume(instanceId, resumeReason);
};
import azure.functions as func
import azure.durable_functions as df
from datetime import timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str):
client = df.DurableOrchestrationClient(starter)
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
await client.suspend(instance_id, suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
await client.resume(instance_id, resume_reason)
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
# To suspend an orchestration
$SuspendReason = 'Need to pause workflow'
Suspend-DurableOrchestration -InstanceId $InstanceId -Reason $SuspendReason
# To resume an orchestration
$ResumeReason = 'Continue workflow'
Resume-DurableOrchestration -InstanceId $InstanceId -Reason $ResumeReason
@FunctionName("SuspendResumeInstance")
public void suspendResumeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
DurableTaskClient client = durableContext.getClient();
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceID, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceID, resumeReason);
}
using Microsoft.DurableTask.Client;
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendInstanceAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeInstanceAsync(instanceId, resumeReason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
client.suspend_orchestration(instance_id, reason=suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
client.resume_orchestration(instance_id, reason=resume_reason)
import com.microsoft.durabletask.DurableTaskClient;
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceId, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceId, resumeReason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// To suspend an orchestration
await client.suspendOrchestration(instanceId);
// To resume an orchestration
await client.resumeOrchestration(instanceId);
暂停的实例最终会转换为 Suspended 状态。 但是,这种转换不会立即发生。 相反,暂停操作会与该实例的其他操作一起排入任务中心队列。 使用实例查询 API 了解正在运行的实例何时实际达到 Suspended 状态。
当恢复暂停的业务流程协调程序时,其状态会更改回 Running。
将事件发送到实例
在某些情况下,编排器函数需要等待和侦听外部事件。 此方法非常有用的示例包括 人工交互 方案。
在某些情况下,编排需要等待和监听外部事件。 此方法非常有用的示例包括 人工交互 方案。
可以使用业务流程客户端的 引发事件 API 将事件通知发送到正在运行的实例。 编排可以使用 等待外部事件 编排器 API 来监听和响应这些事件。
引发事件的参数包括:
-
实例 ID:实例的唯一 ID。
-
事件名称:要发送的事件的名称。
-
事件数据:要发送到实例的 JSON 可序列化有效负载。
[FunctionName("RaiseEvent")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("event-queue")] string instanceId)
{
int[] eventData = new int[] { 1, 2, 3 };
return client.RaiseEventAsync(instanceId, "MyEvent", eventData);
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const eventData = [ 1, 2, 3 ];
return client.raiseEvent(instanceId, "MyEvent", eventData);
};
请参阅 启动实例 了解 function.json 配置。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
event_data = [1, 2 ,3]
return await client.raise_event(instance_id, 'MyEvent', event_data)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$EventName = 'MyEvent'
$EventData = @(1,2,3)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName $EventName -EventData $EventData
@FunctionName("RaiseEvent")
public void raiseEvent(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String eventName = "MyEvent";
int[] eventData = { 1, 2, 3 };
durableContext.getClient().raiseEvent(instanceID, eventName, eventData);
}
using Microsoft.DurableTask.Client;
int[] eventData = new int[] { 1, 2, 3 };
await client.RaiseEventAsync(instanceId, "MyEvent", eventData);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
event_data = [1, 2, 3]
client.raise_orchestration_event(instance_id, "MyEvent", event_data)
import com.microsoft.durabletask.DurableTaskClient;
int[] eventData = { 1, 2, 3 };
client.raiseEvent(instanceId, "MyEvent", eventData);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const eventData = [1, 2, 3];
await client.raiseOrchestrationEvent(instanceId, "MyEvent", eventData);
注释
如果找不到具有指定实例ID的编排实例,事件消息将被丢弃。 如果实例存在但尚未等待事件,则事件将存储在实例状态中,直到它准备好接收和处理。
等待编排完成
在长时间运行的编排中,可能需要等待以获取编排的结果。 在这些情况下,定义编排的超时期限也很有用。 如果超时发生,将返回编排的状态,而不是结果。
使用 “等待完成或创建检查状态响应” API 同步获取业务流程实例的实际输出。 默认情况下,此方法超时为 10 秒,轮询间隔为 1 秒。
下面是演示如何使用此 API 的示例 HTTP 触发器函数:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
namespace VSSample
{
public static class HttpSyncStart
{
private const string Timeout = "timeout";
private const string RetryInterval = "retryInterval";
[FunctionName("HttpSyncStart")]
public static async Task<HttpResponseMessage> Run(
[HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}/wait")]
HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
string functionName,
ILogger log)
{
// Function input comes from the request content.
object eventData = await req.Content.ReadAsAsync<object>();
string instanceId = await starter.StartNewAsync(functionName, eventData);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
TimeSpan timeout = GetTimeSpan(req, Timeout) ?? TimeSpan.FromSeconds(30);
TimeSpan retryInterval = GetTimeSpan(req, RetryInterval) ?? TimeSpan.FromSeconds(1);
return await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(
req,
instanceId,
timeout,
retryInterval);
}
private static TimeSpan? GetTimeSpan(HttpRequestMessage request, string queryParameterName)
{
string queryParameterStringValue = request.RequestUri.ParseQueryString()[queryParameterName];
if (string.IsNullOrEmpty(queryParameterStringValue))
{
return null;
}
return TimeSpan.FromSeconds(double.Parse(queryParameterStringValue));
}
}
}
const df = require("durable-functions");
const timeout = "timeout";
const retryInterval = "retryInterval";
module.exports = async function (context, req) {
const client = df.getClient(context);
const instanceId = await client.startNew(req.params.functionName, undefined, req.body);
context.log(`Started orchestration with ID = '${instanceId}'.`);
const timeoutInMilliseconds = getTimeInSeconds(req, timeout) || 30000;
const retryIntervalInMilliseconds = getTimeInSeconds(req, retryInterval) || 1000;
const response = client.waitForCompletionOrCreateCheckStatusResponse(
context.bindingData.req,
instanceId,
timeoutInMilliseconds,
retryIntervalInMilliseconds
);
return response;
};
function getTimeInSeconds(req, queryParameterName) {
const queryValue = req.query[queryParameterName];
return queryValue
? queryValue * 1000 // expected to be in seconds
: undefined;
}
请参阅 启动实例 了解 function.json 配置。
import logging
import azure.functions as func
import azure.durable_functions as df
timeout = "timeout"
retry_interval = "retryInterval"
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params['functionName'], None, req.get_body())
logging.log(f"Started orchestration with ID = '${instance_id}'.")
timeout_in_milliseconds = get_time_in_seconds(req, timeout)
timeout_in_milliseconds = timeout_in_milliseconds if timeout_in_milliseconds != None else 30000
retry_interval_in_milliseconds = get_time_in_seconds(req, retry_interval)
retry_interval_in_milliseconds = retry_interval_in_milliseconds if retry_interval_in_milliseconds != None else 1000
return await client.wait_for_completion_or_create_check_status_response(
req,
instance_id,
timeout_in_milliseconds,
retry_interval_in_milliseconds
)
def get_time_in_seconds(req: func.HttpRequest, query_parameter_name: str):
query_value = req.params.get(query_parameter_name)
return query_value if query_value != None else 1000
注释
PowerShell 目前没有此方案的内置命令。
Java目前对此方案没有单个方法,但你可以使用几个额外的代码行来实现它。
@FunctionName("HttpStartAndWait")
public HttpResponseMessage httpStartAndWait(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/wait", methods = {HttpMethod.POST}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("functionName") String functionName,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance(functionName);
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
try {
String timeoutString = req.getQueryParameters().get("timeout");
Integer timeoutInSeconds = Integer.parseInt(timeoutString);
OrchestrationMetadata orchestration = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(timeoutInSeconds),
true /* getInputsAndOutputs */);
return req.createResponseBuilder(HttpStatus.OK)
.body(orchestration.getSerializedOutput())
.header("Content-Type", "application/json")
.build();
} catch (TimeoutException timeoutEx) {
// timeout expired - return a 202 response
return durableContext.createCheckStatusResponse(req, instanceId);
}
}
Durable Task SDK 提供一种方法来同步等待编排完成。
using Microsoft.DurableTask.Client;
// Wait for orchestration to complete with a timeout
OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(
instanceId,
timeout: TimeSpan.FromSeconds(30),
getInputsAndOutputs: true);
if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
Console.WriteLine($"Output: {metadata.SerializedOutput}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Wait for orchestration to complete with a timeout
state = client.wait_for_orchestration_completion(
instance_id,
timeout=30,
fetch_payloads=True)
if state.runtime_status == 'COMPLETED':
print(f"Output: {state.serialized_output}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
// Wait for orchestration to complete with a timeout
try {
OrchestrationMetadata metadata = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true /* getInputsAndOutputs */);
System.out.println("Output: " + metadata.getSerializedOutput());
} catch (TimeoutException e) {
System.out.println("Orchestration did not complete within timeout");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Wait for orchestration to complete with a timeout
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
if (state?.runtimeStatus === OrchestrationStatus.COMPLETED) {
console.log(`Output: ${state.serializedOutput}`);
}
使用以下行调用函数。 对超时使用两秒,对重试间隔使用 0.5 秒:
curl -X POST "http://localhost:7071/orchestrators/E1_HelloSequence/wait?timeout=2&retryInterval=0.5"
注释
上述 cURL 命令假定你的项目中有一个名为E1_HelloSequence的编排器函数。 由于 HTTP 触发器函数的编写方式,因此可以将它替换为项目中任何业务流程协调程序函数的名称。
根据从编排实例获取响应所需的时间,存在两种情况:
- 业务流程实例在定义的超时范围内完成(在本例中为两秒),响应是实际业务流程实例输出,以同步方式传递:
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:14:29 GMT
Transfer-Encoding: chunked
[
"Hello Tokyo!",
"Hello Seattle!",
"Hello London!"
]
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:13:51 GMT
Location: http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}
Retry-After: 10
Transfer-Encoding: chunked
{
"id": "d3b72dddefce4e758d92f4d411567177",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/raiseEvent/{eventName}?taskHub={taskHub}&connection={connection}&code={systemKey}",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/terminate?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/suspend?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/resume?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}"
}
注释
Webhook URL 的格式可能会有所不同,具体取决于运行Azure Functions主机的版本。 前面的示例适用于 Azure Functions 3.0 主机。
检索 HTTP 管理 Webhook 的 URL 列表
使用外部系统监控或向编排触发事件。 外部系统通过 webhook URL 与Durable Functions通信,这些 URL 是 HTTP API URL 发现中所述的默认响应的一部分。 Webhook URL 也可以通过业务流程客户端绑定以编程方式访问。 具体而言, 创建 HTTP 管理有效负载 API 获取包含这些 Webhook URL 的可序列化对象。
创建 HTTP 管理有效负载 API 有一个参数:
这些方法返回具有以下字符串属性的对象:
-
ID:编排的实例 ID(应与
InstanceId 输入相同)。
-
StatusQueryGetUri:业务流程实例的状态 URL。
-
SendEventPostUri:流程实例的“引发事件”URL。
-
TerminatePostUri:协调实例的“terminate”URL。
-
PurgeHistoryDeleteUri:业务流程实例的“清除历史记录”URL。
-
SuspendPostUri:编排实例的“suspend”URL。
-
ResumePostUri:编排实例的“resume”URL。
函数将这些对象的实例发送到外部系统,以监视或引发相应业务流程上的事件,如以下示例所示。
[FunctionName("SendInstanceInfo")]
public static void SendInstanceInfo(
[ActivityTrigger] IDurableActivityContext ctx,
[DurableClient] IDurableOrchestrationClient client,
[CosmosDB(
databaseName: "MonitorDB",
containerName: "HttpManagementPayloads",
Connection = "CosmosDBConnectionSetting")]out dynamic document)
{
HttpManagementPayload payload = client.CreateHttpManagementPayload(ctx.InstanceId);
// send the payload to Azure Cosmos DB
document = new { Payload = payload, id = ctx.InstanceId };
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,请使用 DurableActivityContext 而不是 IDurableActivityContext,使用 OrchestrationClient 属性而不是 DurableClient 属性,并使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
modules.exports = async function(context, ctx) {
const client = df.getClient(context);
const payload = client.createHttpManagementPayload(ctx.instanceId);
// send the payload to Azure Cosmos DB
context.bindings.document = JSON.stringify({
id: ctx.instanceId,
payload,
});
};
请参阅 启动实例 了解 function.json 配置。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.cosmosdb.cdb.Document:
client = df.DurableOrchestrationClient(starter)
payload = client.create_check_status_response(req, instance_id).get_body().decode()
return func.cosmosdb.CosmosDBConverter.encode({
id: instance_id,
payload: payload
})
using namespace System.Net
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
回滚实例
如果由于意外原因出现业务流程故障,请使用为该目的生成的 API 将实例倒退 到以前正常的状态。
注释
此 API 不应替代正确的错误处理和重试策略。 相反,它仅用于业务流程实例因意外原因而失败的情况。 除了Failed状态中的协调流程(例如Running、Pending、Terminated或Completed)之外,其他状态中的协调流程不能“重置”。 有关错误处理和重试策略的详细信息,请参阅 错误处理 文章。
请使用RewindAsync的 rewind (.NET) 或 (JavaScript) 方法将业务流程恢复到 Running 状态。 此方法还会重新运行导致编排过程失败的活动的执行失败或子编排的执行失败。
例如,假设你有一个涉及一系列 人工审批的工作流。 假设一系列活动函数通知某人需要审批并等待实时响应。 假设在所有审批活动收到响应或超时之后,另一个活动由于应用程序配置错误(例如无效的数据库连接字符串)而失败。 结果是编排故障发生在工作流中。 使用 RewindAsync (.NET) 或 rewind (JavaScript) API,应用程序管理员可以修复配置错误,并在失败前立即将失败的业务流程回退到状态。 无需重新批准人工交互步骤,业务流程现在可以成功完成。
[FunctionName("RewindInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("rewind-queue")] string instanceId)
{
string reason = "Orchestrator failed and needs to be revived.";
return client.RewindAsync(instanceId, reason);
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Orchestrator failed and needs to be revived.";
return client.rewind(instanceId, reason);
};
请参阅 启动实例 了解 function.json 配置。
using Microsoft.DurableTask.Client;
string reason = "Orchestrator failed and needs to be revived.";
await client.RewindInstanceAsync(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Orchestrator failed and needs to be revived.";
await client.rewindInstance(instanceId, reason);
此示例仅适用于 .NET 和 JavaScript。
此示例仅适用于 .NET 和 JavaScript。
此示例仅适用于 .NET 和 JavaScript。
重启实例
重启编排会使用先前运行的实例的历史记录来创建一个新实例。 如果要使用相同的输入和实例 ID 模式重新运行业务流程,基于原始版本创建新的运行,此功能非常有用。
[FunctionName("RestartInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("restart-queue")] string instanceId)
{
return client.RestartAsync(instanceId, restartWithNewInstanceId: true);
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
using Microsoft.DurableTask.Client;
// Restart an orchestration with a new instance ID
string newInstanceId = await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: true);
Console.WriteLine($"Restarted as new instance: {newInstanceId}");
// Restart an orchestration keeping the same instance ID
await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Restart an orchestration with a new instance ID
const newInstanceId = await client.restartOrchestration(instanceId, true);
console.log(`Restarted as new instance: ${newInstanceId}`);
// Restart an orchestration keeping the same instance ID
await client.restartOrchestration(instanceId, false);
此示例仅适用于 .NET 和 JavaScript。
此示例仅适用于 .NET 和 JavaScript。
此示例仅适用于 .NET 和 JavaScript。
清除实例历史记录
若要删除与业务流程关联的所有数据,请清除实例历史记录。 例如,删除与已完成实例关联的任何存储资源。 使用编排客户端定义的清除实例 API。
以下示例演示如何清理单个编排实例。
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("purge-queue")] string instanceId)
{
return client.PurgeInstanceHistoryAsync(instanceId);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
return client.purgeInstanceHistory(instanceId);
};
请参阅 启动实例 了解 function.json 配置。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
return await client.purge_instance_history(instance_id)
@FunctionName("PurgeInstance")
public HttpResponseMessage purgeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, route = "purge/{instanceID}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("instanceID") String instanceID) {
PurgeResult result = durableContext.getClient().purgeInstance(instanceID);
if (result.getDeletedInstanceCount() == 0) {
return req.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("No completed instance with ID '" + instanceID + "' was found!")
.build();
} else {
return req.createResponseBuilder(HttpStatus.OK)
.body("Successfully purged data for " + instanceID)
.build();
}
}
using Microsoft.DurableTask.Client;
// Purge a single orchestration instance
PurgeResult result = await client.PurgeInstanceAsync(instanceId);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Purge a single orchestration instance
result = client.purge_orchestration(instance_id)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeResult;
// Purge a single orchestration instance
PurgeResult result = client.purgeInstance(instanceId);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Purge a single orchestration instance
const result = await client.purgeOrchestration(instanceId);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
以下示例显示了一个计时器触发的函数,该函数清除指定时间间隔后完成的所有业务流程实例的历史记录。 在这种情况下,它会删除 30 天或多天前完成的所有实例的数据。 本示例函数计划每天运行一次,UTC 下午 12:00:
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[TimerTrigger("0 0 12 * * *")] TimerInfo myTimer)
{
return client.PurgeInstanceHistoryAsync(
DateTime.MinValue,
DateTime.UtcNow.AddDays(-30),
new List<OrchestrationStatus>
{
OrchestrationStatus.Completed
});
}
注释
前面的 C# 代码适用于 Durable Functions 2.x。 对于 Durable Functions 1.x,必须使用 OrchestrationClient 属性而不是 DurableClient 特性,并且必须使用 DurableOrchestrationClient 参数类型而不是 IDurableOrchestrationClient参数类型。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
该方法 purgeInstanceHistoryBy 可用于有条件地清除多个实例的实例历史记录。
function.json
{
"bindings": [
{
"schedule": "0 0 12 * * *",
"name": "myTimer",
"type": "timerTrigger",
"direction": "in"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注释
此示例适用于 Durable Functions 2.x 版本。 在版本 1.x 中,使用 orchestrationClient 而不是 durableClient.
index.js
const df = require("durable-functions");
module.exports = async function (context, myTimer) {
const client = df.getClient(context);
const createdTimeFrom = new Date(0);
const createdTimeTo = new Date().setDate(today.getDate() - 30);
const runtimeStatuses = [ df.OrchestrationRuntimeStatus.Completed ];
return client.purgeInstanceHistoryBy(createdTimeFrom, createdTimeTo, runtimeStatuses);
};
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.DurableOrchestrationStatus import OrchestrationRuntimeStatus
from datetime import datetime, timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
created_time_from = datetime.min
created_time_to = datetime.today() + timedelta(days = -30)
runtime_statuses = [OrchestrationRuntimeStatus.Completed]
return await client.purge_instance_history_by(created_time_from, created_time_to, runtime_statuses)
@FunctionName("PurgeInstances")
public void purgeInstances(
@TimerTrigger(name = "purgeTimer", schedule = "0 0 12 * * *") String timerInfo,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
ExecutionContext context) throws TimeoutException {
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(60)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = durableContext.getClient().purgeInstances(criteria);
context.getLogger().info(String.format("Purged %d instance(s)", result.getDeletedInstanceCount()));
}
using Microsoft.DurableTask.Client;
// Purge completed instances older than 30 days
var filter = new PurgeInstancesFilter(
CreatedFrom: DateTime.MinValue,
CreatedTo: DateTime.UtcNow.AddDays(-30),
Statuses: new[] { OrchestrationRuntimeStatus.Completed });
PurgeResult result = await client.PurgeAllInstancesAsync(filter);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Purge completed instances older than 30 days
result = client.purge_orchestrations(
created_time_from=datetime.min,
created_time_to=datetime.utcnow() - timedelta(days=30),
runtime_status=['COMPLETED']
)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeInstanceCriteria;
import com.microsoft.durabletask.PurgeResult;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Purge completed instances older than 30 days
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = client.purgeInstances(criteria);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus, PurgeInstanceCriteria } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Purge completed instances older than 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const criteria = new PurgeInstanceCriteria();
criteria.setCreatedTimeTo(thirtyDaysAgo);
criteria.setRuntimeStatusList([OrchestrationStatus.COMPLETED]);
const result = await client.purgeOrchestration(criteria);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
注释
要使清除历史记录作成功,目标实例的运行时状态必须为 “已完成”、“ 终止”或 “失败”。
后续步骤