Является ли ConcurrentDictionary.GetOrAdd действительно потокобезопасным? - PullRequest
0 голосов
/ 17 декабря 2018

У меня есть этот фрагмент кода, где я хочу дождаться выполнения текущей задачи, если эта задача была создана для того же ввода.Вот минимальное воспроизведение того, что я делаю.

private static ConcurrentDictionary<int, Task<int>> _tasks = new ConcurrentDictionary<int, Task<int>>();

private readonly ExternalService _service;


public async Task SampleTask(){
  var result = await _service.DoSomething();
  await Task.Delay(1000) //this task takes some time do finish
  return result;
}

public async Task<int> DoTask(int key) {
   var task = _tasks.GetOrAdd(key, _ => SampleTask());
   var taskResult = await task;
   _tasks.TryRemove(key, out task);
   return taskResult;
}

Я пишу тест, чтобы убедиться, что одна и та же задача ожидается, когда несколько запросов хотят выполнить задачу (примерно) в одно и то же время.Я делаю это, высмеивая _service и подсчитывая, сколько раз _service.DoSomething() вызывается.Это должно быть только один раз, если вызовы DoTask(int key) были сделаны примерно в одно и то же время.

Однако результаты показывают мне, что если я позвоню DoTask(int key) более одного раза с задержкой между вызовами менее чемЧерез 1 ~ 2 мс обе задачи создадут и выполнят ее в экземпляре SampleTask(), а вторая заменит первую в словаре.

Учитывая это, можно ли сказать, что этот метод действительно поточно-ориентированный?Или моя проблема не является безопасностью потока как таковой?

1 Ответ

0 голосов
/ 17 декабря 2018

Цитировать документацию (выделено мной):

Для модификаций и операций записи в словарь, ConcurrentDictionary<TKey,TValue> использует мелкозернистую блокировкудля обеспечения безопасности резьбы.(Операции чтения словаря выполняются без блокировок.) Однако делегат valueFactory вызывается вне блокировок, чтобы избежать проблем, которые могут возникнуть при выполнении неизвестного кода под блокировкой.Следовательно, GetOrAdd не является атомарным в отношении всех других операций класса ConcurrentDictionary<TKey,TValue>.

Поскольку ключ / значение может быть вставлен другим потоком, пока генерируется valueFactoryзначение, которому нельзя доверять только потому, что valueFactory выполнено, его полученное значение будет вставлено в словарь и возвращено. Если вы вызываете GetOrAdd одновременно в разных потоках, valueFactory можно вызывать несколько раз, но в словарь будет добавлена ​​только одна пара ключ / значение.

Такв то время как словарь является поточно-ориентированным, вызовы valueFactory или _ => SampleTask() в вашем случае не гарантируют, что они являются уникальными.Таким образом, ваша заводская функция должна соответствовать этому факту.

Вы можете подтвердить это из источника :

public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
    if (key == null) throw new ArgumentNullException("key");
    if (valueFactory == null) throw new ArgumentNullException("valueFactory");

    TValue resultingValue;
    if (TryGetValue(key, out resultingValue))
    {
        return resultingValue;
    }
    TryAddInternal(key, valueFactory(key), false, true, out resultingValue);
    return resultingValue;
}

Как видите, valueFactoryвызывается за пределами TryAddInternal, который отвечает за правильную блокировку словаря.

Однако, поскольку valueFactory является лямбда-функцией, которая возвращает задачу в вашем случае (_ => SampleTask()), и словарь будетне дожидаясь выполнения этой задачи, функция завершит быстро и просто вернет неполное Task после обнаружения первого await (когда настроен асинхронный конечный автомат).Поэтому, если вызовы выполняются очень быстро за другим, задача должна быть очень быстро добавлена ​​в словарь, и последующие вызовы будут повторно использовать ту же задачу.

Если требуется, чтобы это происходило один раз в all случаях, вы должны рассмотреть возможность блокировки при создании задачи самостоятельно.Поскольку он быстро завершится (независимо от того, сколько времени на самом деле потребуется для решения задачи), блокировка не повредит так сильно.

...