Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
可靠并发队列是一种异步的、事务性的已复制队列,其特点是入队和出队操作的高并发性。 它旨在通过放宽 Reliable Queue 提供的严格 FIFO 排序,以提供高吞吐量和低延迟,转而提供尽力而为的顺序。
API
| 并发队列 | 可靠并发队列 |
|---|---|
| void Enqueue(T item) | 任务 EnqueueAsync(ITransaction tx, T 项) |
| bool TryDequeue(out T result) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
| int Count() | long Count() |
与可靠队列比较
可靠并发队列作为可靠队列的替代推出, 在不需要严格的 FIFO 排序的情况下,应使用它,因为保证 FIFO 需要与并发性进行权衡。 可靠队列使用锁来强制实现 FIFO 排序,一次只允许一个事务入队,一次只允许一个事务出队。 相对而言,可靠并发队列放宽了排序约束,允许任意数量的并发事务交错进行入队和出队操作。 可靠并发队列提供尽力而为的排序,但无法保证两个值的相对顺序。
在有多个并发事务执行入队和/或出队操作时,可靠并发队列相较于可靠队列可提供更高的吞吐量和更低的延迟。
ReliableConcurrentQueue 的一个示例用例是“消息队列”
场景。 在该方案中,一个或多个消息生成者会创建项并将其添加到队列中,同时还会有一个或多个消息使用者从队列拉取消息并对其进行处理。 多个生成者和使用者可以独立操作,使用并发事务来处理队列。
使用准则
- 队列期望队列中的项的保留时间较短。 也就是说,项目不会长时间留在队列中。
- 队列不保证严格的 FIFO 排序。
- 队列不读取自己的写入。 在事务中将一项入队时,该项在同一事务内对出队者则是不可见的。
- 出列不是彼此之间隔离的。 如果在事务 txnA 中将项 A 出队,即使 txnA 尚未提交,项 A 仍不会对并发事务 txnB 可见。 如果 txnA 中止,A 会立刻变得对 txnB 可见。
- 可以通过使用 TryDequeueAsync 然后中止事务来实现 TryPeekAsync 的行为。 可以在“编程模式”部分中找到此行为的示例。
- 计数是非事务性的。 它可用于大致了解队列中的元素数,但仅反映特定时刻的情况,不足以依靠。
- 在事务处于活动状态时,不应对已出队的项执行耗费资源的处理,以避免长时间运行的事务可能对系统产生性能影响。
代码段
让我们看看一些代码片段及其预期输出。 本部分忽略异常处理。
实例
创建可靠并发队列的实例类似于任何其他可靠集合。
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
下面是使用 EnqueueAsync 的一些代码片段及其预期输出。
- 案例 1:单个排队任务
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
假设任务已成功地完成,并且没有其他并发事务正在修改队列。 用户可以期望队列中的项目采用以下任一顺序:
10, 20
20, 10
- 案例 2:并行排队任务
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 无法推断队列中项的顺序。 就这个代码片段而言,项可能以4!种不同的顺序出现。 可能的顺序。 队列尝试按原始(排队)顺序保留这些项,但由于并发操作或错误,可能会强制重新排序这些项。
DequeueAsync
下面是使用 TryDequeueAsync 的一些代码片段及预期输出。 假设队列中已经包含了以下项目:
10, 20, 30, 40, 50, 60
- 案例 1:单个出队任务
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
假定任务已成功完成,且没有并发事务对队列的修改。 由于无法推断队列中项的顺序,任意三个项可以以任意顺序出列。 队列尝试按原始(排队)顺序保留这些项,但由于并发操作或错误,可能会强制重新排序这些项。
- 案例 2:并行出队任务
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 由于无法推断队列中项的顺序,列表 dequeue1 和 dequeue2 均会包含采用任意顺序的任意两个项。
这两个列表中不会显示同一项。 因此,如果 dequeue1 包含 10、30,则 dequeue2 就会包含 20、40。
- 案例 3:事务中止下的出队顺序
中止包含正在进行的出队操作的事务,会使项目重新回到队列的头部。 无法保证将项目放回队列首部的顺序。 请看以下代码:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
假设元素出队的顺序如下:
10, 20
终止交易后,这些项目将以以下任一顺序被重新添加到队列头部:
10, 20
20, 10
这同样适用于所有未成功提交的事务情况。
编程模式
此部分介绍一些编程模式,在使用 ReliableConcurrentQueue 时可能会用到这些模式。
批量出队
建议的编程模式是,让消费者任务批量处理出队操作,而不是一次执行一个出队操作。 用户可以选择限制每个批处理之间的延迟,或者限制批处理大小。 以下代码片段显示了此编程模型。 请注意,在此示例中,处理是在提交事务后完成的,因此,如果在处理过程中发生错误,则未处理项将丢失而不会得到处理。 或者,处理可以在事务的范围内完成,但是它可能会对性能产生负面影响,并且需要处理已处理项。
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
基于通知的尽力处理
另一种有趣的编程模式是使用计数 API。 在这里,我们可以为队列实施基于通知的尽力处理。 可以使用队列计数来限制排队或取消排队任务。 请注意,与前面的示例一样,由于处理在事务外部进行,如果在处理过程中发生错误,则可能会丢失未处理的项。
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
尽力清空
由于数据结构的并发性质,无法保证队列的清空。 有可能即使队列中没有用户操作正在运行,对 TryDequeueAsync 的特定调用也可能不会返回之前已入队和提交的元素。 保证排队项 最终 可以被出队,但是如果没有外部通信机制,独立使用者也不能知道队列已经进入稳定状态,即使所有生成者已经停止,并且不允许新的入队操作。 因此,清空操作是尽力操作,其实现情况如下所示。
用户应停止所有生产者和消费者任务,等待未完成的事务提交或中止,然后尝试清空队列。 如果用户知道队列中预期的项数,则可以设置一个通知,通知所有项都已取消排队。
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
速览
ReliableConcurrentQueue 不提供 TryPeekAsync API。 用户可以使用TryDequeueAsync获取预览信息,然后中止事务。 在此示例中,仅当项的值大于 10 时,才会处理出队操作。
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}