使用自定义编排状态可以为编排实例设置自定义状态值。 外部客户端可以在编排运行时查询此值以跟踪进度或共享元数据。
在 Durable Task SDK 中,可以通过 DurableTaskClient 上的编排状态查询 API(例如,.NET 中的 GetInstanceAsync 或 Java 中的 getInstanceMetadata)获取此状态。
重要
目前,PowerShell Durable Task SDK 不可用。
示例用例
可视化进度
客户端可以轮询状态终结点,并显示一个进度界面以可视化当前的执行阶段。 以下示例展示了如何共享进度:
注释
这些示例针对 Durable Functions 2.x 编写,并且与 Durable Functions 1.x 不兼容。 有关版本之间差异的详细信息,请参阅 Durable Functions 版本一文。
/// <summary>
/// Orchestrator that greets three cities one after another and publishes the
/// city that was just greeted as the custom status, so clients can track progress.
/// </summary>
/// <param name="context">Durable orchestration context supplied by the trigger.</param>
/// <returns>The greetings in call order: ["Hello Tokyo!", "Hello Seattle!", "Hello London!"].</returns>
[FunctionName("E1_HelloSequence")]
public static async Task<List<string>> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var greetings = new List<string>();

    // The fixed city order keeps the sequence of activity calls the same on every run.
    foreach (string city in new[] { "Tokyo", "Seattle", "London" })
    {
        string greeting = await context.CallActivityAsync<string>("E1_SayHello", city);
        greetings.Add(greeting);

        // Publish progress only after the activity for this city has completed.
        context.SetCustomStatus(city);
    }

    return greetings;
}
/// <summary>Activity that builds the greeting for a single name.</summary>
/// <param name="name">Name to greet, supplied by the orchestrator.</param>
/// <returns>The text "Hello {name}!".</returns>
[FunctionName("E1_SayHello")]
public static string SayHello([ActivityTrigger] string name) => "Hello " + name + "!";
E1_HelloSequence 编排器函数:
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context){
const outputs = [];
outputs.push(yield context.df.callActivity("E1_SayHello", "Tokyo"));
context.df.setCustomStatus("Tokyo");
outputs.push(yield context.df.callActivity("E1_SayHello", "Seattle"));
context.df.setCustomStatus("Seattle");
outputs.push(yield context.df.callActivity("E1_SayHello", "London"));
context.df.setCustomStatus("London");
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
});
E1_SayHello 活动函数:
module.exports = async function(context, name) {
return `Hello ${name}!`;
};
E1_HelloSequence Orchestrator 函数
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Greet three cities in sequence and publish progress as the custom status.

    After each activity call returns, the custom status is set to the city
    that was just greeted.

    Returns:
        list: The three activity results in call order (Tokyo, Seattle, London).
    """
    output1 = yield context.call_activity('E1_SayHello', 'Tokyo')
    context.set_custom_status('Tokyo')
    output2 = yield context.call_activity('E1_SayHello', 'Seattle')
    context.set_custom_status('Seattle')
    output3 = yield context.call_activity('E1_SayHello', 'London')
    context.set_custom_status('London')

    return [output1, output2, output3]


main = df.Orchestrator.create(orchestrator_function)
E1_SayHello 活动函数
def main(name: str) -> str:
    """Activity: return the greeting for ``name``."""
    return f"Hello {name}!"
E1_HelloSequence Orchestrator 函数
# Orchestrator: greets each city in order and publishes the city that was just
# greeted as the custom status.
param($Context)

$greetings = @()

foreach ($city in 'Tokyo', 'Seattle', 'London') {
    $greetings += Invoke-DurableActivity -FunctionName 'E1_SayHello' -Input $city

    # Update the status only after the activity for this city has returned.
    Set-DurableCustomStatus -CustomStatus $city
}

return $greetings
E1_SayHello 活动函数
# Activity: returns the greeting for the supplied name.
param($name)

# The trailing "!" matches the greeting format used by the other language samples
# ("Hello Tokyo!", "Hello Seattle!", "Hello London!").
"Hello $name!"
/**
 * Orchestrator that greets Tokyo, London and Seattle in that order and sets the
 * custom status to each city once its activity call has completed.
 *
 * @param ctx orchestration context supplied by the trigger
 * @return the three greetings joined with ", "
 */
@FunctionName("HelloCities")
public String helloCitiesOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String tokyoGreeting = ctx.callActivity("SayHello", "Tokyo", String.class).await();
    ctx.setCustomStatus("Tokyo");

    String londonGreeting = ctx.callActivity("SayHello", "London", String.class).await();
    ctx.setCustomStatus("London");

    String seattleGreeting = ctx.callActivity("SayHello", "Seattle", String.class).await();
    ctx.setCustomStatus("Seattle");

    return tokyoGreeting + ", " + londonGreeting + ", " + seattleGreeting;
}
/**
 * Activity that returns the greeting for the given name.
 *
 * @param name name to greet
 * @return the text "Hello {name}!"
 */
@FunctionName("SayHello")
public String sayHello(@DurableActivityTrigger(name = "name") String name) {
    return "Hello " + name + "!";
}
客户端可以轮询编排元数据,并显示一个进度用户界面以可视化当前的执行阶段。 以下示例展示了如何共享进度:
using System.Threading.Tasks;
using Microsoft.DurableTask;
/// <summary>
/// Orchestrator that greets Tokyo, London and Seattle in that order and sets the
/// custom status to each city once its activity call has completed.
/// </summary>
public class HelloCities : TaskOrchestrator<object?, string>
{
    /// <summary>Runs the three greetings and returns them joined with ", ".</summary>
    /// <param name="context">Orchestration context used to call activities and set the status.</param>
    /// <param name="input">Not used by this orchestrator.</param>
    public override async Task<string> RunAsync(TaskOrchestrationContext context, object? input)
    {
        string tokyoGreeting = await context.CallActivityAsync<string>("SayHello", "Tokyo");
        context.SetCustomStatus("Tokyo");

        string londonGreeting = await context.CallActivityAsync<string>("SayHello", "London");
        context.SetCustomStatus("London");

        string seattleGreeting = await context.CallActivityAsync<string>("SayHello", "Seattle");
        context.SetCustomStatus("Seattle");

        return $"{tokyoGreeting}, {londonGreeting}, {seattleGreeting}";
    }
}
from durabletask import task
def say_hello(ctx: task.ActivityContext, name: str) -> str:
    """Activity: return the greeting for ``name`` (``ctx`` is not used)."""
    return f"Hello {name}!"
def hello_cities(ctx: task.OrchestrationContext, _):
    """Greet Tokyo, London and Seattle in order, publishing progress as custom status.

    The custom status is set to each city after its activity call has returned.

    Returns:
        str: The three greetings joined with ", ".
    """
    result = ""
    result += (yield ctx.call_activity(say_hello, input="Tokyo")) + ", "
    ctx.set_custom_status("Tokyo")
    result += (yield ctx.call_activity(say_hello, input="London")) + ", "
    ctx.set_custom_status("London")
    # Parenthesized like the two calls above so the yield expression reads consistently.
    result += (yield ctx.call_activity(say_hello, input="Seattle"))
    ctx.set_custom_status("Seattle")
    return result
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
/**
 * Orchestrator that greets Tokyo, London and Seattle in that order and sets the
 * custom status to each city once its activity call has completed.
 */
public class HelloCities implements TaskOrchestration {
    /**
     * Runs the three greetings and completes the orchestration with them joined by ", ".
     *
     * @param ctx orchestration context used to call activities and set the status
     */
    @Override
    public void run(TaskOrchestrationContext ctx) {
        String tokyoGreeting = ctx.callActivity("SayHello", "Tokyo", String.class).await();
        ctx.setCustomStatus("Tokyo");

        String londonGreeting = ctx.callActivity("SayHello", "London", String.class).await();
        ctx.setCustomStatus("London");

        String seattleGreeting = ctx.callActivity("SayHello", "Seattle", String.class).await();
        ctx.setCustomStatus("Seattle");

        ctx.complete(tokyoGreeting + ", " + londonGreeting + ", " + seattleGreeting);
    }
}
import { ActivityContext, OrchestrationContext, TOrchestrator } from "@microsoft/durabletask-js";
/** Activity: returns the greeting for the supplied name. */
const sayHello = async (_: ActivityContext, name: string): Promise<string> => "Hello " + name + "!";

/**
 * Orchestrator: greets Tokyo, London and Seattle in that order and sets the
 * custom status to each city once its activity call has completed.
 *
 * @returns The three greetings joined with ", ".
 */
const helloCities: TOrchestrator = async function* (ctx: OrchestrationContext): any {
  const tokyoGreeting = yield ctx.callActivity(sayHello, "Tokyo");
  ctx.setCustomStatus("Tokyo");

  const londonGreeting = yield ctx.callActivity(sayHello, "London");
  ctx.setCustomStatus("London");

  const seattleGreeting = yield ctx.callActivity(sayHello, "Seattle");
  ctx.setCustomStatus("Seattle");

  return "" + tokyoGreeting + ", " + londonGreeting + ", " + seattleGreeting;
};
客户端可以轮询业务流程元数据并等待,直到 CustomStatus 字段设置为 "London":
using System.Threading.Tasks;
using Microsoft.DurableTask.Client;
// Start the orchestration, then poll its metadata until the custom status reads "London".
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("HelloCities");
OrchestrationMetadata metadata = await client.WaitForInstanceStartAsync(instanceId, getInputsAndOutputs: true);

// NOTE(review): there is no exit for an orchestration that finishes or moves past
// "London" between two polls - consider bounding this loop in production code.
while (true)
{
    bool reachedLondon = metadata.SerializedCustomStatus != null
        && metadata.ReadCustomStatusAs<string>() == "London";
    if (reachedLondon)
    {
        break;
    }

    await Task.Delay(200);

    // Keep the previous snapshot when the lookup returns null.
    metadata = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: true) ?? metadata;
}
import time
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Assumes 'client' is a DurableTaskSchedulerClient instance
instance_id = client.schedule_new_orchestration(hello_cities)
state = client.wait_for_orchestration_start(instance_id, fetch_payloads=True)

# The custom status is compared in its serialized form, so the string "London"
# carries its surrounding double quotes.
# A missing state (None) is treated as "not there yet" instead of raising
# AttributeError on the attribute access.
while state is None or state.serialized_custom_status != '"London"':
    time.sleep(0.2)
    state = client.get_orchestration_state(instance_id, fetch_payloads=True)
// Start the orchestration, then poll its metadata until the custom status reads "London".
String instanceId = client.scheduleNewOrchestrationInstance("HelloCities");
OrchestrationMetadata metadata = client.waitForInstanceStart(instanceId, Duration.ofMinutes(5), true);

while (true) {
    String currentStatus = metadata.readCustomStatusAs(String.class);
    if ("London".equals(currentStatus)) {
        break;
    }

    Thread.sleep(200);
    metadata = client.getInstanceMetadata(instanceId, true);
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
// Start the orchestration, then poll its state until the custom status reads "London".
const client = createAzureManagedClient(connectionString);
const instanceId = await client.scheduleNewOrchestration(helloCities);
let state = await client.waitForOrchestrationStart(instanceId, true, 60);

for (;;) {
  // The custom status arrives as serialized JSON, so parse it before comparing.
  const serializedStatus = state?.serializedCustomStatus;
  if (serializedStatus && JSON.parse(serializedStatus) === "London") {
    break;
  }

  await new Promise((resolve) => setTimeout(resolve, 200));
  state = await client.getOrchestrationState(instanceId, true);
}
然后,仅当 CustomStatus 字段设置为 "London" 时,客户端才会接收编排的输出。
/// <summary>
/// HTTP starter: starts the orchestration named in the route, waits until its
/// custom status reads "London", and returns the serialized orchestration status.
/// </summary>
/// <param name="req">Incoming request; its content is passed to the orchestration as input.</param>
/// <param name="starter">Durable client used to start and query the orchestration.</param>
/// <param name="functionName">Orchestrator function name taken from the route.</param>
/// <param name="log">Logger for the started instance ID.</param>
/// <returns>A 200 response whose body is the JSON-serialized orchestration status.</returns>
[FunctionName("HttpStart")]
public static async Task<HttpResponseMessage> Run(
    [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter,
    string functionName,
    ILogger log)
{
    // Function input comes from the request content.
    dynamic eventData = await req.Content.ReadAsAsync<object>();
    string instanceId = await starter.StartNewAsync(functionName, (string)eventData);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    DurableOrchestrationStatus durableOrchestrationStatus = await starter.GetStatusAsync(instanceId);

    // Null-conditional access keeps polling (instead of throwing) while the status
    // object or its custom status is not available yet.
    while (durableOrchestrationStatus?.CustomStatus?.ToString() != "London")
    {
        await Task.Delay(200);
        durableOrchestrationStatus = await starter.GetStatusAsync(instanceId);
    }

    HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK)
    {
        Content = new StringContent(JsonConvert.SerializeObject(durableOrchestrationStatus))
    };

    return httpResponseMessage;
}
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
// Function input comes from the request content.
const eventData = req.body;
const instanceId = await client.startNew(req.params.functionName, undefined, eventData);
context.log(`Started orchestration with ID = '${instanceId}'.`);
let durableOrchestrationStatus = await client.getStatus(instanceId);
while (durableOrchestrationStatus.customStatus.toString() !== "London") {
await new Promise((resolve) => setTimeout(resolve, 200));
durableOrchestrationStatus = await client.getStatus(instanceId);
}
const httpResponseMessage = {
status: 200,
body: JSON.stringify(durableOrchestrationStatus),
};
return httpResponseMessage;
};
注释
在 JavaScript 中,customStatus 字段会在调度下一个 yield 或 return 操作时设置。
import json
import logging
import azure.functions as func
import azure.durable_functions as df
from time import sleep
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    """HTTP starter: start the orchestration and wait until its custom status is 'London'.

    Args:
        req: Incoming HTTP request; the orchestrator name is read from the route.
        starter: Durable client binding payload.

    Returns:
        A 200 response with the body 'Success' once the status has been observed.
    """
    # Imported locally so the snippet does not depend on a file-level import.
    import asyncio

    client = df.DurableOrchestrationClient(starter)

    # Route values live in `route_params`; `req.params` is a mapping, so attribute
    # access such as `req.params.functionName` raises AttributeError.
    # Assumes the route declares a {functionName} segment - confirm in function.json.
    function_name = req.route_params['functionName']
    instance_id = await client.start_new(function_name, None, None)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")

    durable_orchestration_status = await client.get_status(instance_id)
    while durable_orchestration_status.custom_status != 'London':
        # Non-blocking sleep: time.sleep() would stall the event loop of this async function.
        await asyncio.sleep(0.2)
        durable_orchestration_status = await client.get_status(instance_id)

    return func.HttpResponse(body='Success', status_code=200, mimetype='application/json')
注释
在 Python 中,custom_status 字段会在调度下一个 yield 或 return 操作时设置。
/**
 * HTTP starter: schedules the HelloCities orchestration and waits until its
 * custom status reads "London".
 *
 * @param req            incoming HTTP request
 * @param durableContext binding that provides the durable task client
 * @param context        execution context used for logging
 * @return 200 once the status has been observed, or 500 if the instance did not start in time
 * @throws InterruptedException if the polling sleep is interrupted
 */
@FunctionName("StartHelloCities")
public HttpResponseMessage startHelloCities(
        @HttpTrigger(name = "req") HttpRequestMessage<Void> req,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext,
        final ExecutionContext context) throws InterruptedException {

    DurableTaskClient client = durableContext.getClient();
    String instanceId = client.scheduleNewOrchestrationInstance("HelloCities");
    context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);

    OrchestrationMetadata metadata;
    try {
        metadata = client.waitForInstanceStart(instanceId, Duration.ofMinutes(5), true);
    } catch (TimeoutException ex) {
        return req.createResponseBuilder(HttpStatus.INTERNAL_SERVER_ERROR).build();
    }

    // HelloCities only ever sets "Tokyo", "London" and "Seattle"; waiting for any
    // other value (previously "Shanghai") would never terminate.
    while (!"London".equals(metadata.readCustomStatusAs(String.class))) {
        Thread.sleep(200);
        metadata = client.getInstanceMetadata(instanceId, true);
    }

    return req.createResponseBuilder(HttpStatus.OK).build();
}
输出自定义
可以使用自定义编排状态,根据用户的独特特征或交互返回自定义输出,从而细分用户。 使用自定义业务流程状态时,客户端代码保持不变,而所有主要修改都发生在服务器端:
/// <summary>
/// Orchestrator that publishes a city/season recommendation as the custom status,
/// chosen by the integer input (1, 2 or 3). Other inputs leave the status unset.
/// </summary>
/// <param name="context">Durable orchestration context supplied by the trigger.</param>
[FunctionName("CityRecommender")]
public static void Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int userChoice = context.GetInput<int>();

    if (userChoice == 1)
    {
        context.SetCustomStatus(new
        {
            recommendedCities = new[] { "Tokyo", "Seattle" },
            recommendedSeasons = new[] { "Spring", "Summer" }
        });
    }
    else if (userChoice == 2)
    {
        context.SetCustomStatus(new
        {
            recommendedCities = new[] { "Seattle", "London" },
            recommendedSeasons = new[] { "Summer" }
        });
    }
    else if (userChoice == 3)
    {
        context.SetCustomStatus(new
        {
            recommendedCities = new[] { "Tokyo", "London" },
            recommendedSeasons = new[] { "Spring", "Summer" }
        });
    }

    // Wait for user selection and refine the recommendation
}
CityRecommender 编排器
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const userChoice = context.df.getInput();
switch (userChoice) {
case 1:
context.df.setCustomStatus({
recommendedCities: [ "Tokyo", "Seattle" ],
recommendedSeasons: [ "Spring", "Summer" ],
});
break;
case 2:
context.df.setCustomStatus({
recommendedCities: [ "Seattle", "London" ],
recommendedSeasons: [ "Summer" ],
});
break;
case 3:
context.df.setCustomStatus({
recommendedCities: [ "Tokyo", "London" ],
recommendedSeasons: [ "Spring", "Summer" ],
});
break;
}
// Wait for user selection and refine the recommendation
});
CityRecommender 编排器
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Publish a city/season recommendation as the custom status.

    The recommendation is selected by the integer input (1, 2 or 3); any other
    value leaves the custom status unset.
    """
    userChoice = int(context.get_input())

    if userChoice == 1:
        context.set_custom_status({
            'recommendedCities': ['Tokyo', 'Seattle'],
            'recommendedSeasons': ['Spring', 'Summer']
        })
    elif userChoice == 2:
        context.set_custom_status({
            'recommendedCities': ['Seattle', 'London'],
            'recommendedSeasons': ['Summer']
        })
    elif userChoice == 3:
        context.set_custom_status({
            'recommendedCities': ['Tokyo', 'London'],
            'recommendedSeasons': ['Spring', 'Summer']
        })

    # Wait for user selection and refine the recommendation


main = df.Orchestrator.create(orchestrator_function)
CityRecommender 编排器
# Orchestrator: publishes a city/season recommendation as the custom status,
# chosen by the integer input (1, 2 or 3). Other inputs leave the status unset.
param($Context)

$userChoice = $Context.Input -as [int]

switch ($userChoice) {
    1 {
        Set-DurableCustomStatus -CustomStatus @{
            recommendedCities  = @('Tokyo', 'Seattle')
            recommendedSeasons = @('Spring', 'Summer')
        }
    }
    2 {
        Set-DurableCustomStatus -CustomStatus @{
            recommendedCities  = @('Seattle', 'London')
            recommendedSeasons = @('Summer')
        }
    }
    3 {
        Set-DurableCustomStatus -CustomStatus @{
            recommendedCities  = @('Tokyo', 'London')
            recommendedSeasons = @('Spring', 'Summer')
        }
    }
}

# Wait for user selection and refine the recommendation
/**
 * Orchestrator that publishes a city/season recommendation as the custom status,
 * chosen by the integer input (1, 2 or 3). Other inputs leave the status unset.
 *
 * @param ctx orchestration context supplied by the trigger
 */
@FunctionName("CityRecommender")
public void cityRecommender(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    int userChoice = ctx.getInput(int.class);

    Recommendation recommendation = null;
    if (userChoice == 1) {
        recommendation = new Recommendation(
                new String[]{ "Tokyo", "Seattle" },
                new String[]{ "Spring", "Summer" });
    } else if (userChoice == 2) {
        recommendation = new Recommendation(
                new String[]{ "Seattle", "London" },
                new String[]{ "Summer" });
    } else if (userChoice == 3) {
        recommendation = new Recommendation(
                new String[]{ "Tokyo", "London" },
                new String[]{ "Spring", "Summer" });
    }

    // Only a recognized choice produces a status update.
    if (recommendation != null) {
        ctx.setCustomStatus(recommendation);
    }

    // Wait for user selection with an external event
}
/** Custom status payload holding the recommended cities and seasons. */
class Recommendation {
    // Field order is kept as declared: cities first, then seasons.
    public String[] recommendedCities;
    public String[] recommendedSeasons;

    /** Creates an empty payload. */
    public Recommendation() { }

    /**
     * Creates a payload with the given recommendations.
     *
     * @param cities  recommended cities
     * @param seasons recommended seasons
     */
    public Recommendation(String[] cities, String[] seasons) {
        recommendedCities = cities;
        recommendedSeasons = seasons;
    }
}
using System.Threading.Tasks;
using Microsoft.DurableTask;
/// <summary>
/// Orchestrator that publishes a city/season recommendation as the custom status,
/// chosen by the integer input (1, 2 or 3). Other inputs leave the status unset.
/// </summary>
public class CityRecommender : TaskOrchestrator<int, object?>
{
    /// <summary>Sets the recommendation for <paramref name="userChoice"/> and completes with null.</summary>
    /// <param name="context">Orchestration context used to set the custom status.</param>
    /// <param name="userChoice">Selection made by the user.</param>
    public override Task<object?> RunAsync(TaskOrchestrationContext context, int userChoice)
    {
        if (userChoice == 1)
        {
            context.SetCustomStatus(new
            {
                recommendedCities = new[] { "Tokyo", "Seattle" },
                recommendedSeasons = new[] { "Spring", "Summer" },
            });
        }
        else if (userChoice == 2)
        {
            context.SetCustomStatus(new
            {
                recommendedCities = new[] { "Seattle", "London" },
                recommendedSeasons = new[] { "Summer" },
            });
        }
        else if (userChoice == 3)
        {
            context.SetCustomStatus(new
            {
                recommendedCities = new[] { "Tokyo", "London" },
                recommendedSeasons = new[] { "Spring", "Summer" },
            });
        }

        // Wait for user selection and refine the recommendation
        return Task.FromResult<object?>(null);
    }
}
from durabletask import task
def city_recommender(ctx: task.OrchestrationContext, user_choice: int):
    """Publish a city/season recommendation as the custom status.

    The recommendation is selected by ``user_choice`` (1, 2 or 3); any other
    value leaves the custom status unset.
    """
    if user_choice == 1:
        ctx.set_custom_status({
            "recommendedCities": ["Tokyo", "Seattle"],
            "recommendedSeasons": ["Spring", "Summer"]
        })
    elif user_choice == 2:
        ctx.set_custom_status({
            "recommendedCities": ["Seattle", "London"],
            "recommendedSeasons": ["Summer"]
        })
    elif user_choice == 3:
        ctx.set_custom_status({
            "recommendedCities": ["Tokyo", "London"],
            "recommendedSeasons": ["Spring", "Summer"]
        })

    # Wait for user selection and refine the recommendation
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
/**
 * Orchestrator that publishes a city/season recommendation as the custom status,
 * chosen by the integer input (1, 2 or 3). Other inputs leave the status unset.
 */
public class CityRecommender implements TaskOrchestration {
    /**
     * Reads the user's choice from the input and sets the matching recommendation.
     *
     * @param ctx orchestration context used to read the input and set the status
     */
    @Override
    public void run(TaskOrchestrationContext ctx) {
        int userChoice = ctx.getInput(int.class);

        Recommendation recommendation = null;
        if (userChoice == 1) {
            recommendation = new Recommendation(
                    new String[]{ "Tokyo", "Seattle" },
                    new String[]{ "Spring", "Summer" });
        } else if (userChoice == 2) {
            recommendation = new Recommendation(
                    new String[]{ "Seattle", "London" },
                    new String[]{ "Summer" });
        } else if (userChoice == 3) {
            recommendation = new Recommendation(
                    new String[]{ "Tokyo", "London" },
                    new String[]{ "Spring", "Summer" });
        }

        // Only a recognized choice produces a status update.
        if (recommendation != null) {
            ctx.setCustomStatus(recommendation);
        }

        // Wait for user selection and refine the recommendation
    }
}
/** Custom status payload holding the recommended cities and seasons. */
class Recommendation {
    // Field order is kept as declared: cities first, then seasons.
    public String[] recommendedCities;
    public String[] recommendedSeasons;

    /**
     * Creates a payload with the given recommendations.
     *
     * @param cities  recommended cities
     * @param seasons recommended seasons
     */
    public Recommendation(String[] cities, String[] seasons) {
        recommendedCities = cities;
        recommendedSeasons = seasons;
    }
}
import { OrchestrationContext, TOrchestrator } from "@microsoft/durabletask-js";
/**
 * Orchestrator: publishes a city/season recommendation as the custom status,
 * chosen by `userChoice` (1, 2 or 3). Other values leave the status unset.
 */
const cityRecommender: TOrchestrator = async function* (ctx: OrchestrationContext, userChoice: number): any {
  // A Map keeps the strict, number-only matching of a `switch` on numeric cases.
  const recommendations = new Map<number, { recommendedCities: string[]; recommendedSeasons: string[] }>([
    [1, { recommendedCities: ["Tokyo", "Seattle"], recommendedSeasons: ["Spring", "Summer"] }],
    [2, { recommendedCities: ["Seattle", "London"], recommendedSeasons: ["Summer"] }],
    [3, { recommendedCities: ["Tokyo", "London"], recommendedSeasons: ["Spring", "Summer"] }],
  ]);

  const recommendation = recommendations.get(userChoice);
  if (recommendation !== undefined) {
    ctx.setCustomStatus(recommendation);
  }

  // Wait for user selection and refine the recommendation
};
指令规范
编排器可以通过自定义状态向客户端提供唯一的指令。 自定义状态中的指令对应于编排代码中的步骤:
/// <summary>
/// Orchestrator that publishes a discount offer as the custom status, waits for the
/// "BookingConfirmed" external event, and then publishes a confirmation message.
/// </summary>
/// <param name="context">Durable orchestration context supplied by the trigger.</param>
/// <returns>The value carried by the "BookingConfirmed" event.</returns>
[FunctionName("ReserveTicket")]
public static async Task<bool> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string userId = context.GetInput<string>();
    int discount = await context.CallActivityAsync<int>("CalculateDiscount", userId);

    // First status: instructions the client needs in order to complete the booking.
    var offer = new
    {
        discount = discount,
        discountTimeout = 60,
        bookingUrl = "https://www.myawesomebookingweb.com",
    };
    context.SetCustomStatus(offer);

    bool isBookingConfirmed = await context.WaitForExternalEvent<bool>("BookingConfirmed");

    // Second status: outcome message for the client.
    string outcomeMessage = isBookingConfirmed
        ? "Thank you for confirming your booking."
        : "The booking was not confirmed on time. Please try again.";
    context.SetCustomStatus(new { message = outcomeMessage });

    return isBookingConfirmed;
}
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const userId = context.df.getInput();
const discount = yield context.df.callActivity("CalculateDiscount", userId);
context.df.setCustomStatus({
discount,
discountTimeout: 60,
bookingUrl: "https://www.myawesomebookingweb.com",
});
const isBookingConfirmed = yield context.df.waitForExternalEvent("BookingConfirmed");
context.df.setCustomStatus(isBookingConfirmed
? { message: "Thank you for confirming your booking." }
: { message: "The booking was not confirmed on time. Please try again." }
);
return isBookingConfirmed;
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Publish a discount offer, wait for 'BookingConfirmed', then publish the outcome.

    Returns:
        The value carried by the 'BookingConfirmed' external event.
    """
    userId = int(context.get_input())

    discount = yield context.call_activity('CalculateDiscount', userId)

    # First status: instructions the client needs in order to complete the booking.
    status = {
        'discount': discount,
        'discountTimeout': 60,
        'bookingUrl': "https://www.myawesomebookingweb.com",
    }
    context.set_custom_status(status)

    is_booking_confirmed = yield context.wait_for_external_event('BookingConfirmed')

    # Second status: outcome message for the client.
    context.set_custom_status(
        {'message': 'Thank you for confirming your booking.'} if is_booking_confirmed
        else {'message': 'The booking was not confirmed on time. Please try again.'})
    return is_booking_confirmed


main = df.Orchestrator.create(orchestrator_function)
# Orchestrator: publishes a discount offer as the custom status, waits for the
# 'BookingConfirmed' external event, then publishes a confirmation message.
param($Context)

$userId = $Context.Input -as [int]

$discount = Invoke-DurableActivity -FunctionName 'CalculateDiscount' -Input $userId

# First status: instructions the client needs in order to complete the booking.
$status = @{
    discount = $discount;
    discountTimeout = 60;
    bookingUrl = "https://www.myawesomebookingweb.com"
}
Set-DurableCustomStatus -CustomStatus $status

# 'BookingConfirmed' is an external event raised by the client (as in the other
# language samples), so listen for it instead of invoking an activity of that name.
$isBookingConfirmed = Start-DurableExternalEventListener -EventName 'BookingConfirmed'

# Second status: outcome message for the client.
if ($isBookingConfirmed) {
    Set-DurableCustomStatus -CustomStatus @{message = 'Thank you for confirming your booking.'}
} else {
    Set-DurableCustomStatus -CustomStatus @{message = 'The booking was not confirmed on time. Please try again.'}
}

return $isBookingConfirmed
/**
 * Orchestrator that publishes a discount offer as the custom status, waits for the
 * "BookingConfirmed" external event, and then publishes an outcome message.
 *
 * @param ctx orchestration context supplied by the trigger
 * @return the value carried by the "BookingConfirmed" event
 */
@FunctionName("ReserveTicket")
public boolean reserveTicket(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String userID = ctx.getInput(String.class);
    int discount = ctx.callActivity("CalculateDiscount", userID, int.class).await();

    // First status: instructions the client needs in order to complete the booking.
    DiscountInfo offer = new DiscountInfo(discount, 60, "https://www.myawesomebookingweb.com");
    ctx.setCustomStatus(offer);

    boolean isConfirmed = ctx.waitForExternalEvent("BookingConfirmed", boolean.class).await();

    // Second status: outcome message for the client.
    String outcomeMessage = isConfirmed
            ? "Thank you for confirming your booking."
            : "There was a problem confirming your booking. Please try again.";
    ctx.setCustomStatus(outcomeMessage);

    return isConfirmed;
}
/** Custom status payload describing a discount offer. */
class DiscountInfo {
    // Field order is kept as declared: discount, discountTimeout, bookingUrl.
    public int discount;
    public int discountTimeout;
    public String bookingUrl;

    /** Creates an empty payload. */
    public DiscountInfo() { }

    /**
     * Creates a payload with the given offer details.
     *
     * @param discount        discount value returned by the CalculateDiscount activity
     * @param discountTimeout timeout value published with the offer
     * @param bookingUrl      URL where the booking can be completed
     */
    public DiscountInfo(int discount, int discountTimeout, String bookingUrl) {
        this.bookingUrl = bookingUrl;
        this.discountTimeout = discountTimeout;
        this.discount = discount;
    }
}
using System.Threading.Tasks;
using Microsoft.DurableTask;
/// <summary>
/// Orchestrator that publishes a discount offer as the custom status, waits for the
/// "BookingConfirmed" external event, and then publishes a confirmation message.
/// </summary>
public class ReserveTicket : TaskOrchestrator<string, bool>
{
    /// <summary>Runs the booking flow for <paramref name="userId"/>.</summary>
    /// <param name="context">Orchestration context used for activities, events and status.</param>
    /// <param name="userId">User the discount is calculated for.</param>
    /// <returns>The value carried by the "BookingConfirmed" event.</returns>
    public override async Task<bool> RunAsync(TaskOrchestrationContext context, string userId)
    {
        int discount = await context.CallActivityAsync<int>("CalculateDiscount", userId);

        // First status: instructions the client needs in order to complete the booking.
        var offer = new
        {
            discount,
            discountTimeout = 60,
            bookingUrl = "https://www.myawesomebookingweb.com",
        };
        context.SetCustomStatus(offer);

        bool isBookingConfirmed = await context.WaitForExternalEvent<bool>("BookingConfirmed");

        // Second status: outcome message for the client.
        string outcomeMessage = isBookingConfirmed
            ? "Thank you for confirming your booking."
            : "The booking was not confirmed on time. Please try again.";
        context.SetCustomStatus(new { message = outcomeMessage });

        return isBookingConfirmed;
    }
}
from durabletask import task
def calculate_discount(ctx: task.ActivityContext, user_id: str) -> int:
    """Activity: return the discount for ``user_id`` (placeholder that always returns 10)."""
    # Calculate discount based on user
    return 10
def reserve_ticket(ctx: task.OrchestrationContext, user_id: str):
    """Publish a discount offer, wait for "BookingConfirmed", then publish the outcome.

    Returns:
        The value carried by the "BookingConfirmed" external event.
    """
    discount = yield ctx.call_activity(calculate_discount, input=user_id)

    # First status: instructions the client needs in order to complete the booking.
    ctx.set_custom_status({
        "discount": discount,
        "discountTimeout": 60,
        "bookingUrl": "https://www.myawesomebookingweb.com"
    })

    is_booking_confirmed = yield ctx.wait_for_external_event("BookingConfirmed")

    # Second status: outcome message for the client.
    if is_booking_confirmed:
        ctx.set_custom_status({"message": "Thank you for confirming your booking."})
    else:
        ctx.set_custom_status({"message": "The booking was not confirmed on time. Please try again."})
    return is_booking_confirmed
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
/**
 * Orchestrator that publishes a discount offer as the custom status, waits for the
 * "BookingConfirmed" external event, and then publishes an outcome message.
 */
public class ReserveTicket implements TaskOrchestration {
    /**
     * Runs the booking flow and completes with the value of the "BookingConfirmed" event.
     *
     * @param ctx orchestration context used for input, activities, events and status
     */
    @Override
    public void run(TaskOrchestrationContext ctx) {
        String userId = ctx.getInput(String.class);
        int discount = ctx.callActivity("CalculateDiscount", userId, int.class).await();

        // First status: instructions the client needs in order to complete the booking.
        DiscountInfo offer = new DiscountInfo(discount, 60, "https://www.myawesomebookingweb.com");
        ctx.setCustomStatus(offer);

        boolean isConfirmed = ctx.waitForExternalEvent("BookingConfirmed", boolean.class).await();

        // Second status: outcome message for the client.
        String outcomeMessage = isConfirmed
                ? "Thank you for confirming your booking."
                : "The booking was not confirmed on time. Please try again.";
        ctx.setCustomStatus(outcomeMessage);

        ctx.complete(isConfirmed);
    }
}
/** Custom status payload describing a discount offer. */
class DiscountInfo {
    // Field order is kept as declared: discount, discountTimeout, bookingUrl.
    public int discount;
    public int discountTimeout;
    public String bookingUrl;

    /**
     * Creates a payload with the given offer details.
     *
     * @param discount        discount value returned by the CalculateDiscount activity
     * @param discountTimeout timeout value published with the offer
     * @param bookingUrl      URL where the booking can be completed
     */
    public DiscountInfo(int discount, int discountTimeout, String bookingUrl) {
        this.bookingUrl = bookingUrl;
        this.discountTimeout = discountTimeout;
        this.discount = discount;
    }
}
import { ActivityContext, OrchestrationContext, TOrchestrator } from "@microsoft/durabletask-js";
/** Activity: returns the discount for the given user (placeholder that always returns 10). */
const calculateDiscount = async (_: ActivityContext, userId: string): Promise<number> => {
  // Calculate discount based on user
  const discount = 10;
  return discount;
};

/**
 * Orchestrator: publishes a discount offer as the custom status, waits for the
 * "BookingConfirmed" external event, then publishes a confirmation message.
 *
 * @returns The value carried by the "BookingConfirmed" event.
 */
const reserveTicket: TOrchestrator = async function* (ctx: OrchestrationContext, userId: string): any {
  const discount: number = yield ctx.callActivity(calculateDiscount, userId);

  // First status: instructions the client needs in order to complete the booking.
  const offer = {
    discount: discount,
    discountTimeout: 60,
    bookingUrl: "https://www.myawesomebookingweb.com",
  };
  ctx.setCustomStatus(offer);

  const isBookingConfirmed: boolean = yield ctx.waitForExternalEvent("BookingConfirmed");

  // Second status: outcome message for the client.
  let message: string;
  if (isBookingConfirmed) {
    message = "Thank you for confirming your booking.";
  } else {
    message = "The booking was not confirmed on time. Please try again.";
  }
  ctx.setCustomStatus({ message: message });

  return isBookingConfirmed;
};
查询自定义状态
以下示例演示如何使用内置 HTTP API 查询自定义状态值。
/// <summary>Orchestrator that publishes an arbitrary object as its custom status.</summary>
/// <param name="context">Durable orchestration context supplied by the trigger.</param>
public static async Task SetStatusTest([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // ...do work...

    // update the status of the orchestration with some arbitrary data
    context.SetCustomStatus(new
    {
        nextActions = new[] { "A", "B", "C" },
        foo = 2,
    });

    // ...do more work...
}
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
// ...do work...
// update the status of the orchestration with some arbitrary data
const customStatus = { nextActions: [ "A", "B", "C" ], foo: 2, };
context.df.setCustomStatus(customStatus);
// ...do more work...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Publish an arbitrary dictionary as the orchestration's custom status."""
    # ...do work...
    custom_status = {'nextActions': ['A', 'B', 'C'], 'foo': 2}
    context.set_custom_status(custom_status)
    # ...do more work...


main = df.Orchestrator.create(orchestrator_function)
# Orchestrator: publishes an arbitrary hashtable as its custom status.
param($Context)

# ...do work...

$customStatus = @{
    nextActions = @('A', 'B', 'C')
    foo         = 2
}
Set-DurableCustomStatus -CustomStatus $customStatus

# ...do more work...
/**
 * Orchestrator that publishes a {@code CustomStatusPayload} as its custom status.
 *
 * @param ctx orchestration context supplied by the trigger
 */
@FunctionName("MyCustomStatusOrchestrator")
public void myCustomStatusOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // ... do work ...

    // update the status of the orchestration with some arbitrary data
    CustomStatusPayload status = new CustomStatusPayload();
    status.nextActions = new String[] { "A", "B", "C" };
    status.foo = 2;
    ctx.setCustomStatus(status);

    // ... do more work ...
}
/** Plain data holder used as the custom status payload in this sample. */
class CustomStatusPayload {
    // Actions published to the client; the sample sets this to "A", "B", "C".
    public String[] nextActions;
    // Arbitrary numeric value; the sample sets it to 2.
    public int foo;
}
在编排运行时,外部客户端可以获取此自定义状态。
GET /runtime/webhooks/durabletask/instances/instance123
客户端将看到以下响应:
{
"runtimeStatus": "Running",
"input": null,
"customStatus": { "nextActions": ["A", "B", "C"], "foo": 2 },
"output": null,
"createdTime": "2019-10-06T18:30:24Z",
"lastUpdatedTime": "2019-10-06T19:40:30Z"
}
警告
自定义状态有效负载限制为 16 KB 的 UTF-16 JSON 文本。 如果需要更大的有效负载,建议使用外部存储。
Durable Task SDK 不提供内置的 HTTP 状态终结点。 请使用 DurableTaskClient 上的编排实例元数据 API 来查询自定义状态。
using System.Threading.Tasks;
using Microsoft.DurableTask;
/// <summary>Orchestrator that publishes an arbitrary object as its custom status.</summary>
public class MyCustomStatusOrchestrator : TaskOrchestrator<object?, object?>
{
    /// <summary>Sets the custom status and completes with a null result.</summary>
    /// <param name="context">Orchestration context used to set the custom status.</param>
    /// <param name="input">Not used by this orchestrator.</param>
    public override Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        var customStatus = new
        {
            nextActions = new[] { "A", "B", "C" },
            foo = 2,
        };
        context.SetCustomStatus(customStatus);

        return Task.FromResult<object?>(null);
    }
}
from durabletask import task
def my_custom_status_orchestrator(ctx: task.OrchestrationContext, _):
    """Publish an arbitrary dictionary as the orchestration's custom status."""
    ctx.set_custom_status({"nextActions": ["A", "B", "C"], "foo": 2})
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
/** Orchestrator that publishes a {@code CustomStatusPayload} as its custom status. */
public class MyCustomStatusOrchestrator implements TaskOrchestration {
    /**
     * Sets the custom status and completes the orchestration with a null result.
     *
     * @param ctx orchestration context used to set the status and complete
     */
    @Override
    public void run(TaskOrchestrationContext ctx) {
        CustomStatusPayload status = new CustomStatusPayload();
        status.nextActions = new String[] { "A", "B", "C" };
        status.foo = 2;

        ctx.setCustomStatus(status);
        ctx.complete(null);
    }
}
/** Plain data holder used as the custom status payload in this sample. */
class CustomStatusPayload {
    // Actions published to the client; the sample sets this to "A", "B", "C".
    public String[] nextActions;
    // Arbitrary numeric value; the sample sets it to 2.
    public int foo;
}
import { OrchestrationContext, TOrchestrator } from "@microsoft/durabletask-js";
/** Orchestrator: publishes an arbitrary object as its custom status. */
const myCustomStatusOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
  // ...do work...

  const customStatus = {
    nextActions: ["A", "B", "C"],
    foo: 2,
  };
  ctx.setCustomStatus(customStatus);

  // ...do more work...
};
从客户端查询自定义状态:
using Microsoft.DurableTask.Client;
// Fetch the instance metadata together with its payloads (getInputsAndOutputs: true).
OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: true);
// Serialized custom status; null when no metadata was returned.
string? customStatusJson = metadata?.SerializedCustomStatus;
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Assumes 'client' is a DurableTaskSchedulerClient instance
# Fetch the orchestration state together with its payloads.
state = client.get_orchestration_state(instance_id, fetch_payloads=True)
# Guard against a missing state, mirroring the null-safe access in the C# and
# TypeScript samples, so a None result does not raise AttributeError.
custom_status_json = state.serialized_custom_status if state is not None else None
// Fetch the instance metadata; the second argument (true) requests inputs and outputs.
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, true);
// Deserialize the custom status into the payload type set by the orchestrator.
CustomStatusPayload payload = metadata.readCustomStatusAs(CustomStatusPayload.class);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);

// Get the custom status of an orchestration instance
const state = await client.getOrchestrationState(instanceId, true);

// Stays undefined when no state was returned for the instance.
const customStatusJson = state ? state.serializedCustomStatus : undefined;
警告
自定义状态有效负载限制为 16 KB 的 UTF-16 JSON 文本。
后续步骤