Introduction to ReliableConcurrentQueue in Azure Service Fabric

Reliable Concurrent Queue is an asynchronous, transactional, and replicated queue that features high concurrency for enqueue and dequeue operations. It delivers high throughput and low latency by relaxing the strict FIFO ordering provided by Reliable Queue and providing best-effort ordering instead.

APIs

Concurrent Queue | Reliable Concurrent Queue
void Enqueue(T item) | Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(out T result) | Task<ConditionalValue<T>> TryDequeueAsync(ITransaction tx)
int Count | long Count

Comparison with Reliable Queue

Reliable Concurrent Queue is offered as an alternative to Reliable Queue. It should be used in cases where strict FIFO ordering is not required, because guaranteeing FIFO requires a tradeoff with concurrency. Reliable Queue uses locks to enforce FIFO ordering, with at most one transaction allowed to enqueue and at most one transaction allowed to dequeue at a time. In comparison, Reliable Concurrent Queue relaxes the ordering constraint and allows any number of concurrent transactions to interleave their enqueue and dequeue operations. Best-effort ordering is provided; however, the relative ordering of two values in a Reliable Concurrent Queue can never be guaranteed.

Reliable Concurrent Queue provides higher throughput and lower latency than Reliable Queue whenever there are multiple concurrent transactions performing enqueues and/or dequeues.

A sample use case for ReliableConcurrentQueue is the message queue scenario. In this scenario, one or more message producers create items and add them to the queue, and one or more message consumers pull messages from the queue and process them. Multiple producers and consumers can work independently, using concurrent transactions to process the queue.

Usage Guidelines

  • The queue expects its items to have a low retention period. That is, items should not stay in the queue for a long time.
  • The queue does not guarantee strict FIFO ordering.
  • The queue does not read its own writes. If an item is enqueued within a transaction, it is not visible to a dequeuer within the same transaction.
  • Dequeues are not isolated from each other. If item A is dequeued in transaction txnA, then item A is not visible to a concurrent transaction txnB, even though txnA has not committed. If txnA aborts, A becomes visible to txnB immediately.
  • TryPeekAsync behavior can be implemented by calling TryDequeueAsync and then aborting the transaction. An example of this behavior can be found in the Programming Patterns section.
  • Count is non-transactional. It gives an idea of the number of elements in the queue, but it is a point-in-time snapshot and cannot be relied upon.
  • Expensive processing of dequeued items should not be performed while the transaction is active, to avoid long-running transactions that may affect system performance.
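The "does not read its own writes" guideline can be illustrated with a minimal sketch. It assumes the queue starts out empty and that no other transaction touches it concurrently; the class and method names (OwnWritesProbe, IsOwnEnqueueVisibleAsync) are hypothetical and exist only for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

public static class OwnWritesProbe
{
    // Hypothetical helper: enqueues one item and tries to dequeue within the SAME transaction.
    // Assumption: the queue is empty beforehand and there are no concurrent transactions.
    public static async Task<bool> IsOwnEnqueueVisibleAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<int> queue,
        CancellationToken cancellationToken)
    {
        using (ITransaction txn = stateManager.CreateTransaction())
        {
            await queue.EnqueueAsync(txn, 10, cancellationToken);

            // The item enqueued above belongs to this still-uncommitted transaction,
            // so per the guideline it is not visible to this dequeue.
            ConditionalValue<int> ret = await queue.TryDequeueAsync(txn, cancellationToken);

            // Commit so that the enqueued item becomes visible to later transactions.
            await txn.CommitAsync();

            return ret.HasValue;
        }
    }
}
```

Under the stated assumptions, the guideline implies that the dequeue finds nothing; a consumer that needs the item should dequeue it in a later transaction.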

Code Snippets

Let us look at a few code snippets and their expected outputs. Exception handling is ignored in this section.

Instantiation

Creating an instance of a Reliable Concurrent Queue is similar to creating any other Reliable Collection.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Here are a few code snippets for using EnqueueAsync, followed by their expected outputs.

  • Case 1: Single Enqueue Task

    using (var txn = this.StateManager.CreateTransaction())
    {
        await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
        await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
    
        await txn.CommitAsync();
    }
    

    Assume that the task completed successfully, and that there were no concurrent transactions modifying the queue. The user can expect the queue to contain the items in any of the following orders:

    10, 20

    20, 10

  • Case 2: Parallel Enqueue Task

    // Parallel Task 1
    using (var txn = this.StateManager.CreateTransaction())
    {
        await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
        await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
    
        await txn.CommitAsync();
    }
    
    // Parallel Task 2
    using (var txn = this.StateManager.CreateTransaction())
    {
        await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
        await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
    
        await txn.CommitAsync();
    }
    

    Assume that the tasks completed successfully, that the tasks ran in parallel, and that there were no other concurrent transactions modifying the queue. No inference can be made about the order of items in the queue. For this code snippet, the items may appear in any of the 4! (24) possible orderings. The queue will attempt to keep the items in the original (enqueued) order, but may be forced to reorder them due to concurrent operations or faults.

DequeueAsync

Here are a few code snippets for using TryDequeueAsync, followed by the expected outputs. Assume that the queue is already populated with the following items:

10, 20, 30, 40, 50, 60

  • Case 1: Single Dequeue Task

    using (var txn = this.StateManager.CreateTransaction())
    {
        await this.Queue.TryDequeueAsync(txn, cancellationToken);
        await this.Queue.TryDequeueAsync(txn, cancellationToken);
        await this.Queue.TryDequeueAsync(txn, cancellationToken);
    
        await txn.CommitAsync();
    }
    

    Assume that the task completed successfully, and that there were no concurrent transactions modifying the queue. Since no inference can be made about the order of the items in the queue, any three of the items may be dequeued, in any order. The queue will attempt to keep the items in the original (enqueued) order, but may be forced to reorder them due to concurrent operations or faults.

  • Case 2: Parallel Dequeue Task

    // Parallel Task 1
    List<int> dequeue1 = new List<int>();
    using (var txn = this.StateManager.CreateTransaction())
    {
        // TryDequeueAsync returns a ConditionalValue<int>; production code should check HasValue before reading Value
        dequeue1.Add((await this.Queue.TryDequeueAsync(txn, cancellationToken)).Value);
        dequeue1.Add((await this.Queue.TryDequeueAsync(txn, cancellationToken)).Value);
    
        await txn.CommitAsync();
    }
    
    // Parallel Task 2
    List<int> dequeue2 = new List<int>();
    using (var txn = this.StateManager.CreateTransaction())
    {
        dequeue2.Add((await this.Queue.TryDequeueAsync(txn, cancellationToken)).Value);
        dequeue2.Add((await this.Queue.TryDequeueAsync(txn, cancellationToken)).Value);
    
        await txn.CommitAsync();
    }
    

    Assume that the tasks completed successfully, that the tasks ran in parallel, and that there were no other concurrent transactions modifying the queue. Since no inference can be made about the order of the items in the queue, the lists dequeue1 and dequeue2 will each contain any two items, in any order.

    The same item will not appear in both lists. For example, if dequeue1 has 10, 30, then dequeue2 might have 20, 40, but it cannot contain 10 or 30.

  • Case 3: Dequeue Ordering With Transaction Abort

    Aborting a transaction with in-flight dequeues puts the items back at the head of the queue. The order in which the items are put back is not guaranteed. Consider the following code:

    using (var txn = this.StateManager.CreateTransaction())
    {
        await this.Queue.TryDequeueAsync(txn, cancellationToken);
        await this.Queue.TryDequeueAsync(txn, cancellationToken);
    
        // Abort the transaction
        txn.Abort();
    }
    

    Assume that the items were dequeued in the following order:

    10, 20

    When the transaction is aborted, the items are added back to the head of the queue in any of the following orders:

    10, 20

    20, 10

    The same is true for all cases where the transaction was not successfully committed.

Programming Patterns

This section covers a few programming patterns that might be helpful when using ReliableConcurrentQueue.

Batch Dequeues

A recommended programming pattern is for the consumer task to batch its dequeues instead of performing one dequeue at a time. The user can throttle either the delay between batches or the batch size. The following code snippet shows this programming model. Be aware that in this example the processing is done after the transaction is committed, so if a fault occurs during processing, the unprocessed items are lost without having been processed. Alternatively, the processing can be done within the transaction's scope; however, this may have a negative impact on performance and requires handling of the items already processed.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
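The alternative mentioned above, processing inside the transaction's scope, might look like the following sketch. The class name, method name, and the processItemAsync delegate are hypothetical; the handler is assumed to be idempotent, because an item that was processed before a fault returns to the queue when the transaction does not commit and can then be delivered again.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

public static class InTransactionConsumer
{
    // Dequeues up to batchSize items and processes each one BEFORE committing.
    // processItemAsync is a hypothetical caller-supplied handler (assumed idempotent).
    // Returns the number of items processed in this batch.
    public static async Task<int> ProcessBatchAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<int> queue,
        Func<int, CancellationToken, Task> processItemAsync,
        int batchSize,
        CancellationToken cancellationToken)
    {
        int processed = 0;

        using (ITransaction txn = stateManager.CreateTransaction())
        {
            for (int i = 0; i < batchSize; ++i)
            {
                ConditionalValue<int> ret = await queue.TryDequeueAsync(txn, cancellationToken);

                if (!ret.HasValue)
                {
                    // Nothing visible to dequeue right now
                    break;
                }

                // If the handler throws, CommitAsync is never reached and the
                // transaction is disposed uncommitted, so the dequeued items
                // go back to the queue instead of being lost.
                await processItemAsync(ret.Value, cancellationToken);
                ++processed;
            }

            await txn.CommitAsync();
        }

        return processed;
    }
}
```

The tradeoff is the one the text describes: no item is lost on a fault, but the transaction stays open for as long as processing takes, so the handler should be kept short.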

Best-Effort Notification-Based Processing

Another interesting programming pattern uses the Count API. Here, we can implement best-effort notification-based processing for the queue. The queue Count can be used to throttle an enqueue or a dequeue task. Note that, as in the previous example, since the processing occurs outside the transaction, unprocessed items may be lost if a fault occurs during processing.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}
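The snippet above throttles the dequeue side. For the enqueue side, a producer could use Count as a rough backpressure signal, as in the sketch below. The names ThrottledProducer, EnqueueWithBackpressureAsync, maxBacklog, and retryDelay are assumptions made for illustration. Because Count is non-transactional, the limit is approximate: several producers may pass the check at the same moment and overshoot it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

public static class ThrottledProducer
{
    // Waits until the approximate queue length drops below maxBacklog, then enqueues one item.
    // maxBacklog and retryDelay are hypothetical tuning parameters.
    public static async Task EnqueueWithBackpressureAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<int> queue,
        int item,
        long maxBacklog,
        TimeSpan retryDelay,
        CancellationToken cancellationToken)
    {
        // Count is a point-in-time snapshot, so this check is best-effort only.
        while (queue.Count >= maxBacklog)
        {
            // Task.Delay throws if cancellation is requested, which ends the wait.
            await Task.Delay(retryDelay, cancellationToken);
        }

        using (ITransaction txn = stateManager.CreateTransaction())
        {
            await queue.EnqueueAsync(txn, item, cancellationToken);
            await txn.CommitAsync();
        }
    }
}
```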

Best-Effort Drain

A drain of the queue cannot be guaranteed due to the concurrent nature of the data structure. It is possible that, even if no user operations on the queue are in-flight, a particular call to TryDequeueAsync may not return an item that was previously enqueued and committed. The enqueued item is guaranteed to eventually become visible to dequeue; however, without an out-of-band communication mechanism, an independent consumer cannot know that the queue has reached a steady state, even if all producers have been stopped and no new enqueue operations are allowed. Thus, the drain operation is best-effort, as implemented below.

The user should stop all further producer and consumer tasks, and wait for any in-flight transactions to commit or abort, before attempting to drain the queue. If the user knows the expected number of items in the queue, they can set up a notification that signals that all items have been dequeued.

int batchSize = 5;

ConditionalValue<int> ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);
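When the expected number of items is known, the notification mentioned above could be built from a counter and a TaskCompletionSource. The following is a minimal sketch; DrainTracker and its members are hypothetical names and are not part of the Service Fabric API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: completes a task once the expected number of items
// has been dequeued in committed transactions.
public sealed class DrainTracker
{
    private readonly TaskCompletionSource<bool> completion =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    private long remaining;

    public DrainTracker(long expectedItems)
    {
        if (expectedItems < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(expectedItems));
        }

        this.remaining = expectedItems;

        if (expectedItems == 0)
        {
            // Nothing to wait for
            this.completion.TrySetResult(true);
        }
    }

    // Completes when all expected items have been recorded.
    public Task Completion => this.completion.Task;

    // Number of items still outstanding (may go negative if the estimate was too low).
    public long Remaining => Interlocked.Read(ref this.remaining);

    // Call this only after the transaction that dequeued `count` items has committed.
    public void RecordCommitted(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        if (Interlocked.Add(ref this.remaining, -count) <= 0)
        {
            this.completion.TrySetResult(true);
        }
    }
}
```

In the drain loop, the consumer would call RecordCommitted(processItems.Count) after each successful CommitAsync, while another task awaits Completion. If TryDequeueAsync returns no value while Remaining is still positive, the consumer can retry after a delay rather than conclude that the queue is empty.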

Peek

ReliableConcurrentQueue does not provide a TryPeekAsync API. Users can get peek semantics by calling TryDequeueAsync and then aborting the transaction. In this example, a dequeued item is processed only if its value is greater than 10; otherwise the transaction is aborted and the item returns to the queue.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue<int> ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();
    }
    else
    {
        txn.Abort();
    }
}

Must Read