Поскольку ваши методы включают ввод-вывод, они должны быть написаны так, чтобы они были действительно асинхронными, а не только синхронно запускались в пуле потоков с использованием Task.Run
.
Тогда вы могли бы использовать Task.WhenAll
в сочетании с Enumerable.Select
:
var tasks = someCollection.Select(async item =>
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
return (country, state, calculation);
});
foreach (var tuple in await Task.WhenAll(tasks))
{
countries.Add(tuple.country);
states.Add(tuple.state);
myCollection.AddRange(tuple.calculation);
}
Это гарантирует, что каждый country
> state
> calculation
происходит последовательно, но каждый item
обрабатывается одновременно и асинхронно.
Обновление согласно комментарию
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;
var tasks = someCollection.Select(async item =>
{
await semaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
Interlocked.Exchange(ref failures, 0);
return (country, state, calculation);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
semaphore.Release();
}
});
Семафор обеспечивает максимум 2 одновременных асинхронных c операций, а токен отмены отменяет все невыполненные задачи после 10 последовательных исключений.
Методы Interlocked
гарантируют, что доступ к failures
будет безопасным для потоков.
Дальнейшее обновление
Возможно, еще эффективнее будет использовать 2 семафора для предотвращения множественных итераций.
Инкапсулируйте все добавление списков в один метод:
void AddToLists(Country country, State state, Calculation calculation)
{
countries.Add(country);
states.Add(state);
myCollection.AddRange(calculation);
}
Тогда вы можете разрешить 2 потокам одновременно обслуживать запросы Http, а 1 - выполнять добавление, что делает эту операцию потокобезопасной:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;
await Task.WhenAll(someCollection.Select(async item =>
{
await httpSemaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
await listAddSemaphore.WaitAsync(cts.Token);
AddToLists(country, state, calculation);
Interlocked.Exchange(ref failures, 0);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
httpSemaphore.Release();
listAddSemaphore.Release();
}
}));