使用持久业务流程编排系统时,升级和降级流程编排是一个关键的考虑因素。 当自动化流程被中断后恢复时(例如,在主机更新期间),Durable Task Scheduler 将重播自动化流程中的事件,确保在执行下一步之前,所有以前的步骤均已成功执行。 此作可确保可靠性,这是持久执行范例的核心承诺之一。
如果业务流程在部署之间发生更改,则执行的步骤可能不再相同。 在这种情况下,系统会抛出一个NonDeterministicError,而不是允许编排继续进行。
业务流程版本控制 可防止与不确定相关的问题,使你能够与新的(或旧)业务流程无缝协作。 持久任务计划程序具有两种不同的版本控制样式,可以单独使用或结合使用:
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
基于客户端/上下文的条件版本控制
为了使编排具有版本,必须先在客户端中进行设置。
.NET SDK 使用标准主机生成器扩展。
注释
自 v1.9.0 起在 .NET SDK 中Microsoft.DurableTask.Client.AzureManaged可用。
builder.Services.AddDurableTaskClient(builder =>
{
builder.UseDurableTaskScheduler(connectionString);
builder.UseDefaultVersion("1.0.0");
});
注释
自 v1.6.0 起在 Java SDK 中com.microsoft:durabletask-client提供。
public DurableTaskClient durableTaskClient(DurableTaskProperties properties) {
// Create client using Azure-managed extensions
return DurableTaskSchedulerClientExtensions.createClientBuilder(properties.getConnectionString())
.defaultVersion("1.0")
.build();
}
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential,
default_version="1.0.0")
将版本添加到客户端后,此主机启动的任何业务流程都使用该版本 1.0.0。 版本是一个简单的字符串,接受任何值。 但是,SDK 会尝试将其转换为 。NET 的 System.Version。
- 如果 可以 转换,则使用该库进行比较。
- 如果没有 , 则使用简单的字符串比较。
在客户端中提供版本也会使版本在 TaskOrchestrationContext中可用,这意味着您可以在条件语句中使用该版本。 只要较新的业务流程版本具有适当的版本限制,旧业务流程版本和新业务流程版本都可以在同一主机上一起运行。
Example:
[DurableTask]
class HelloCities : TaskOrchestrator<string, List<string>>
{
private readonly string[] Cities = ["Seattle", "Amsterdam", "Hyderabad", "Kuala Lumpur", "Shanghai", "Tokyo"];
public override async Task<List<string>> RunAsync(TaskOrchestrationContext context, string input)
{
List<string> results = [];
foreach (var city in Cities)
{
results.Add(await context.CallSayHelloAsync($"{city} v{context.Version}"));
if (context.CompareVersionTo("2.0.0") >= 0)
{
results.Add(await context.CallSayGoodbyeAsync($"{city} v{context.Version}"));
}
}
Console.WriteLine("HelloCities orchestration completed.");
return results;
}
}
将版本添加到客户端后,此客户端启动的任何业务流程都使用该版本 1.0.0。 版本是一个简单的字符串,接受任何值。
在客户端中提供的版本也在 TaskOrchestration 中可用,这意味着你可以在条件语句中使用该版本。 只要较新的业务流程版本具有适当的版本限制,旧业务流程版本和新业务流程版本都可以在同一客户端上运行。
Example:
public TaskOrchestration create() {
return ctx -> {
List<String> results = new ArrayList<>();
for (String city : new String[]{ "Seattle", "Amsterdam", "Hyderabad", "Kuala Lumpur", "Shanghai", "Tokyo" }) {
results.add(ctx.callActivity("SayHello", city, String.class).await());
if (VersionUtils.compareVersions(ctx.getVersion(), "2.0.0") >= 0) {
// Simulate a delay for newer versions
results.add(ctx.callActivity("SayGoodbye", city, String.class).await());
}
}
ctx.complete(results);
};
}
将版本添加到客户端后,此客户端启动的任何业务流程都使用该版本 1.0.0。 版本是一个使用packaging.version解析的简单字符串,它支持语义版本控制比较,并接受任何值。
在客户端中提供版本也会使版本在 task.OrchestrationContext中可用,这意味着您可以在条件语句中使用该版本。 只要较新的业务流程版本具有适当的版本限制,旧业务流程版本和新业务流程版本都可以在同一客户端上运行。
Example:
def orchestrator(ctx: task.OrchestrationContext, _):
if ctx.version == "1.0.0":
# For version 1.0.0, we use the original logic
result: int = yield ctx.call_activity(activity_v1, input="input for v1")
elif ctx.version == "2.0.0":
# For version 2.0.0, we use the updated logic
result: int = yield ctx.call_activity(activity_v2, input="input for v2")
else:
raise ValueError(f"Unsupported version: {ctx.version}")
return {
'result': result,
}
在此示例中,我们向 SayGoodbye 业务流程中添加了一个 HelloCities 活动。 此活动仅在编排版本 2.0.0 及更高版本中调用。 使用简单条件语句时,任何版本小于 2.0.0 的编排将继续运行,并且任何新的编排都包含新的活动。
何时使用客户端版本控制
虽然客户端版本控制为版本控制业务流程提供了最简单的机制,但与版本交互可能会占用大量编程。 在以下的情况下使用客户端版本控制:
- 你想要一个适用于所有版本的标准版本,还是
- 您需要针对特定版本设计自定义逻辑。
基于工作线程的版本控制
虽然编排仍然需要客户端版本来设置版本,但基于工作单元的版本控制方法有助于避免在编排中使用条件语句。 工作线程选择如何在开始执行之前处理不同版本的协调流程。
工作者版本控制需设置以下字段:
工作进程的版本。
应用于由工作程序启动的子协调程序的默认版本。
工作器用于匹配编排版本的策略。
名称 DESCRIPTION 没有 处理工作时不考虑版本 严格 业务流程中的版本和辅助角色必须完全匹配 当前或更早版本 业务流程中的版本必须等于或小于辅助角色中的版本 如果版本不符合匹配策略,工人采取的策略。
名称 DESCRIPTION 拒绝 该编排被工作者拒绝,但仍保留在工作队列中,以便稍后再次尝试。 失败 编排失败后从工作队列中删除
与客户端版本控制类似,可以通过标准主机生成器模式设置这些字段。
注释
自 v1.9.0 起,在 .NET SDK(Microsoft.DurableTask.Worker.AzureManaged)中可用。
builder.Services.AddDurableTaskWorker(builder =>
{
builder.AddTasks(r => r.AddAllGeneratedTasks());
builder.UseDurableTaskScheduler(connectionString);
builder.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions
{
Version = "1.0.0",
DefaultVersion = "1.0.0",
MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict,
FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Reject,
});
});
注释
自 v1.6.0 起,Java SDK(com.microsoft:durabletask-client)中提供。
private static DurableTaskGrpcWorker createTaskHubServer() {
DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder();
builder.useVersioning(new DurableTaskGrpcWorkerVersioningOptions(
"1.0",
"1.0",
DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.CURRENTOROLDER,
DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.REJECT));
// Orchestrations can be defined inline as anonymous classes or as concrete classes
builder.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "HelloCities"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<String> results = new ArrayList<>();
for (String city : new String[]{ "Seattle", "Amsterdam", "Hyderabad", "Kuala Lumpur", "Shanghai", "Tokyo" }) {
results.add(ctx.callActivity("SayHello", city, String.class).await());
}
ctx.complete(results);
};
}
});
// Activities can be defined inline as anonymous classes or as concrete classes
builder.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "SayHello"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return "Hello, " + input + "!";
};
}
});
return builder.build();
}
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential) as w:
# This worker is versioned for v2, as the orchestrator code has already been updated
# CURRENT_OR_OLDER allows this worker to process orchestrations versioned below 2.0.0 - e.g. 1.0.0
w.use_versioning(worker.VersioningOptions(
version="2.0.0",
default_version="2.0.0",
match_strategy=worker.VersionMatchStrategy.CURRENT_OR_OLDER,
failure_strategy=worker.VersionFailureStrategy.FAIL
))
w.add_orchestrator(orchestrator)
w.add_activity(activity_v1)
w.add_activity(activity_v2)
w.start()
失败策略
拒绝
使用 Reject 故障策略,当期望的行为是业务编排在稍后时间或在不同的工作节点上重试时。 在 Reject 故障期间:
- 编排被拒绝,并返回到工作队列。
- 一个编排已出队。
- 出队的编排可能被分配给不同的工作线程,也可能再次分配给同一工作线程。
此过程将重复,直到有可以胜任编排任务的工作节点可用为止。 此策略可无缝处理编排更新的部署。 随着部署的进行,无法处理编排的工作节点会拒绝它,而能够处理编排的工作节点会处理它。
具有混合工作者和编排版本的能力支持蓝绿部署等方案。
失败
不期望其他版本时,请使用Fail失败策略。 在这种情况下,新版本是一个异常,任何工作者甚至都不应该尝试操作它。 持久任务计划程序导致协调失败,使其处于结束状态。
何时使用工作进程版本控制
在不应执行未知或不受支持的编排版本的情况下,使用工作器版本控制。 辅助角色版本控制将停止业务流程执行,而不是将版本处理代码置于辅助角色中。 此方法允许更简单的编排代码。 无需进行任何代码更改,即可处理各种部署方案,例如蓝绿部署。