Множественное asyn c - ожидание цепочки внутри Parallel.ForEach - PullRequest
4 голосов
/ 17 июня 2020

У меня есть Parallel.ForEach l oop, который проходит через коллекцию. Внутри l oop я делаю несколько сетевых вызовов ввода-вывода. Я использовал Task.ContinueWith и вложил последующие вызовы asyn c -await. Порядок обработки не имеет значения, но данные из каждого вызова asyn c должны обрабатываться синхронно. Значение. Для каждой итерации данные, полученные из первого вызова asyn c, должны передаваться во второй вызов asyn c. После завершения второго вызова asyn c данные обоих вызовов asyn c должны обрабатываться вместе.

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});

Я пробовал описанное выше, не используя continue await, но он не работал синхронно . Теперь приведенный выше код выполняется до конца, но никакие записи не обрабатываются.

Любая помощь с этим, пожалуйста? Дайте мне знать, если я могу добавить больше деталей.

Ответы [ 2 ]

7 голосов
/ 17 июня 2020

Поскольку ваши методы включают ввод-вывод, они должны быть написаны так, чтобы они были действительно асинхронными, а не только синхронно запускались в пуле потоков с использованием Task.Run.

Тогда вы могли бы использовать Task.WhenAll в сочетании с Enumerable.Select:

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

Это гарантирует, что каждый country> state> calculation происходит последовательно, но каждый item обрабатывается одновременно и асинхронно.

Обновление согласно комментарию

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

Семафор обеспечивает максимум 2 одновременных асинхронных c операций, а токен отмены отменяет все невыполненные задачи после 10 последовательных исключений.

Методы Interlocked гарантируют, что доступ к failures будет безопасным для потоков.

Дальнейшее обновление

Возможно, еще эффективнее будет использовать 2 семафора для предотвращения множественных итераций.

Инкапсулируйте все добавление списков в один метод:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

Тогда вы можете разрешить 2 потокам одновременно обслуживать запросы Http, а 1 - выполнять добавление, что делает эту операцию потокобезопасной:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        AddToLists(country, state, calculation);

        Interlocked.Exchange(ref failures, 0);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
        listAddSemaphore.Release();
    }
}));
5 голосов
/ 17 июня 2020

Я думаю, вы слишком усложняете это; внутри Parallel.ForEach вы уже находитесь в пуле потоков , так что на самом деле нет никакой пользы от создания множества дополнительных задач внутри. Так; как это сделать, действительно зависит от того, являются ли GetState et c синхронными или асинхронными. Если мы предполагаем синхронные, тогда что-то вроде:

Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

Если они асинхронные c, это становится более неудобным; было бы красиво , если бы мы могли сделать что-то вроде:

// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

, но проблема здесь в том, что ни один из обратных вызовов в Parallel.ForEach не является «ожидаемым», что означает: мы молча создал здесь обратный вызов async void, что очень плохо. Это означает, что Parallel.ForEach будет думать, что он «закончен», как только произойдет неполное await, что означает:

  1. мы не имеем ни малейшего понятия, когда вся работа имеет на самом деле готово
  2. вы могли бы делать намного больше одновременно, чем предполагали (max-dop не может соблюдаться)

Похоже, нет хорошего API для избегайте этого в настоящее время.

...