次の方法で共有

函数链式调用

函数链接是一种模式,可在其中按顺序运行函数序列。 通常将一个函数的输出传递给下一个函数的输入。 本文介绍在完成Durable Functions快速入门(C#JavaScriptTypeScriptPythonPowerShellJava)时所构建的链接序列。 请参阅 Durable Functions概述了解更多信息。

先决条件

函数链式调用是一种模式,可以按顺序运行一系列活动。 通常将一个活动的输出传递给下一个活动的输入。 本文介绍用于 .NET、JavaScript、Python 和 Java 的 Durable Task SDK 的链式序列。

Functions

本文介绍示例应用中的以下函数:

  • E1_HelloSequence:一个业务流程协调程序函数,按顺序多次调用E1_SayHello。 它存储每个输出并记录结果。
  • E1_SayHello:将“Hello”添加到字符串开头的活动 函数
  • HttpStart:一个由 HTTP 触发的 耐用客户端 函数,用于启动协调器的实例。

本文介绍示例应用中的这些组件:

  • GreetingOrchestrationgreetingOrchestratorfunction_chaining_orchestratorActivityChaining: 按顺序调用多个活动的协调程序。 它存储每个输出并记录结果。
  • 活动函数:处理输入和输出结果的活动。 每个活动对输入执行简单的转换。
  • 客户端:启动工作流协调器的实例并等待结果的客户端应用。

协调器

        [FunctionName("E1_HelloSequence")]
        public static async Task<List<string>> Run(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var outputs = new List<string>();

            outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>("E1_SayHello_DirectInput", "London"));

            // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
            return outputs;
        }

所有 C# orchestration 函数都必须具有 DurableOrchestrationContext 类型的参数,此参数存在于 Microsoft.Azure.WebJobs.Extensions.DurableTask 程序集中。 借助此上下文对象,可使用其 方法调用其他活动 函数并传递输入参数。

代码将在具有不同参数值的序列中调用三次 E1_SayHello。 每个调用的返回值都会添加到 outputs 列表,函数末尾会返回该列表。

此代码显示一个业务流程协调程序,该业务流程协调程序按顺序调用三个活动,并将每个输出传递到下一个活动:

using Microsoft.DurableTask;

[DurableTask]
public class GreetingOrchestration : TaskOrchestrator<string, string>
{
    public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
    {
        // Step 1: Say hello to the person
        string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

        // Step 2: Process the greeting
        string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

        // Step 3: Finalize the response
        string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

        return finalResponse;
    }
}

所有.NET业务流程协调程序都继承自 TaskOrchestrator<TInput, TOutput>TaskOrchestrationContext 允许你使用 CallActivityAsync 调用活动函数。 代码按顺序调用三个活动,其中每个活动都接收上一活动的输出。

活动

        [FunctionName("E1_SayHello")]
        public static string SayHello([ActivityTrigger] IDurableActivityContext context)
        {
            string name = context.GetInput<string>();
            return $"Hello {name}!";
        }

活动使用了 ActivityTrigger 属性。 使用 IDurableActivityContext 进行活动操作,例如通过 GetInput<T> 读取输入。

E1_SayHello 设置问候语字符串的格式。

不要绑定到 IDurableActivityContext,而是直接绑定到传递给活动函数的类型。 例如:

        [FunctionName("E1_SayHello_DirectInput")]
        public static string SayHelloDirectInput([ActivityTrigger] string name)
        {
            return $"Hello {name}!";
        }

Durable Task SDK 中的活动继承自 TaskActivity<TInput, TOutput>

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    private readonly ILogger<SayHelloActivity> _logger;

    public SayHelloActivity(ILogger<SayHelloActivity> logger)
    {
        _logger = logger;
    }

    public override Task<string> RunAsync(TaskActivityContext context, string name)
    {
        _logger.LogInformation("Activity SayHello called with name: {Name}", name);
        return Task.FromResult($"Hello {name}!");
    }
}

[DurableTask]
public class ProcessGreetingActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string greeting)
    {
        return Task.FromResult($"{greeting} How are you today?");
    }
}

[DurableTask]
public class FinalizeResponseActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string response)
    {
        return Task.FromResult($"{response} I hope you're doing well!");
    }
}

使用依赖项注入获取类似 ILogger服务。 将 [DurableTask] 属性添加到活动中以向工作者注册。

客户

从客户端函数启动协调器函数实例。 使用HttpStart HTTP 触发的函数来启动E1_HelloSequence的实例。

    public static class HttpStart
    {
        [FunctionName("HttpStart")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}")] HttpRequestMessage req,
            [DurableClient] IDurableClient starter,
            string functionName,
            ILogger log)
        {
            // Function input comes from the request content.
            object eventData = await req.Content.ReadAsAsync<object>();
            string instanceId = await starter.StartNewAsync(functionName, eventData);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }

若要与编排器交互,请添加 DurableClient 输入绑定。 使用客户端启动业务流程并返回包含 URL 的 HTTP 响应,以检查新业务流程的状态。

从客户端应用程序启动编排。 客户安排编排, 并可以等待完成。

using Microsoft.DurableTask.Client;

// Create the client
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();

// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(GreetingOrchestration),
    input: "World");

Console.WriteLine($"Started orchestration with ID: {instanceId}");

// Wait for the orchestration to complete
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true);

Console.WriteLine($"Orchestration completed with result: {result.ReadOutputAs<string>()}");

使用指向持久任务计划程序的连接字符串创建 DurableTaskClient。 用 ScheduleNewOrchestrationInstanceAsync 来启动编排并等待 WaitForInstanceCompletionAsync 完成。

运行示例

若要运行 E1_HelloSequence 业务流程,请将此 HTTP POST 请求发送到 HttpStart 函数。

POST http://{host}/orchestrators/E1_HelloSequence

注意

前面的 HTTP 代码片段假定示例 的host.json 文件从所有 HTTP 触发器函数 URL 中删除默认 api/ 前缀。 在示例 的host.json 文件中查找此配置。

例如,如果要在名为 myfunctionapp 的函数应用中运行示例,请将 {host} 替换为 myfunctionapp.chinacloudsites.cn

请求返回 HTTP 202(内容已简化以保持简洁):

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

编排队列并立即开始运行。 使用标头中的 Location URL 检查执行状态。

GET http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

响应显示编排状态。 由于实例快速完成,因此实例通常处于 “已完成 ”状态并返回如下所示的响应(为简洁起见剪裁):

HTTP/1.1 200 OK
Content-Length: 179
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":null,"output":["Hello Tokyo!","Hello Seattle!","Hello London!"],"createdTime":"2017-06-29T05:24:57Z","lastUpdatedTime":"2017-06-29T05:24:59Z"}

实例 runtimeStatusCompleted,包含 output 业务流程协调程序函数执行的 JSON 序列化结果。

注意

为其他触发器类型(例如queueTriggereventHubTriggertimerTrigger)实现类似的启动逻辑。

查看函数执行日志。 由于在E1_HelloSequence中描述的重播行为,函数会多次启动和完成。 但 E1_SayHello 只是运行三次,因为活动函数的执行过程不会重播。

若要运行示例,需要:

  1. 启动 Durable Task Scheduler 模拟器 (用于本地开发):

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. 启动工作器 来注册协调器和活动。

  3. 运行客户端 来计划业务流程并等待结果。

输出的客户端展示了串联编排的结果:

Started orchestration with ID: abc123
Orchestration completed with result: "Hello World! How are you today? I hope you're doing well!"

工作器日志显示每个活动按顺序运行,并将其输出传递给下一个活动。

后续步骤

这个示例演示了一个简单的函数链式编排。 接下来,实现扇出/扇入模式。

这个示例演示了一个简单的函数链式编排。 接下来,探索更多模式。

有关完整的 JavaScript SDK 示例,请参阅 Durable Task JavaScript SDK 示例