Безопасно ли обновлять IProgress из нескольких потоков? - PullRequest
1 голос
/ 21 января 2020

У меня есть некоторый код c# (MVC WebAPI), который перебирает массив идентификаторов параллельно и выполняет вызов API для каждой записи. В первой версии весь код был простым, синхронным для l oop. Теперь мы изменили это на комбинацию Task.WhenAll и LINQ select:

private async Task RunHeavyLoad(IProgress<float> progress) {
  List<MyObj> myElements = new List<MyObj>(someEntries);
  float totalSteps = 1f / myElements.Count();
  int currentStep = 0;

  await Task.WhenAll(myElements.Select(async elem => {
    var result = await SomeHeavyApiCall(elem);
    DoSomethingWithThe(result);
    progress.Report(totalSteps * System.Threading.Interlocked.Increment(ref currentStep) * .1f);
  }

  // Do some more stuff
}

Это упрощенная версия оригинального метода! Фактический метод EnforceImmediateImport вызывается этим методом-концентратором SignalR:

public class ImportStatusHub : Hub {
  public async Task RunUnscheduledImportAsync(DateTime? fromDate, DateTime? toDate) {
    Clients.Others.enableManualImport(false);

    try {
      Progress<float> progress = new Progress<float>((p) => Clients.All.updateProgress(p));
      await MvcApplication.GlobalScheduler.EnforceImmediateImport(progress, fromDate, toDate);

    } catch (Exception ex) {
      Clients.All.importError(ex.Message);
    }

    Clients.Others.enableManualImport(true);
  }
}

Теперь мне интересно, является ли это «потокобезопасным» как таковым, или мне нужно что-то делать с вызовами progress.Report чтобы что-то пошло не так.

Ответы [ 2 ]

2 голосов
/ 21 января 2020

С документы :

Замечания

Любой обработчик, предоставленный конструктору или обработчикам событий, зарегистрированным в событии ProgressChanged, вызывается через захваченный экземпляр SynchronizationContext когда экземпляр построен. Если во время создания не существует текущего SynchronizationContext, обратные вызовы будут вызываться в ThreadPool.

Для получения дополнительной информации и примера кода см. Статью Asyn c в 4.5: Включение выполнения и отмены в Asyn c API в блоге. NET Framework.

Как и все, что использует SynchronizationContext, безопасно отправлять сообщения из нескольких потоков.

Пользовательские реализации IProgress<T> должен определить свое поведение.

0 голосов
/ 21 января 2020

По вашему вопросу, Progress только вызывает. Это зависит от кода, который вы написали для обработки прогресса на другой стороне. Я бы сказал, что строка progress.Report(totalSteps * System.Threading.Interlocked.Increment(ref currentStep) * .1f); может вызвать потенциальную проблему с отчетами о прогрессе из-за умножения, не являющегося атомом c.

Это то, что происходит внутри Progress, когда вы вызываете Report * 1005. *

  protected virtual void OnReport(T value)
  {
      // If there's no handler, don't bother going through the sync context.
      // Inside the callback, we'll need to check again, in case 
      // an event handler is removed between now and then.
      Action<T> handler = m_handler;
      EventHandler<T> changedEvent = ProgressChanged;
      if (handler != null || changedEvent != null)
      {
          // Post the processing to the sync context.
          // (If T is a value type, it will get boxed here.)
          m_synchronizationContext.Post(m_invokeHandlers, value);
      }
  }

Тем не менее, в коде лучший способ работать параллельно - использовать PLinq. В вашем текущем коде, если список содержит много элементов, он будет раскручивать задачи для каждого элемента одновременно и ждать завершения всех из них. Однако в PLinq количество одновременных выполнений будет определяться для оптимизации производительности.

myElements.AsParallel().ForAll(async elem =>
{
    var result = await SomeHeavyApiCall(elem);
    DoSomethingWithThe(result);
    progress.Report(totalSteps * System.Threading.Interlocked.Increment(ref currentStep) * .1f);
}

Помните, что AsParallel (). ForAll () немедленно вернется при использовании asyn c fun c. Поэтому вам может потребоваться захватить все задачи и дождаться их, прежде чем продолжить.

И последнее: если ваш список редактируется во время его обработки, я рекомендую использовать ConcurrentQueue или ConcurrentDictionary или ConcurrentBag.

...