Partager via

Azure Service Fabric 中的可靠并发队列简介

可靠并发队列是一种异步的、事务性的已复制队列,其特点是入队和出队操作的高并发性。 它旨在通过放宽 Reliable Queue 提供的严格 FIFO 排序,以提供高吞吐量和低延迟,转而提供尽力而为的顺序。

API

并发队列 可靠并发队列
void Enqueue(T item) 任务 EnqueueAsync(ITransaction tx, T 项)
bool TryDequeue(out T result) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

可靠队列比较

可靠并发队列作为可靠队列的替代推出, 在不需要严格的 FIFO 排序的情况下,应使用它,因为保证 FIFO 需要与并发性进行权衡。 可靠队列使用锁来强制实现 FIFO 排序,一次只允许一个事务入队,一次只允许一个事务出队。 相对而言,可靠并发队列放宽了排序约束,允许任意数量的并发事务交错进行入队和出队操作。 可靠并发队列提供尽力而为的排序,但无法保证两个值的相对顺序。

在有多个并发事务执行入队和/或出队操作时,可靠并发队列相较于可靠队列可提供更高的吞吐量和更低的延迟。

ReliableConcurrentQueue 的一个示例用例是“消息队列”

场景。 在该方案中,一个或多个消息生成者会创建项并将其添加到队列中,同时还会有一个或多个消息使用者从队列拉取消息并对其进行处理。 多个生成者和使用者可以独立操作,使用并发事务来处理队列。

使用准则

  • 队列期望队列中的项的保留时间较短。 也就是说,项目不会长时间留在队列中。
  • 队列不保证严格的 FIFO 排序。
  • 队列不读取自己的写入。 在事务中将一项入队时,该项在同一事务内对出队者则是不可见的。
  • 出列不是彼此之间隔离的。 如果在事务 txnA 中将项 A 出队,即使 txnA 尚未提交,项 A 仍不会对并发事务 txnB 可见。 如果 txnA 中止,A 会立刻变得对 txnB 可见。
  • 可以通过使用 TryDequeueAsync 然后中止事务来实现 TryPeekAsync 的行为。 可以在“编程模式”部分中找到此行为的示例。
  • 计数是非事务性的。 它可用于大致了解队列中的元素数,但仅反映特定时刻的情况,不足以依靠。
  • 在事务处于活动状态时,不应对已出队的项执行耗费资源的处理,以避免长时间运行的事务可能对系统产生性能影响。

代码段

让我们看看一些代码片段及其预期输出。 本部分忽略异常处理。

实例

创建可靠并发队列的实例类似于任何其他可靠集合。

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

下面是使用 EnqueueAsync 的一些代码片段及其预期输出。

  • 案例 1:单个排队任务
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

假设任务已成功地完成,并且没有其他并发事务正在修改队列。 用户可以期望队列中的项目采用以下任一顺序:

10, 20

20, 10

  • 案例 2:并行排队任务
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 无法推断队列中项的顺序。 就这个代码片段而言,项可能以4!种不同的顺序出现。 可能的顺序。 队列尝试按原始(排队)顺序保留这些项,但由于并发操作或错误,可能会强制重新排序这些项。

DequeueAsync

下面是使用 TryDequeueAsync 的一些代码片段及预期输出。 假设队列中已经包含了以下项目:

10, 20, 30, 40, 50, 60

  • 案例 1:单个出队任务
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

假定任务已成功完成,且没有并发事务对队列的修改。 由于无法推断队列中项的顺序,任意三个项可以以任意顺序出列。 队列尝试按原始(排队)顺序保留这些项,但由于并发操作或错误,可能会强制重新排序这些项。

  • 案例 2:并行出队任务
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 由于无法推断队列中项的顺序,列表 dequeue1 和 dequeue2 均会包含采用任意顺序的任意两个项。

这两个列表中不会显示同一项。 因此,如果 dequeue1 包含 10、30,则 dequeue2 就会包含 20、40。

  • 案例 3:事务中止下的出队顺序

中止包含正在进行的出队操作的事务,会使项目重新回到队列的头部。 无法保证将项目放回队列首部的顺序。 请看以下代码:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

假设元素出队的顺序如下:

10, 20

终止交易后,这些项目将以以下任一顺序被重新添加到队列头部:

10, 20

20, 10

这同样适用于所有未成功提交的事务情况。

编程模式

此部分介绍一些编程模式,在使用 ReliableConcurrentQueue 时可能会用到这些模式。

批量出队

建议的编程模式是,让消费者任务批量处理出队操作,而不是一次执行一个出队操作。 用户可以选择限制每个批处理之间的延迟,或者限制批处理大小。 以下代码片段显示了此编程模型。 请注意,在此示例中,处理是在提交事务后完成的,因此,如果在处理过程中发生错误,则未处理项将丢失而不会得到处理。 或者,处理可以在事务的范围内完成,但是它可能会对性能产生负面影响,并且需要处理已处理项。

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

基于通知的尽力处理

另一种有趣的编程模式是使用计数 API。 在这里,我们可以为队列实施基于通知的尽力处理。 可以使用队列计数来限制排队或取消排队任务。 请注意,与前面的示例一样,由于处理在事务外部进行,如果在处理过程中发生错误,则可能会丢失未处理的项。

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

尽力清空

由于数据结构的并发性质,无法保证队列的清空。 有可能即使队列中没有用户操作正在运行,对 TryDequeueAsync 的特定调用也可能不会返回之前已入队和提交的元素。 保证排队项 最终 可以被出队,但是如果没有外部通信机制,独立使用者也不能知道队列已经进入稳定状态,即使所有生成者已经停止,并且不允许新的入队操作。 因此,清空操作是尽力操作,其实现情况如下所示。

用户应停止所有生产者和消费者任务,等待未完成的事务提交或中止,然后尝试清空队列。 如果用户知道队列中预期的项数,则可以设置一个通知,通知所有项都已取消排队。

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

速览

ReliableConcurrentQueue 不提供 TryPeekAsync API。 用户可以使用TryDequeueAsync获取预览信息,然后中止事务。 在此示例中,仅当项的值大于 10 时,才会处理出队操作。

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

必读