Azure Service Fabric 中的可靠并发队列简介

可靠并发队列是一种异步的、事务性的已复制队列,其特点是排队和取消排队操作的高并发性。 它旨在降低可靠队列提供的严格的 FIFO 排序要求,代之以“尽力排序”要求,从而提高吞吐量并降低延迟。

API

并发队列 可靠并发队列
void Enqueue(T item) Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(out T result) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

可靠队列比较

可靠并发队列作为可靠队列的替代推出, 其使用范围局限于不需要严格 FIFO 排序的情况,这种情况下,在尽量保证 FIFO 的同时,还需要考虑到并发性。 可靠队列以锁定方式强制实施 FIFO 排序,一次只允许一个事务排队,一个事务取消排队。 相对而言,可靠并发队列对排序的要求较松,允许任意数目的并发事务交叉进行排队和取消排队操作。 可靠并发队列的原则是“尽力排序”,但无法保证两个值的相对顺序。

在有多个并发事务需要执行排队和/或取消排队操作的情况下,可靠并发队列相对于可靠队列而言可以提高吞吐量并降低延迟。

ReliableConcurrentQueue 的一个示例用例是“消息队列”

场景。 在该方案中,一个或多个消息生成者会创建项并将其添加到队列中,同时还会有一个或多个消息使用者从队列拉取消息并对其进行处理。 多个生成者和使用者可以独立操作,使用并发事务来处理队列。

使用准则

  • 队列希望队列中的项的保留期较短。 换句话说,项呆在队列中的时间不宜过长。
  • 队列不保证严格的 FIFO 排序。
  • 队列不读取自己的写入。 项在事务中排队时,该项对于同一事务中的取消排队者来说为不可见。
  • 取消排队不是相互隔离的。 如果项 A 在事务 txnA 中取消排队,则即使 txnA 尚未提交,项 A 也不会对并发事务 txnB 可见。 如果 txnA 中止,A 会立刻变得对 txnB 可见。
  • 可以先使用 TryDequeueAsync,再中止事务,从而实现 TryPeekAsync 行为。 可以在“编程模式”部分中找到此行为的示例。
  • 计数是非事务性的。 可以通过计数来了解队列中的元素数目,但计数只代表一个时间点的情况,可靠性不强。
  • 在事务处于活动状态时,不应对取消排队项执行开销高昂的处理,以避免可能对系统性能产生影响的长时间运行事务。

代码段

让我们看看一些代码片段及其预期输出。 本部分忽略异常处理。

Instantiation

创建可靠并发队列的实例类似于任何其他可靠集合。

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

下面是使用 EnqueueAsync 的一些代码片段及其预期输出。

  • 案例 1:单个排队任务
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

假定任务已成功完成,且没有并发事务在修改队列。 用户可以预期队列包含的项采用以下顺序:

10, 20

20, 10

  • 案例 2:并行排队任务
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 无法推断队列中项的顺序。 就此代码片段来说,项的显示顺序可能是 4! 种 可能的顺序之一。 队列会尝试让项保持原有的(排队)顺序,但在出现并发操作或错误的情况下,也可能会强制其重新排序。

DequeueAsync

下面是使用 TryDequeueAsync 的一些代码片段及预期输出。 假定已在队列中填充以下项:

10, 20, 30, 40, 50, 60

  • 案例 1:单个取消排队任务
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

假定任务已成功完成,且没有并发事务在修改队列。 由于无法推断队列中项的顺序,可能会采用任意顺序取消任意三个项的排队。 队列会尝试让项保持原有的(排队)顺序,但在出现并发操作或错误的情况下,也可能会强制其重新排序。

  • 案例 2:并行取消排队任务
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 由于无法推断队列中项的顺序,列表 dequeue1 和 dequeue2 均会包含采用任意顺序的任意两个项。

同一项不会出现在两个列表中。 因此,如果 dequeue1 包含 10、30,则 dequeue2 就会包含 20、40。

  • 案例 3:通过中止事务排定取消排队任务顺序

中止正在取消排队的事务,项就会重新回到队列头。 项回到队列头的顺序是不确定的。 请看以下代码:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

假定项在取消排队时的顺序如下:

10, 20

中止事务后,项会采用以下顺序之一添加回队列头:

10, 20

20, 10

事务未成功提交的所有案例均是如此。

编程模式

此部分介绍一些编程模式,在使用 ReliableConcurrentQueue 时可能会用到这些模式。

按批取消排队

建议的编程模式是,通过使用者任务按批取消排队,而不是一次执行一个取消排队操作。 用户可以选择限制每个批处理之间的延迟,或者限制批处理大小。 以下代码片段显示了此编程模型。 请注意,在此示例中,处理是在提交事务后完成的,因此,如果在处理过程中发生错误,则未处理项将丢失而不会得到处理。 或者,处理可以在事务的范围内完成,但是它可能会对性能产生负面影响,并且需要处理已处理项。

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

基于通知的尽力处理

另一种有趣的编程模式是使用计数 API。 在这里,我们可以为队列实施基于通知的尽力处理。 可以使用队列计数来限制排队或取消排队任务。 请注意,与前面的示例一样,由于处理在事务外部进行,如果在处理过程中发生错误,则可能会丢失未处理的项。

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

尽力清空

考虑到数据结构的并发特性,不保证队列会被清空。 即使队列中没有正在进行的用户操作,对 TryDequeueAsync 的特定调用也可能不会返回以前排队和提交的项。 排队的项最终必定会变得对取消排队操作可见,但在没有带外通信机制的情况下,独立的使用者无法知道队列是否已达到稳定状态,即使系统已停止所有生成者且不允许新的排队操作。 因此,清空操作只能尽力而为,其执行情况如下所示。

用户应停止所有后续的生成者和使用者任务,等待正在进行的事务提交或中止,并尝试清空队列。 如果用户知道队列中预期的项数,则可以设置一个通知,通知所有项都已取消排队。

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

速览

可靠并发队列不提供 TryPeekAsync API。 用户可以先使用 TryDequeueAsync,再中止事务,从而获取速览语义。 在以下示例中,仅当项的值大于 10 时,才会处理取消排队操作。

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

必读