Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
可靠并发队列是一种异步的、事务性的已复制队列,其特点是排队和取消排队操作的高并发性。 它旨在降低可靠队列提供的严格的 FIFO 排序要求,代之以“尽力排序”要求,从而提高吞吐量并降低延迟。
API
并发队列 | 可靠并发队列 |
---|---|
void Enqueue(T item) | 任务 EnqueueAsync(ITransaction tx, T 项) |
bool TryDequeue(out T result) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
int Count() | long Count() |
与可靠队列比较
可靠并发队列作为可靠队列的替代推出, 其使用范围局限于不需要严格 FIFO 排序的情况,这种情况下,在尽量保证 FIFO 的同时,还需要考虑到并发性。 可靠队列以锁定方式强制实施 FIFO 排序,一次只允许一个事务排队,一个事务取消排队。 相对而言,可靠并发队列对排序的要求较松,允许任意数目的并发事务交叉进行排队和取消排队操作。 可靠并发队列的原则是“尽力排序”,但无法保证两个值的相对顺序。
在有多个并发事务需要执行排队和/或取消排队操作的情况下,可靠并发队列相对于可靠队列而言可以提高吞吐量并降低延迟。
ReliableConcurrentQueue 的一个示例用例是“消息队列”
场景。 在该方案中,一个或多个消息生成者会创建项并将其添加到队列中,同时还会有一个或多个消息使用者从队列拉取消息并对其进行处理。 多个生成者和使用者可以独立操作,使用并发事务来处理队列。
使用准则
- 队列希望队列中的项的保留期较短。 换句话说,项呆在队列中的时间不宜过长。
- 队列不保证严格的 FIFO 排序。
- 队列不读取自己的写入。 项在事务中排队时,该项对于同一事务中的取消排队者来说为不可见。
- 取消排队不是相互隔离的。 如果项 A 在事务 txnA 中取消排队,则即使 txnA 尚未提交,项 A 也不会对并发事务 txnB 可见。 如果 txnA 中止,A 会立刻变得对 txnB 可见。
- 可以先使用 TryDequeueAsync,再中止事务,从而实现 TryPeekAsync 行为。 可以在“编程模式”部分中找到此行为的示例。
- 计数是非事务性的。 可以通过计数来了解队列中的元素数目,但计数只代表一个时间点的情况,可靠性不强。
- 在事务处于活动状态时,不应对取消排队项执行开销高昂的处理,以避免可能对系统性能产生影响的长时间运行事务。
代码段
让我们看看一些代码片段及其预期输出。 本部分忽略异常处理。
实例
创建可靠并发队列的实例类似于任何其他可靠集合。
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
下面是使用 EnqueueAsync 的一些代码片段及其预期输出。
- 案例 1:单个排队任务
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
假定任务已成功完成,且没有并发事务在修改队列。 用户可以预期队列包含的项采用以下顺序:
10, 20
20, 10
- 案例 2:并行排队任务
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 无法推断队列中项的顺序。 就此代码片段来说,项的显示顺序可能是 4! 种 可能的顺序之一。 队列会尝试让项保持原有的(排队)顺序,但在出现并发操作或错误的情况下,也可能会强制其重新排序。
DequeueAsync
下面是使用 TryDequeueAsync 的一些代码片段及预期输出。 假定已在队列中填充以下项:
10, 20, 30, 40, 50, 60
- 案例 1:单个取消排队任务
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
假定任务已成功完成,且没有并发事务在修改队列。 由于无法推断队列中项的顺序,可能会采用任意顺序取消任意三个项的排队。 队列会尝试让项保持原有的(排队)顺序,但在出现并发操作或错误的情况下,也可能会强制其重新排序。
- 案例 2:并行取消排队任务
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
假定任务已成功完成且是以并行方式运行的,同时没有其他并发事务在修改队列。 由于无法推断队列中项的顺序,列表 dequeue1 和 dequeue2 均会包含采用任意顺序的任意两个项。
同一项不会出现在两个列表中。 因此,如果 dequeue1 包含 10、30,则 dequeue2 就会包含 20、40。
- 案例 3:通过中止事务排定取消排队任务顺序
中止正在取消排队的事务,项就会重新回到队列头。 项回到队列头的顺序是不确定的。 请看以下代码:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
假定项在取消排队时的顺序如下:
10, 20
中止事务后,项会采用以下顺序之一添加回队列头:
10, 20
20, 10
事务未成功提交的所有案例均是如此。
编程模式
此部分介绍一些编程模式,在使用 ReliableConcurrentQueue 时可能会用到这些模式。
按批取消排队
建议的编程模式是,通过使用者任务按批取消排队,而不是一次执行一个取消排队操作。 用户可以选择限制每个批处理之间的延迟,或者限制批处理大小。 以下代码片段显示了此编程模型。 请注意,在此示例中,处理是在提交事务后完成的,因此,如果在处理过程中发生错误,则未处理项将丢失而不会得到处理。 或者,处理可以在事务的范围内完成,但是它可能会对性能产生负面影响,并且需要处理已处理项。
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
基于通知的尽力处理
另一种有趣的编程模式是使用计数 API。 在这里,我们可以为队列实施基于通知的尽力处理。 可以使用队列计数来限制排队或取消排队任务。 请注意,与前面的示例一样,由于处理在事务外部进行,如果在处理过程中发生错误,则可能会丢失未处理的项。
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
尽力清空
考虑到数据结构的并发特性,不保证队列会被清空。 即使队列中没有正在进行的用户操作,对 TryDequeueAsync 的特定调用也可能不会返回以前排队和提交的项。 排队的项最终必定会变得对取消排队操作可见,但在没有带外通信机制的情况下,独立的使用者无法知道队列是否已达到稳定状态,即使系统已停止所有生成者且不允许新的排队操作。 因此,清空操作只能尽力而为,其执行情况如下所示。
用户应停止所有后续的生成者和使用者任务,等待正在进行的事务提交或中止,并尝试清空队列。 如果用户知道队列中预期的项数,则可以设置一个通知,通知所有项都已取消排队。
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
速览
可靠并发队列不提供 TryPeekAsync API。 用户可以先使用 TryDequeueAsync,再中止事务,从而获取速览语义。 在以下示例中,仅当项的值大于 10 时,才会处理取消排队操作。
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}