Как выполнить произвольное количество асин c задач в последовательном порядке? - PullRequest
0 голосов
/ 22 января 2020

У меня есть эта функция:

async Task RefreshProfileInfo(List<string> listOfPlayers)

// For each player in the listOfPlayers, checks an in-memory cache if we have an entry.
// If we have a cached entry, do nothing.
// If we don't have a cached entry, fetch from backend via an API call.

Эта функция вызывается очень часто, например:

await RefreshProfileInfo(playerA, playerB, playerC)

или

await RefreshProfileInfo(playerB, playerC, playerD) 

или

await RefreshProfileInfo(playerE, playerF)

В идеале, если игроки не перекрывают друг друга, вызовы не должны влиять друг на друга (запрос PlayerE и PlayerF не должен блокировать запрос PlayerA, PlayerB, Player C). Однако, если игроки перекрывают друг друга, второй вызов должен ждать первого (запрашивающий PlayerB, Player C, PlayerD, должен дождаться, пока PlayerA, PlayerB, Player C завершат sh).

Однако, если это невозможно, я бы хотел, чтобы все вызовы были последовательными. (Я думаю, что они все еще должны быть асинхронными c, поэтому они не блокируют другие несвязанные части кода).

В настоящее время происходит то, что каждый RefreshProfileInfo выполняется параллельно, что приводит к тому, что каждый раз выполняется обращение к бэкенду (в данном примере 9 раз).

Вместо этого я хочу выполнить их последовательно, чтобы только первый вызов обращался к бэкенду, а последующие вызовы просто ударяли в кеш.

Какую структуру данных / подход я должен использовать? У меня проблемы с выяснением, как «соединить» отдельные звонки друг с другом. Я играл с Task.WhenAll (), а также с SemaphoreSlim, но не могу понять, как их правильно использовать.

Неудачная попытка

Идея, лежащая в основе моей неудачной попытки, заключалась в том, чтобы создать вспомогательный класс, в котором я мог бы вызвать функцию SequentialRequest (Task), и она последовательно выполняла бы все задачи, вызываемые таким образом.

List<Task> waitingTasks = new List<Task>();
object _lock = new object();

public async Task SequentialRequest(Task func)
{
    var waitingTasksCopy = new List<Task>();

    lock (_lock)
    {
        waitingTasksCopy = new List<Task>(waitingTasks);
        waitingTasks.Add(func); // Add this task to the waitingTasks (for future SequentialRequests)
    }

    // Wait for everything before this to finish
    if (waitingTasksCopy.Count > 0)
    {
        await Task.WhenAll(waitingTasksCopy);
    }

    // Run this task
    await func;
}

Я думал, что это сработает, но "fun c" либо запускается мгновенно (вместо ожидания более ранних задач до конца sh), либо никогда не запускается вообще, в зависимости от того, как я назови это.

Если я вызываю его с помощью этого, он запускается мгновенно:

async Task testTask()
{
    await Task.Delay(4000);
}

Если я вызываю его с помощью этого, он никогда не запускается:

Task testTask = new Task(async () =>
{
    await Task.Delay(4000);
});

Ответы [ 2 ]

4 голосов
/ 22 января 2020

Вот почему ваша текущая попытка не работает:

// Run this task
await func;

В приведенном выше комментарии не описывается, что делает код. В асинхронном мире Task представляет некоторую операцию , которая уже выполняется . Задачи не «запускаются» с помощью await; await это способ для текущего кода "асинхронно ждать" задачи до завершения . Так что никакая сигнатура функции, принимающая Task, не будет работать; задача уже выполняется еще до того, как она будет передана этой функции.

На самом деле ваш вопрос касается кеширования асинхронных операций. Один из способов сделать это - кэшировать саму Task<T>. В настоящее время ваш кеш содержит результаты (T); Вы можете изменить свой кэш для хранения асинхронных операций, которые извлекают эти результаты (Task<T>). Например, если ваш текущий тип кэша ConcurrentDictionary<PlayerId, Player>, вы можете изменить его на ConcurrentDictionary<PlayerId, Task<Player>>.

При наличии кэша задач, когда ваш код проверяет запись в кэше, он найдет существующую запись, если данные игрока загружены или начали загрузку . Поскольку Task<T> представляет некоторую асинхронную операцию, которая уже выполняется (или уже завершена).

Несколько замечаний для этого подхода:

  1. Это работает только для кеши памяти.
  2. Подумайте, как вы хотите обрабатывать ошибки. Наивный кэш Task<T> также будет кешировать ошибку результатов, что обычно нежелательно.

Второй пункт выше - более сложная часть. Когда происходит ошибка, вы, вероятно, захотите добавить в лог c до удаление ошибочной задачи из кэша. Бонусные баллы (и дополнительная сложность), если код обработки ошибок в первую очередь предотвращает попадание ошибочной задачи в кэш.

, по крайней мере, я бы хотел, чтобы все вызовы были последовательными

Ну, это намного проще. SemaphoreSlim является асинхронной заменой lock, поэтому вы можете использовать общий SemaphoreSlim. Вызовите await mySemaphoreSlim.WaitAsync(); в начале RefreshProfileInfo, поместите тело в try, а в блоке finally в конце RefreshProfileInfo вызовите mySemaphoreSlim.Release();. Это ограничит выполнение всех вызовов RefreshProfileInfo последовательно.

0 голосов
/ 22 января 2020

У меня была такая же проблема в одном из моих проектов. У меня было несколько потоков, вызывающих один метод, и все они делали вызовы ввода-вывода, когда не были найдены в кеше. Что вы хотите сделать, это добавить Task в кеш, а затем await. Последующие вызовы будут просто читать результат после завершения задачи.

Пример:

private Task RefreshProfile(Player player)
{
     // cache is of type IMemoryCache
     return _cache.GetOrCreate(player, entry => 
     {
        // expire in 30 seconds
        entry.AbsoluteExpiration = DateTimeOffset.UtcNow.AddSeconds(30);
        return ActualRefreshCodeThatReturnsTask(player);
     });
}

Тогда просто await в вашем коде вызова

await Task.WhenAll(RefreshProfile(Player a), RefreshProfile(Player b), RefreshProfile(Player c));
...