Как написать объект после TransformBlock? - PullRequest
0 голосов
/ 24 мая 2018

У меня есть список объектов, которые мне нужно перебирать параллельно.Вот что мне нужно сделать:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Так как я хочу сделать parellelize, я попытался использовать Parallel.ForEach (), но он не ждет, пока все действительно завершится, так как apiHelper.Get ()делает ожидание внутри себя.

Parallel.ForEach(
                results,
                async (r) =>
                {
                    r.SomeList = await apiHelper.Get(r.Id);
                });

Так что я искал в Интернете и обнаружил следующее: Вложение ждет в Parallel.ForEach

Теперь я очень новичок вTPL (20 минут) и я могу упустить что-то очевидное.Как мне идти дальше?

        var getBlock = new TransformBlock<string, List<Something>>(
            async i =>
            {
                var c = await apiHelper.Get(i);
                return c;
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (var r in results)
        {
            r.SomeList = getBlock.Post(r.Id);  // ERROR: Can't convert boolean to list.
        }

        getBlock.Complete();

Ответы [ 3 ]

0 голосов
/ 24 мая 2018

Возможно, стоит рассмотреть возможность использования вместо этого Microsoft Reactive Framework.

Вот код:

var query =
    from r in results.ToObservable()
    from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
    select new { r, l };

query
    .Subscribe(x => x.r.SomeList = x.l);

Готово.Параллельно и асинхронно.

Просто NuGet "System.Reactive" и добавьте using System.Reactiive.Linq;.

0 голосов
/ 24 мая 2018

Для вызова API асинхронно и параллельно вам не нужны Reactive или Dataflow.Единственное осложнение в том, что у вас есть, это то, что вы изменяете объект r, устанавливая его свойство в результате вызова API.Тем не менее, то, что вы хотите, довольно просто:

Это:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Становится:

var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);

Предполагается, что apiHelper.Get на самом деле неблокирует и асинхронентогда каждый элемент в results будет иметь асинхронный и параллельный вызов API.

0 голосов
/ 24 мая 2018

Ниже приведен пример использования класса ActionBlock в библиотеке TPL dataflow .

В основном это дает вам параллель, а async и его довольно легкопонять

Пример потока данных

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   result.SomeList = await apiHelper.Get(result.Id);
}

Очевидно, вам понадобится проверка ошибок и добавление перца и соли по вкусу

Также, если предположить apiHelper потокобезопасен

...