Эффективная блокировка ресурса, идентифицируемого строкой - PullRequest
2 голосов
/ 01 февраля 2020

РЕДАКТИРОВАТЬ: я обновил мои примеры для использования библиотеки https://github.com/StephenCleary/AsyncEx. Все еще ждут полезных подсказок.

Есть ресурсы, которые идентифицируются строками (например, файлы, URL-адреса и т. Д. c.). Я ищу механизм блокировки ресурсов. Я нашел 2 разных решения, но у каждого есть свои проблемы:

Первый использует класс ConcurrentDictionary с AsyncLock:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private static ConcurrentDictionary<string, AsyncLock> mutexes
        = new ConcurrentDictionary<string, AsyncLock>();

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            resourceLocator,
            key => new AsyncLock()
        );
    }
}

Asyn c использование:

using (await Locking.GetMutex("resource_string").LockAsync()) {
    ...
}

Синхронное использование:

using (Locking.GetMutex("resource_string").Lock()) {
    ...
}

Это работает безопасно, но проблема в том, что словарь становится все больше и больше, и я не вижу поточно-безопасный способ удаления элементов из словаря, когда никто не ждет блокировки. (Я также хочу избежать глобальных блокировок.)

Мое второе решение хэширует строку с числом от 0 до N - 1 и фиксирует на них:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private const UInt32 BUCKET_COUNT = 4096;

    private static ConcurrentDictionary<UInt32, AsyncLock> mutexes
        = new ConcurrentDictionary<UInt32, AsyncLock>();

    private static UInt32 HashStringToInt(string text) {
        return ((UInt32)text.GetHashCode()) % BUCKET_COUNT;
    }

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            HashStringToInt(resourceLocator),
            key => new AsyncLock()
        );
    }
}

как единое целое Как видно, второе решение только уменьшает вероятность столкновений, но не избегает их. Больше всего я боюсь, что это может привести к взаимоблокировкам. Основная стратегия, позволяющая избежать взаимоблокировок, - это всегда блокировать элементы в определенном порядке c. Но при таком подходе разные элементы могут отображаться в одни и те же сегменты в разном порядке, например: (A-> X, B-> Y), (C -> Y, D-> X). Таким образом, с этим решением нельзя безопасно заблокировать более одного ресурса.

Есть ли лучшее решение? (Я также приветствую критику вышеупомянутых 2 решений.)

1 Ответ

1 голос
/ 01 февраля 2020

Вероятно, вы могли бы улучшить первое решение, удалив блокировку из словаря, когда он перестает использоваться. Затем удаленные блокировки можно добавить в небольшой пул, так что в следующий раз, когда вам понадобится блокировка, вы просто берете одну из пулов вместо создания новой.


Обновление: Вот реализация этой идеи. Он основан на SemaphoreSlim s вместо AsyncLock s Стивена Клири, потому что для удаления неиспользуемых семафоров из словаря требуется пользовательское одноразовое использование.

public class MultiLock<TKey>
{
    private object Locker { get; } = new object();
    private Dictionary<TKey, LockItem> Dictionary { get; }
    private Queue<LockItem> Pool { get; }
    private int PoolSize { get; }

    public MultiLock(int poolSize = 10)
    {
        Dictionary = new Dictionary<TKey, LockItem>();
        Pool = new Queue<LockItem>(poolSize);
        PoolSize = poolSize;
    }

    public WaitResult Wait(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = lockItem.Semaphore.Wait(millisecondsTimeout,
                cancellationToken);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    public async Task<WaitResult> WaitAsync(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = await lockItem.Semaphore.WaitAsync(millisecondsTimeout,
                cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    private LockItem GetLockItem(TKey key)
    {
        LockItem lockItem;
        lock (Locker)
        {
            if (!Dictionary.TryGetValue(key, out lockItem))
            {
                if (Pool.Count > 0)
                {
                    lockItem = Pool.Dequeue();
                }
                else
                {
                    lockItem = new LockItem();
                }
                Dictionary.Add(key, lockItem);
            }
            lockItem.UsedCount += 1;
        }
        return lockItem;
    }

    private void ReleaseLockItem(LockItem lockItem, TKey key)
    {
        lock (Locker)
        {
            lockItem.UsedCount -= 1;
            if (lockItem.UsedCount == 0)
            {
                if (Dictionary.TryGetValue(key, out var stored))
                {
                    if (stored == lockItem) // Sanity check
                    {
                        Dictionary.Remove(key);
                        if (Pool.Count < PoolSize)
                        {
                            Pool.Enqueue(lockItem);
                        }
                    }
                }
            }
        }
    }

    internal class LockItem
    {
        public SemaphoreSlim Semaphore { get; } = new SemaphoreSlim(1);
        public int UsedCount { get; set; }
    }

    public struct WaitResult : IDisposable
    {
        private MultiLock<TKey> MultiLock { get; }
        private LockItem LockItem { get; }
        private TKey Key { get; }

        public bool LockAcquired { get; }

        internal WaitResult(MultiLock<TKey> multiLock, LockItem lockItem, TKey key,
            bool acquired)
        {
            MultiLock = multiLock;
            LockItem = lockItem;
            Key = key;
            LockAcquired = acquired;
        }

        void IDisposable.Dispose()
        {
            MultiLock.ReleaseLockItem(LockItem, Key);
            LockItem.Semaphore.Release();
        }
    }
}

Пример использования:

var multiLock = new MultiLock<string>();
using (await multiLock.WaitAsync("SomeKey"))
{
    //...
}

Размер пула по умолчанию для неиспользуемых семафоров равен 10. Оптимальным значением должно быть число одновременно работающих рабочих, использующих экземпляр MultiLock.

Я провел тест производительности на своем P C и 10 работникам удалось получить блокировку асинхронно всего 500 000 раз в секунду (использовалось 20 различных строковых идентификаторов).

...