Ожидание завершения некоторых задач (Task.WhenSome) - PullRequest
0 голосов
/ 10 мая 2019

Я пишу сервис, который объединяет данные из различных интернет-источников и генерирует ответ на лету.Скорость важнее, чем полнота, поэтому я хотел бы получить свой ответ, как только некоторые (не все) интернет-источники ответили.Обычно мой сервис создает 10 одновременных веб-запросов и должен прекратить ждать и начать обработку после того, как 5 из них будут завершены.Ни .NET Framework, ни какие-либо сторонние библиотеки, о которых я знаю, не предлагают эту функциональность, поэтому мне, вероятно, придется написать ее самому.Метод, который я пытаюсь реализовать, имеет следующую подпись:

public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
    // TODO
}

Вопреки тому, как работает Task.WhenAny, исключения следует проглатывать при условии, что было получено необходимое количество результатов.,Однако, если после завершения всех задач недостаточно собранных результатов, следует создать AggregateException, распространяющий все исключения.

Пример использования:

var tasks = new Task<int>[]
{
    Task.Delay(100).ContinueWith<int>(_ => throw new ApplicationException("Oops!")),
    Task.Delay(200).ContinueWith(_ => 10),
    Task.Delay(Timeout.Infinite).ContinueWith(_ => 0, new CancellationTokenSource(300).Token),
    Task.Delay(400).ContinueWith(_ => 20),
    Task.Delay(500).ContinueWith(_ => 30),
};
var results = await WhenSome(2, tasks);
ConsolePrint($"Results: {String.Join(", ", results)}");

Ожидаемый результат:

Результаты: 10, 20

В этом примере последнее задание, возвращающее значение 30, должно игнорироваться (даже не ожидаться), поскольку мы уже получили числорезультатов мы хотим (2 результата).Неисправные и отмененные задачи также следует игнорировать по той же причине.

1 Ответ

0 голосов
/ 10 мая 2019

Это какой-то неуклюжий код, который, я думаю, отвечает вашим требованиям.Это может быть отправной точкой.

Это также может быть плохим способом обработки задач и / или не безопасным для потоков, и / или просто ужасной идеей.Но я ожидаю, что если так, то кто-то укажет на это.

async Task<TResult[]> WhenSome<TResult>(int atLeast, List<Task<TResult>> tasks)
{
    List<Task<TResult>> completedTasks = new List<System.Threading.Tasks.Task<TResult>>();
    int completed = 0;
    List<Exception> exceptions = new List<Exception>();

    while (completed < atLeast && tasks.Any()) {
        var completedTask = await Task.WhenAny(tasks);
        tasks.Remove(completedTask);

        if (completedTask.IsCanceled)
        {
            continue;
        }

        if (completedTask.IsFaulted)
        {
            exceptions.Add(completedTask.Exception);
            continue;
        }

        completed++;
        completedTasks.Add(completedTask);
    }

    if (completed >= atLeast)
    {
        return completedTasks.Select(t => t.Result).ToArray();
    }

    throw new AggregateException(exceptions).Flatten();
}
...