为机器人实现自定义存储
适用于:SDK v4
机器人的交互划分为三个方面:与 Azure AI 机器人服务的活动交换,使用内存存储加载和保存机器人和对话状态,以及与后端服务的集成。
本文介绍如何扩展 Azure AI 机器人服务与机器人的内存状态和存储之间的语义。
注意
Bot Framework JavaScript、C# 和 Python SDK 将继续受支持,但 Java SDK 即将停用,最终长期支持将于 2023 年 11 月结束。
使用 Java SDK 构建的现有机器人将继续正常运行。
要生成新的机器人,请考虑使用 Microsoft Copilot Studio 并阅读选择正确的助理解决方案。
有关详细信息,请参阅机器人构建的未来。
先决条件
- 了解 Microsoft Bot Framework 的基础知识、使用活动处理程序的事件驱动对话以及管理状态。
- C#、Python 或 Java 中横向扩展示例的副本。
本文重点介绍示例的 C# 版本。
背景
Bot Framework SDK 包括机器人状态和内存存储的默认实现。 此实现适用于需要将这些片段与少量的初始化代码一起使用的应用程序,正如许多示例中所演示的那样。
SDK 是一个框架,而不是具有固定行为的应用程序。 换句话说,框架中许多机制的实现是默认实现,而不是唯一可能的实现。 该框架不会规定与 Azure AI 机器人服务的活动交换,以及加载和保存任何机器人状态这两者之间的关系。
本文介绍在不太适用于应用程序时,修改默认状态和存储实现的语义的一种方法。 横向扩展示例提供与默认设置语义不同的状态和存储的备用实现。 此备用解决方案在框架中同样良好适配。 根据你的应用场景,此备用解决方案可能更适合你正在开发的应用程序。
默认适配器和存储提供程序的行为
使用默认实现,接收活动时,机器人加载对应于此对话的状态。 然后,其使用此状态和入站活动运行对话逻辑。 在执行对话的过程中,将会创建并立即发送一个或多个出站活动。 对话处理完成后,机器人将保存更新的状态,并覆盖旧状态。
但是,此行为可能会出现一些错误。
如果保存操作由于某种原因而失败,则状态与用户在频道上看到的内容隐式地失去同步。 用户已经看到机器人的响应,并认为状态已经向前移动,但并非如此。 此错误可能比状态更新成功但用户未收到响应消息的情形更糟。
此类状态错误可能对对话设计产生影响。 例如,对话可能需要与用户进行额外的,或者冗余的确认交换。
如果实现在多个节点之间进行“横向扩展”部署,则状态可能会意外被覆盖。 此错误可能会令人困惑,因为对话可能已将活动发送到包含确认消息的通道。
考虑一个披萨订购机器人,机器人会询问用户的配料选择,而用户会发送两条快速消息:一条是添加蘑菇,一条是添加奶酪。 在横向扩展方案中,机器人的多个实例可能处于活动状态,并且两条用户消息可由不同的计算机上的两个单独的实例处理。 这种冲突称为“争用条件”,其中一台计算机可能会覆盖另一台计算机写入的状态。 但是,由于响应已发送,用户已收到订单添加了蘑菇和奶酪的确认。 遗憾的是,送达的披萨仅包含蘑菇或奶酪,而不是包含两者。
乐观锁定
横向扩展示例在状态周围引入了一些锁定。 此示例实现“乐观锁定”,让每个实例都像自己是唯一运行实例一样运行,然后检查任何并发冲突。 此锁定听起来可能很复杂,但有已知解决方案存在,并且可以使用 Bot Framework 中的云存储技术和正确的扩展点。
示例使用基于实体标记标头 (ETag) 的标准 HTTP 机制。 了解此机制对于理解以下代码至关重要。 下图演示了序列。
该示意图有对某个资源执行更新的两个客户端。
当客户端发出 GET 请求并从服务器返回资源时,服务器纳入一个 ETag 标头。
ETag 标头是代表资源状态的不透明值。 如果资源已更改,服务器会更新资源的 ETag。
当客户端想要保留状态更改时,它会向服务器发出 POST 请求,并在
If-Match
前置条件标头中包含 ETag 值。如果请求的 ETag 值与服务器的值不匹配,则前置条件检查失败并显示
412
(前置条件失败)响应。此失败表示服务器上的当前值不再与客户端处理的原始值匹配。
如果客户端收到前置条件失败响应,客户端通常会获取资源的新值,应用所需的更新,并尝试再次发布资源更新。
如果没有其他客户端更新资源,则第二个 POST 请求会成功。 否则,客户端可以重试。
此过程之所以称为“乐观”,是因为获得了资源的客户端可继续执行处理,而资源本身并未“锁定”,因为其他客户端可以不受限制地访问它。 不管资源应处于哪种状态,客户端之间的任何争用只有在完成处理之后才能确定。 在分布式系统中,此策略通常比相对的“悲观”方法更佳。
如上所述的乐观锁定机制假定可以安全地重试程序逻辑。 理想的情况是这些服务请求是“幂等的”。 在计算机科学中,幂等操作是指使用相同输入参数调用多次不会造成额外影响的操作。 实现 GET、PUT 和 DELETE 请求的纯 HTTP REST 服务通常是幂等的。 如果服务请求不会产生额外的效果,则作为重试策略的一部分,可以安全地重新执行请求。
横向扩展示例和本文的其余部分假定机器人使用的后端服务都是幂等的 HTTP REST 服务。
缓冲出站活动
发送活动不是幂等操作。 活动通常是将信息中继给用户的消息,两次或多次重复相同的消息可能会令人困惑或产生误导。
乐观锁定意味着机器人逻辑可能需要多次重新运行。 为了避免多次发送任何给定活动,请在向用户发送活动之前等待状态更新操作成功。 机器人逻辑应如下图所示。
在对话执行中生成重试循环后,在保存操作的前置条件失败时,会出现以下行为。
应用此机制后,先前示例中的披萨机器人永远不会发送在订单中添加披萨配料的确认误报。 即使机器人跨多台计算机部署,乐观锁定方案也会有效地将状态更新序列化。 在披萨机器人中,添加物料的确认现在甚至可以准确反映完整状态。 例如,如果用户快速键入“奶酪”,然后键入“蘑菇”,并且这些消息由机器人的两个不同的实例处理,则后一个完成的实例可以包括“包含奶酪和蘑菇的披萨”作为响应的一部分。
这个新的自定义存储解决方案有三点是 SDK 中的默认实现无法做到的:
- 它使用 ETag 来检测争用。
- 检测到 ETag 失败时,它会重试处理。
- 它等待发送出站活动,直到它已成功保存状态。
本文的余下内容将介绍这三个部分的实现。
实现 ETag 支持
首先,为包含 ETag 支持的新存储定义接口。 该接口有助于在 ASP.NET 中使用依赖项注入机制。 从接口开始,你可以为单元测试和生产环境分别实现不同的版本。 例如,单元测试版本可能在内存中运行,不需要网络连接。
接口由 load 和 save 方法组成。 这两种方法都将使用“键”参数来标识要从存储加载或保存到存储的状态。
- “加载”将返回状态值和关联的 ETag。
- “保存”将具有状态值和关联的 ETag 的参数,并返回一个布尔值,该值指示操作是否成功。 返回值不会用作常规错误指示器,而是作为前置条件失败的特定指示器。 检查返回代码属于重试循环逻辑的一部分。
若要使存储实现广泛适用,请避免对存储实现提出序列化要求。
但是,许多新型存储服务支持 JSON 作为 content-type。
在 C# 中,可以使用 JObject
类型来表示 JSON 对象。
在 JavaScript 或 TypeScript 中,JSON 是一个常规本机对象。
以下是自定义接口的定义。
IStore.cs
public interface IStore
{
Task<(JObject content, string etag)> LoadAsync(string key);
Task<bool> SaveAsync(string key, JObject content, string etag);
}
下面是 Azure Blob 存储的实现。
BlobStore.cs
public class BlobStore : IStore
{
private readonly CloudBlobContainer _container;
public BlobStore(string accountName, string accountKey, string containerName)
{
if (string.IsNullOrWhiteSpace(accountName))
{
throw new ArgumentException(nameof(accountName));
}
if (string.IsNullOrWhiteSpace(accountKey))
{
throw new ArgumentException(nameof(accountKey));
}
if (string.IsNullOrWhiteSpace(containerName))
{
throw new ArgumentException(nameof(containerName));
}
var storageCredentials = new StorageCredentials(accountName, accountKey);
var cloudStorageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
var client = cloudStorageAccount.CreateCloudBlobClient();
_container = client.GetContainerReference(containerName);
}
public async Task<(JObject content, string etag)> LoadAsync(string key)
{
if (string.IsNullOrWhiteSpace(key))
{
throw new ArgumentException(nameof(key));
}
var blob = _container.GetBlockBlobReference(key);
try
{
var content = await blob.DownloadTextAsync();
var obj = JObject.Parse(content);
var etag = blob.Properties.ETag;
return (obj, etag);
}
catch (StorageException e)
when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
{
return (new JObject(), null);
}
}
public async Task<bool> SaveAsync(string key, JObject obj, string etag)
{
if (string.IsNullOrWhiteSpace(key))
{
throw new ArgumentException(nameof(key));
}
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
var blob = _container.GetBlockBlobReference(key);
blob.Properties.ContentType = "application/json";
var content = obj.ToString();
if (etag != null)
{
try
{
await blob.UploadTextAsync(content, Encoding.UTF8, new AccessCondition { IfMatchETag = etag }, new BlobRequestOptions(), new OperationContext());
}
catch (StorageException e)
when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
{
return false;
}
}
else
{
await blob.UploadTextAsync(content);
}
return true;
}
}
Azure Blob 存储完成大部分工作。 每个方法会检查特定异常以满足调用代码的预期。
- 方法
LoadAsync
在响应带有“未找到”状态代码的存储异常时,返回一个 null 值。 - 方法
SaveAsync
在响应带有“前置条件失败”代码的存储异常时,返回false
。
实现重试循环
重试循环的设计实现了序列图中显示的行为。
接收活动时,将为对话状态创建一个键。
活动与对话状态之间的关系与默认实现的自定义存储相同。 因此,可以采用与默认状态实现相同的方式构造键。
尝试加载对话状态。
运行机器人的对话并捕获要发送的出站活动。
尝试保存对话状态。
成功后,发送出站活动并退出。
失败时,从加载对话状态步骤重复此过程。
对话状态的新负载获取新的和当前的 ETag 和对话状态。 重新运行对话,保存状态步骤有机会成功。
下面是消息活动处理程序的实现。
ScaleoutBot.cs
protected override async Task OnMessageActivityAsync(ITurnContext<IMessageActivity> turnContext, CancellationToken cancellationToken)
{
// Create the storage key for this conversation.
var key = $"{turnContext.Activity.ChannelId}/conversations/{turnContext.Activity.Conversation?.Id}";
// The execution sits in a loop because there might be a retry if the save operation fails.
while (true)
{
// Load any existing state associated with this key
var (oldState, etag) = await _store.LoadAsync(key);
// Run the dialog system with the old state and inbound activity, the result is a new state and outbound activities.
var (activities, newState) = await DialogHost.RunAsync(_dialog, turnContext.Activity, oldState, cancellationToken);
// Save the updated state associated with this key.
var success = await _store.SaveAsync(key, newState, etag);
// Following a successful save, send any outbound Activities, otherwise retry everything.
if (success)
{
if (activities.Any())
{
// This is an actual send on the TurnContext we were given and so will actual do a send this time.
await turnContext.SendActivitiesAsync(activities, cancellationToken);
}
break;
}
}
}
注意
该示例以函数调用的方式实现对话执行。 一种更复杂的方法是定义接口并使用依赖项注入。 但是,对于此示例,静态函数强调此乐观锁定方法的函数性质。 一般情况下,以函数方式实现代码的关键部分时,可以提高它在网络上成功运作的机会。
实现出站活动缓冲区
下一个要求是缓冲出站活动,直到成功保存操作发生后,这需要自定义适配器实现。
自定义 SendActivitiesAsync
方法不应将活动发送到使用,而是将活动添加到列表中。
对话代码将不需要修改。
- 在此特定方案中,不支持“更新活动”和“删除活动”操作,并且关联的方法会引发“不实现”异常。
- 某些通道使用发送活动操作的返回值,允许机器人修改或删除之前发送的信息,例如,禁用通道中显示的卡片上的按钮。 需要状态时,这些消息交换可能特别复杂,这不属于本文的讨论范畴。
- 对话会创建和使用此自定义适配器,以便可以缓冲活动。
- 机器人的轮次处理程序将使用更标准的
AdapterWithErrorHandler
向用户发送活动。
下面是自定义适配器的实现。
DialogHostAdapter.cs
public class DialogHostAdapter : BotAdapter
{
private List<Activity> _response = new List<Activity>();
public IEnumerable<Activity> Activities => _response;
public override Task<ResourceResponse[]> SendActivitiesAsync(ITurnContext turnContext, Activity[] activities, CancellationToken cancellationToken)
{
foreach (var activity in activities)
{
_response.Add(activity);
}
return Task.FromResult(new ResourceResponse[0]);
}
#region Not Implemented
public override Task DeleteActivityAsync(ITurnContext turnContext, ConversationReference reference, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public override Task<ResourceResponse> UpdateActivityAsync(ITurnContext turnContext, Activity activity, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
#endregion
}
在机器人中使用自定义存储
最后一步是将这些自定义类和方法与现有框架类和方法一起使用。
- 主重试循环将成为机器人
ActivityHandler.OnMessageActivityAsync
方法的一部分,并通过依赖项注入包括自定义存储。 - 对话托管代码将添加到公开静态
RunAsync
方法的DialogHost
类中。 对话主机:- 提取入站活动和旧状态,然后返回最终的活动和新状态。
- 创建自定义适配器,否则以与 SDK 相同的方式运行对话。
- 创建自定义状态属性访问器,该填充码将对话状态传递到对话系统。 访问器使用引用语义将访问器句柄传递至对话系统。
提示
JSON 序列化被内联添加到托管代码中,使其在可插入存储层之外,以便不同的实现可以以不同的方式序列化。
下面是对话主机的实现。
DialogHost.cs
public static class DialogHost
{
// The serializer to use. Moving the serialization to this layer will make the storage layer more pluggable.
private static readonly JsonSerializer StateJsonSerializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.All };
/// <summary>
/// A function to run a dialog while buffering the outbound Activities.
/// </summary>
/// <param name="dialog">THe dialog to run.</param>
/// <param name="activity">The inbound Activity to run it with.</param>
/// <param name="oldState">Th eexisting or old state.</param>
/// <returns>An array of Activities 'sent' from the dialog as it executed. And the updated or new state.</returns>
public static async Task<(Activity[], JObject)> RunAsync(Dialog dialog, IMessageActivity activity, JObject oldState, CancellationToken cancellationToken)
{
// A custom adapter and corresponding TurnContext that buffers any messages sent.
var adapter = new DialogHostAdapter();
var turnContext = new TurnContext(adapter, (Activity)activity);
// Run the dialog using this TurnContext with the existing state.
var newState = await RunTurnAsync(dialog, turnContext, oldState, cancellationToken);
// The result is a set of activities to send and a replacement state.
return (adapter.Activities.ToArray(), newState);
}
/// <summary>
/// Execute the turn of the bot. The functionality here closely resembles that which is found in the
/// IBot.OnTurnAsync method in an implementation that is using the regular BotFrameworkAdapter.
/// Also here in this example the focus is explicitly on Dialogs but the pattern could be adapted
/// to other conversation modeling abstractions.
/// </summary>
/// <param name="dialog">The dialog to be run.</param>
/// <param name="turnContext">The ITurnContext instance to use. Note this is not the one passed into the IBot OnTurnAsync.</param>
/// <param name="state">The existing or old state of the dialog.</param>
/// <returns>The updated or new state of the dialog.</returns>
private static async Task<JObject> RunTurnAsync(Dialog dialog, ITurnContext turnContext, JObject state, CancellationToken cancellationToken)
{
// If we have some state, deserialize it. (This mimics the shape produced by BotState.cs.)
var dialogStateProperty = state?[nameof(DialogState)];
var dialogState = dialogStateProperty?.ToObject<DialogState>(StateJsonSerializer);
// A custom accessor is used to pass a handle on the state to the dialog system.
var accessor = new RefAccessor<DialogState>(dialogState);
// Run the dialog.
await dialog.RunAsync(turnContext, accessor, cancellationToken);
// Serialize the result (available as Value on the accessor), and put its value back into a new JObject.
return new JObject { { nameof(DialogState), JObject.FromObject(accessor.Value, StateJsonSerializer) } };
}
}
最后,下面是自定义状态属性访问器的实现。
RefAccessor.cs
public class RefAccessor<T> : IStatePropertyAccessor<T>
where T : class
{
public RefAccessor(T value)
{
Value = value;
}
public T Value { get; private set; }
public string Name => nameof(T);
public Task<T> GetAsync(ITurnContext turnContext, Func<T> defaultValueFactory = null, CancellationToken cancellationToken = default(CancellationToken))
{
if (Value == null)
{
if (defaultValueFactory == null)
{
throw new KeyNotFoundException();
}
Value = defaultValueFactory();
}
return Task.FromResult(Value);
}
#region Not Implemented
public Task DeleteAsync(ITurnContext turnContext, CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}
public Task SetAsync(ITurnContext turnContext, T value, CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}
#endregion
}