Parallel.Foreach () не дает результата - PullRequest
0 голосов
/ 16 ноября 2018

Я пытаюсь запросить mongo-db параллельно, используя Parallel.Foreach(), но я не получаю никаких результатов. Но когда я пытаюсь выполнить то же самое в обычном цикле foreach, я могу выполнить ожидаемые задачи.

var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();

// This works
foreach(var info in infos)
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}

//This does not
Parallel.ForEach(infos, async info =>
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
});

Я хочу выполнять эту задачу параллельно, поскольку участвуют разные коллекции mongodb, а также сократить время отклика.

Я не могу понять, что не так в моем параллельном цикле. Любой другой подход для выполнения этих задач параллельно также будет работать.

Ответы [ 2 ]

0 голосов
/ 16 ноября 2018

Давайте рассмотрим более простой пример, иллюстрирующий те же проблемы

У вас есть код, похожий на этот

var results = new Dictionary<int, int>();

Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

Ваш код не работает, потому что выражение

async index => {...}

возвращает задание, которое не ожидается

как это

Parallel.ForEach(Enumerable.Range(0, 5), index => new Task());

Кстати, когда вы работаете с многопоточностью, как в вашем примере, вы должны использовать ConcurrentDictionary вместо словаря, когда вы делаете параллельные обновления, чтобы избежать ошибок и тупиков

Лучшее решение здесь - не использовать параллельный цикл, а вместо этого использовать Task.WhenAll

var tasks = Enumerable.Range(0, 5).Select(async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

await Task.WhenAll(tasks);
0 голосов
/ 16 ноября 2018

Parallel.ForEach не совместим с передачей в методе async. Если вам нужно что-то похожее на Parallel.ForEach, вы можете использовать Dataflow и его ActionBlock.

var workerBlock = new ActionBlock<Info>(async info => 
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        //Note this is not thread safe and you need to put a lock around it.
        lock (secondaryObjectsDictionaryCollection) 
        {
            secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
        }
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach(var info in infos)
{
    workerBlock.Post(info);
}
workerBlock.Complete();
...