Azure Service Fabric中可靠集合的准则和建议

本部分提供有关使用可靠状态管理器和 Reliable Collections 的指导原则。 目的是帮助用户避免常见错误。

可靠集合指导原则

这些准则组织为简单建议,其前缀为 “Do”、“ 考虑”、“ 避免”和 “请勿”。

不要

  • 请勿处置或取消正在提交的事务。 这不受支持,可能会使主机进程崩溃。
  • 请勿在事务已提交、已中止或已释放后再使用该事务。
  • 请勿 修改读取操作返回的自定义类型的对象(例如, TryPeekAsyncTryGetValueAsync)。 Reliable Collections 与 Concurrent Collections 一样,返回对这些对象的引用,而非副本。 相反,在修改之前,请先对返回的自定义类型对象进行深拷贝。 由于结构和内置类型是按值传递的,因此无需对其执行深层复制,除非它们包含要修改的引用类型字段或属性。 请参阅 示例:不要修改读取操作返回的对象
  • 不要创建可靠状态并在同一事务中使用它 IReliableStateManager.GetOrAddAsync。 这会导致一个 InvalidOperationException. 请参阅 示例:创建状态并将其用于单独的事务
  • 不要 在事务中应用 并行并发 操作。 一个事务中仅支持一个用户线程操作。 否则,会导致内存泄漏和锁定问题。 请参阅 示例:避免事务中的并行或并发操作
  • 不要 在另一个事务的 using 语句中创建事务,因为它可能会导致死锁。 请参阅 示例:避免嵌套事务
  • 不要使用TimeSpan.MaxValue来设置超时。 超时机制用于检测复制停滞和死锁。 过大的值可能会隐藏系统问题和延迟故障检测。 所有 Reliable Collection API 的默认超时值均为 4 秒。 大多数用户应使用默认超时值。
  • 不要 在创建它的事务作用域之外使用枚举。 请参阅 示例:在事务作用域内枚举

务必

  • 务必确保所有事务在使用后都得到释放。 未进行释放的可枚举事务会因无法清理旧数据而阻碍内部检查点清理逻辑,并导致 Reliable Collections 的大小膨胀。 请参阅 示例:释放可枚举事务
  • Do在添加或更改状态提供程序或相关序列化程序时,分多个阶段升级代码,因为这是由 Service Fabric 中滚动升级的特性决定的。 有关更多详细信息,请参阅Azure Service Fabric 中 Reliable Collection 对象序列化 - 可升级性
  • 请确保IComparable<TKey>实现正确。 系统依赖 IComparable<TKey> 进行检查点和行的合并。
  • 请确保 读取或写入多个字典时,不同字典的操作顺序对所有并发事务保持相同,以避免死锁。 请参阅 示例:跨字典的一致操作顺序
  • 读取要在同一事务中修改的项时,请使用更新锁。 这可以防止当多个事务使用共享锁读取同一密钥时发生死锁,以后尝试升级到独占锁。 请参阅 示例:使用更新锁进行先读后写
  • Do 句柄 InvalidOperationException。 由于各种原因,系统可能会中止用户事务。 例如,当可靠状态管理器将其角色从“主要”更改为其他角色时,或者当长时间运行的事务阻止截断事务日志时。 在这种情况下,用户可能会收到一个 InvalidOperationException,表明其事务已被终止。 假设终止该事务并非用户主动请求,处理此异常的最佳方式是释放该事务,检查取消令牌是否已被触发(或副本的角色是否已发生更改);如果没有,则创建一个新事务并重试。
  • 务必采用指数退避来实现有界重试。
  • 务必在投入生产前评估并验证序列化开销。 此成本应尽可能最小化,因为必须序列化所有状态才能进行复制和持久性。 低效的序列化程序可以显著增加 CPU 使用率和延迟。
  • 请根据 Reliable Collection 的写入速率和典型事务大小调整 CheckpointThresholdInMB 设置。 此值应至少是最大事务大小的两倍,因此,在稳定状态写入负载下,此设置每 30-120 秒超过一次。

避免

  • 避免 使用热键。 将写入均匀分布到各个键上,以避免形成都在等待同一个键被锁定的长队列。
  • 避免包含阻塞代码的长时间运行事务。 事务应尽量短,在事务中使用阻塞代码可能会阻碍日志截断,导致锁争用以及后续超时。 请参阅 示例:避免在事务中执行长时间运行的操作
  • 避免 将单个实体操作和多实体操作(例如 GetCountAsyncCreateEnumerableAsync)混合在同一事务中,因为隔离级别不同。 有关详细信息,请参阅 可靠集合中的事务和锁定模式
  • 避免出于不提交事务的目的而创建多操作事务。 这些事务仍会将先前的操作复制到辅助副本;如果在未提交的情况下被处置,则除了中止事务外,还需要回退每个辅助副本上的进度。

考虑

  • 请考虑 在提交完成后尽快释放事务(尤其是在使用 ConcurrentQueue 时)。
  • 请考虑将项的大小(例如,对于 Reliable Dictionary,则为 TKey + TValue)保持在 80 KB 以下:越小越好。 这会减少大型对象堆的使用量,并降低磁盘和网络 IO 的要求。 通常情况下,当只更新值的一小部分时,它可以减少重复数据的复制。 在可靠字典中实现此目的的一种常见方法是将行分解成多行。
  • 考虑将每个分区中的可靠集合数量保持在 1000 以下。 建议选择包含更多项的可靠集合,而不是多个包含较少项的可靠集合。
  • 请考虑 使用备份和还原功能进行灾难恢复。

需要记住的一些事项:

  • 字符串用作可靠字典的键时,排序顺序使用默认字符串比较器 CurrentCulture。 请注意,CurrentCulture 排序顺序不同于Ordinal 字符串比较器
  • 所有 Reliable Collections API 中的默认取消标记均为 CancellationToken.None
  • 可靠字典的键类型参数 (TKey) 必须正确实现 GetHashCode()Equals()。 键必须不可变。
  • 若要实现 Reliable Collections 的高可用性,每个服务应至少有一个目标,并且最小副本集大小必须为 3。
  • 辅助副本上的读取操作可能会读取未提交仲裁的版本。 这意味着从单个辅助副本读取的数据版本可能被错误处理。 从主库读取总是稳定的:它们绝不会出现虚假推进。
  • 应用程序在可靠集合中保存的数据的安全/隐私是你的决定,并受存储管理所提供的保护的约束;例如,操作系统磁盘加密可用于保护静态数据。
  • ReliableDictionary 枚举使用按键排序的排序数据结构。 为了提高枚举的效率,提交会被添加到临时散列表中,然后在检查点后移动到主要的排序数据结构中。 如果需要验证检查是否存在键,“添加”/“更新”/“删除”操作的最佳运行时为 O(1),最差运行时为 O(log n)。 读取可能是 O(1)或 O(log n),具体取决于您是从最新提交还是从早期提交中读取。

关于持久化可靠集合的其他指导原则

决定使用易失可靠集合时,请考虑以下事项:

  • ReliableDictionary 具有易失性支持
  • ReliableQueue 具有易失性支持
  • ReliableConcurrentQueue 不具有易失性支持
  • 持久化服务无法变为易失性服务。 将 HasPersistedState 标志改为 false 需要从头开始重新创建整个服务
  • 易失性服务无法变为持久化服务。 将 HasPersistedState 标志改为 true 需要从头开始重新创建整个服务
  • HasPersistedState 是服务级别的配置。这意味着所有集合都可归为持久化集合和易失性集合中的一种。 不能混合可变集合和持久化集合
  • 由于法定人数的丧失,易失性分区会导致数据完全丢失
  • 备份和还原不可用于易失性服务

示例

以下示例说明了 Reliable Collection 指南 部分中的建议。

示例:不要修改读取操作返回的对象

适用于: 请勿 修改读取操作返回的自定义类型的对象。

// Suppose Order is a reference type stored in the dictionary:
//     class Order { public int Quantity { get; set; } public List<string> Tags { get; set; } }

// INCORRECT: mutating the reference returned by the dictionary directly.
// The in-memory instance is shared with the Reliable Collection, so this
// change is visible without going through a transaction and is never
// replicated or persisted. Concurrent readers may also observe partial
// mutations.
using (var tx = this.StateManager.CreateTransaction())
{
    var result = await orders.TryGetValueAsync(tx, "order-1");
    if (result.HasValue)
    {
        result.Value.Quantity += 1;          // mutating the shared reference
        result.Value.Tags.Add("expedited");  // mutating a reference-typed field
    }
    await tx.CommitAsync();
}

// CORRECT: deep copy the value, modify the copy, then write it back via
// the dictionary so the update is transacted, replicated, and persisted.
using (var tx = this.StateManager.CreateTransaction())
{
    var result = await orders.TryGetValueAsync(tx, "order-1");
    if (result.HasValue)
    {
        var updated = new Order
        {
            Quantity = result.Value.Quantity + 1,
            Tags = new List<string>(result.Value.Tags) { "expedited" },
        };

        await orders.SetAsync(tx, "order-1", updated);
    }
    await tx.CommitAsync();
}

示例:创建状态并将其用于单独的事务

适用于:不要在同一事务中使用 IReliableStateManager.GetOrAddAsync 创建并使用可靠状态。

// INCORRECT: creating (or getting) reliable state and then using it
// in the same transaction.
using (var tx = this.StateManager.CreateTransaction())
{
    var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, int>>(
        tx, "myDictionary");

    // Throws InvalidOperationException when myDictionary was newly created above,
    // because the new state isn't registered until the creating transaction commits.
    await myDictionary.AddAsync(tx, "key1", 42);

    await tx.CommitAsync();
}

// CORRECT: create (or get) the reliable state in one transaction,
// then use it in a separate transaction.
IReliableDictionary<string, int> myDictionary;
using (var tx = this.StateManager.CreateTransaction())
{
    myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, int>>(
        tx, "myDictionary");
    await tx.CommitAsync();
}

using (var tx = this.StateManager.CreateTransaction())
{
    await myDictionary.AddAsync(tx, "key1", 42);
    await tx.CommitAsync();
}

示例:避免事务中的并行或并发操作

适用于: 不要 在事务中应用并行或并发操作。

// INCORRECT: fanning out concurrent operations on the same transaction.
using (var tx = this.StateManager.CreateTransaction())
{
    var writes = keys.Select(k => myDictionary.AddAsync(tx, k, 0));

    // Multiple operations on `tx` run concurrently � causes lock/memory issues.
    await Task.WhenAll(writes);

    await tx.CommitAsync();
}

// ALSO INCORRECT: Parallel.ForEachAsync sharing a single transaction across threads.
using (var tx = this.StateManager.CreateTransaction())
{
    await Parallel.ForEachAsync(keys, async (k, _) =>
    {
        await myDictionary.AddAsync(tx, k, 0);
    });

    await tx.CommitAsync();
}

// CORRECT: issue operations on the transaction one at a time (sequentially awaited).
using (var tx = this.StateManager.CreateTransaction())
{
    foreach (var k in keys)
    {
        await myDictionary.AddAsync(tx, k, 0);
    }

    await tx.CommitAsync();
}

示例:避免嵌套事务

适用于: 不要 在另一个事务的 using 语句中创建事务。

// INCORRECT: an inner transaction is created while the outer transaction
// is still open. The inner tx tries to acquire locks on keys the outer tx
// already holds, but the outer can't release them until it commits � and
// it can't commit until the inner finishes. The two transactions deadlock.
using (var outerTx = this.StateManager.CreateTransaction())
{
    await myDictionary.SetAsync(outerTx, "key1", 1);

    using (var innerTx = this.StateManager.CreateTransaction())
    {
        // Blocks waiting for the write lock on "key1" held by outerTx.
        await myDictionary.SetAsync(innerTx, "key1", 2);
        await innerTx.CommitAsync();
    }

    await outerTx.CommitAsync();
}

// CORRECT: commit and dispose each transaction before starting the next one.
using (var tx = this.StateManager.CreateTransaction())
{
    await myDictionary.SetAsync(tx, "key1", 1);
    await tx.CommitAsync();
}

using (var tx = this.StateManager.CreateTransaction())
{
    await myDictionary.SetAsync(tx, "key1", 2);
    await tx.CommitAsync();
}

示例:在事务作用域内进行枚举

适用于:不要在其创建所在的事务范围之外使用枚举对象。

// INCORRECT: the enumerable is captured but enumerated after its
// transaction is disposed. The enumeration is tied to the tx's snapshot
// and becomes invalid as soon as the tx is committed and disposed.
IAsyncEnumerable<KeyValuePair<string, int>> enumerable;
using (var tx = this.StateManager.CreateTransaction())
{
    enumerable = await myDictionary.CreateEnumerableAsync(tx);
    await tx.CommitAsync();
} // tx is disposed here � the enumeration is no longer valid.

var enumerator = enumerable.GetAsyncEnumerator();
while (await enumerator.MoveNextAsync(CancellationToken.None))
{
    var current = enumerator.Current;
}

// CORRECT: enumerate within the same transaction scope that created the enumerable.
using (var tx = this.StateManager.CreateTransaction())
{
    var enumerable = await myDictionary.CreateEnumerableAsync(tx);
    var enumerator = enumerable.GetAsyncEnumerator();
    while (await enumerator.MoveNextAsync(CancellationToken.None))
    {
        var current = enumerator.Current;
        // process current
    }
    await tx.CommitAsync();
}

示例:释放可枚举事务

适用于:务必确保所有事务在使用后都被释放。

// INCORRECT: an enumerable is created on a transaction that lives outside
// a `using` block. If iteration throws (or a later code path returns
// early), the transaction is never disposed. Leaked enumerable
// transactions hold their snapshot and prevent internal checkpoint cleanup logic,
// causing the Reliable Collection to grow over time.
var tx = this.StateManager.CreateTransaction();
var enumerable = await myDictionary.CreateEnumerableAsync(tx);
var enumerator = enumerable.GetAsyncEnumerator();
while (await enumerator.MoveNextAsync(CancellationToken.None))
{
    var current = enumerator.Current;
    // process current
}
await tx.CommitAsync();
// tx.Dispose() is never called.

示例:跨字典的一致操作顺序

适用于: 请确保 对于所有并发事务,不同字典的操作顺序保持不变。

// INCORRECT: TransferAtoB locks dictA before dictB, while TransferBtoA locks
// dictB before dictA. If they run concurrently, each transaction can hold
// one dictionary's lock and block waiting for the other � they deadlock.
async Task TransferAtoB(long amount)
{
    using (var tx = this.StateManager.CreateTransaction())
    {
        await dictA.AddOrUpdateAsync(tx, "balance", k => -amount, (k, v) => v - amount);
        await dictB.AddOrUpdateAsync(tx, "balance", k => amount,  (k, v) => v + amount);
        await tx.CommitAsync();
    }
}

async Task TransferBtoA(long amount)
{
    using (var tx = this.StateManager.CreateTransaction())
    {
        await dictB.AddOrUpdateAsync(tx, "balance", k => -amount, (k, v) => v - amount);
        await dictA.AddOrUpdateAsync(tx, "balance", k => amount,  (k, v) => v + amount);
        await tx.CommitAsync();
    }
}

// CORRECT: both methods touch dictA before dictB. Any number of concurrent
// transfers acquire the locks in the same global order and cannot deadlock.
async Task TransferAtoB(long amount)
{
    using (var tx = this.StateManager.CreateTransaction())
    {
        await dictA.AddOrUpdateAsync(tx, "balance", k => -amount, (k, v) => v - amount);
        await dictB.AddOrUpdateAsync(tx, "balance", k => amount,  (k, v) => v + amount);
        await tx.CommitAsync();
    }
}

async Task TransferBtoA(long amount)
{
    using (var tx = this.StateManager.CreateTransaction())
    {
        await dictA.AddOrUpdateAsync(tx, "balance", k => amount,  (k, v) => v + amount);
        await dictB.AddOrUpdateAsync(tx, "balance", k => -amount, (k, v) => v - amount);
        await tx.CommitAsync();
    }
}

示例:对先读后写操作使用更新锁

适用于:读取要在同一事务中修改的项时 ,请使用 更新锁。

// INCORRECT: read uses the default (shared) lock, then the same transaction
// tries to upgrade to an exclusive lock to write. Two concurrent transactions
// running this pattern both hold a shared read lock on "key1" and each waits
// for the other to release before they can upgrade � they deadlock.
using (var tx = this.StateManager.CreateTransaction())
{
    var current = await myDictionary.TryGetValueAsync(tx, "key1");

    if (current.HasValue)
    {
        await myDictionary.SetAsync(tx, "key1", current.Value + 1);
    }

    await tx.CommitAsync();
}

// CORRECT: read with LockMode.Update. Only one transaction at a time can
// hold the Update lock on "key1", so the later upgrade to an exclusive
// write lock cannot deadlock with another transaction following the same
// read-then-write pattern.
using (var tx = this.StateManager.CreateTransaction())
{
    var current = await myDictionary.TryGetValueAsync(tx, "key1", LockMode.Update);

    if (current.HasValue)
    {
        await myDictionary.SetAsync(tx, "key1", current.Value + 1);
    }

    await tx.CommitAsync();
}

示例:避免在事务中执行长时间运行的操作

适用于:避免包含阻塞代码的长时间运行事务。

// INCORRECT: a long-running call inside the transaction keeps the write
// lock on "key1" held for the duration of the call.
using (var tx = this.StateManager.CreateTransaction())
{
    // AddOrUpdateAsync acquires a write lock on "key1".
    await myDictionary.AddOrUpdateAsync(tx, "key1", k => 0, (k, existing) => existing + 1);

    // This HTTP call may run for a long time. The write lock on "key1"
    // is held for the entire duration, blocking other writers to the
    // same key.
    var payload = await httpClient.GetStringAsync(uri);

    await tx.CommitAsync();
}

后续步骤