Есть ли объяснение этому многопоточному коду? - PullRequest
0 голосов
/ 05 февраля 2020

Итак, мы столкнулись с кодом, очень похожим на этот. Мне просто интересно, если кто-то может объяснить мне это.

Посмотрите, как он использует планировщик RX, а затем Parallel.For, и внутри этого нового TaskFactory.StartNew

    IDisposable subscription = someObservable.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
       {
           Parallel.ForEach(xxxs,
               x =>
               {
                   var theKey = x.Key;
                   if (!theTasks.ContainsKey(theKey) ||
                       theTasks.ContainsKey(theKey) && theTasks[theKey].IsCompleted)
                   {
                       theTasks[theKey] = Task.Factory.StartNew(
                           () =>
                           {
                               .....
                               }
                               catch (CommunicationObjectAbortedException ex)
                               {
                               ....
                               }
                               catch (ObjectDisposedException ex)
                               {
                               ....
                               }
                               catch (Exception e)
                               {
                               ....
                               }
                           });
                   }
               });
       },
       ex =>
       {
          ....
       },
       () =>
       {
          ....
       });
    } 

Я знаю, что все это все происходит индивидуально, но я не совсем уверен, каков здесь комбинированный эффект потоков. Может ли кто-нибудь опасаться догадки

1 Ответ

2 голосов
/ 06 февраля 2020

Ах да, параллелизм Turducken .

ThreadPoolScheduler планирует работу с пулом потоков , отличным от пула задач, ThreadPoolScheduler предназначался для использования на платформах, где пул задач не был доступен - предпочитайте TaskPoolScheduler, когда это возможно.

Такое ощущение, что писатель пытался сохранить пул задач только для выполняемой задачи (простите за каламбур), используя пул потоков.

Parallel.ForEach блокируется, пока l oop не будет завершено. Поэтому, пока он выполнялся в пуле потоков, когда генерируется новый элемент, выполните следующее ForEach в заимствованном потоке из пула потоков.

Что касается внутреннего бита, то пишущий хочет один Task для запуска по уникальному ключу, если он еще не запущен.

...