Как разрешить списки IAsyncEnumerables, как только элемент готов - PullRequest
1 голос
/ 17 февраля 2020
public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
    {
        List<List<string>> splitIdsList = ids.Split(5);

        var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

        foreach (var entities in entityList)
        {
            await foreach (var entity in entities)
            {
                yield return entity;
            }
        }
    }

private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
    {
        var result = await Connection.QueryAsync(query, new {ids})

        foreach (var entity in result)
        {
            yield return entity;
        }
    }

Если я отправлю 25 идентификаторов для этой функции. Первый FindByIdsQuery занимает 5000 мс. Другие 4 FindByIdsQuery занимает 100 мс. Тогда это решение не будет выводить какие-либо сущности до тех пор, пока через 5000мс. Есть ли решение, которое начнет выводить сущности, как только найдется кто-нибудь для вывода. Или, если вы могли бы сделать что-то похожее на Task, с помощью Task.WhenAny.

Для ясности: любой из 5 запросов может занять 5000 мс.

Ответы [ 2 ]

2 голосов
/ 17 февраля 2020

Из ваших комментариев я понял вашу проблему. В основном вы ищете оператор "SelectMany". Этот оператор начал бы ожидать всех IAsyncEnumerables и возвращать элементы в порядке их поступления, независимо от того, в каком порядке перечислены исходные асинхронные c.

Я надеялся, что значение по умолчанию AsyncEnumerable.SelectMany делает это, но я обнаружил, что это не так. Он проходит через перечисляемые источники, а затем проходит через все внутреннее перечисляемое, прежде чем перейти к следующему. Так что я взломал вместе SelectMany вариант, который должным образом ожидает всех внутренних перечисляемых asyn c одновременно. Будьте осторожны, я не гарантирую ни правильности, ни безопасности. Обработка ошибок равна нулю.

/// <summary>
/// Starts all inner IAsyncEnumerable and returns items from all of them in order in which they come.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
    // get enumerators from all inner IAsyncEnumerable
    var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();

    List<Task<(IAsyncEnumerator<TItem>, bool)>> runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();

    // start all inner IAsyncEnumerable
    foreach (var asyncEnumerator in enumerators)
    {
        runningTasks.Add(MoveNextWrapped(asyncEnumerator));
    }

    // while there are any running tasks
    while (runningTasks.Any())
    {
        // get next finished task and remove it from list
        var finishedTask = await Task.WhenAny(runningTasks);
        runningTasks.Remove(finishedTask);

        // get result from finished IAsyncEnumerable
        var result = await finishedTask;
        var asyncEnumerator = result.Item1;
        var hasItem = result.Item2;

        // if IAsyncEnumerable has item, return it and put it back as running for next item
        if (hasItem)
        {
            yield return asyncEnumerator.Current;

            runningTasks.Add(MoveNextWrapped(asyncEnumerator));
        }
    }

    // don't forget to dispose, should be in finally
    foreach (var asyncEnumerator in enumerators)
    {
        await asyncEnumerator.DisposeAsync();
    }
}

/// <summary>
/// Helper method that returns Task with tuple of IAsyncEnumerable and it's result of MoveNextAsync.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
    var res = await asyncEnumerator.MoveNextAsync();
    return (asyncEnumerator, res);
}

Затем вы можете использовать ее для объединения всех перечислимых элементов вместо первого foreach:

    var entities = SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));

    return entities;
1 голос
/ 17 февраля 2020

Проблема в том, что ваш код делает их способными. Здесь нет смысла в асин c foreach, потому что - вы не делаете асин c.

. Вы делаете это:

var entityList = splitIdsList.Select (x => FindByIdsQuery (x)). ToList ();

Это часть запроса, которая может выполнить asyn c, но это не так, потому что весь набор результатов материализуется в список , Затем вы go при асинхронном c зацикливаетесь на нем, но в этот момент все результаты уже находятся в памяти.

Способ получить асинхронный c состоит в простом избавлении от ToList. Сбросьте запрос в foreach, не материализуйте его в память. Asyn c foreach должен соответствовать запросу уровня ef (не результату запроса), чтобы вы могли обрабатывать информацию по мере ее поступления из базы данных. ToList эффективно обходит это.

Также следует понимать, что ef не может эффективно обрабатывать несколько обращений к идентификаторам. Единственный возможный способ сделать это - поместить их в массив и содержать, что является предложением SQL "IN" - ужасно неэффективно для больших чисел, так как оно вызывает сканирование таблицы. Эффективным способом SQL было бы загрузить их в табличную переменную со статистикой и использовать объединение, но в ef этого не существует - одно из ограничений. Ограничения sql длинных предложений IN хорошо документированы. Ограничений со стороны ef нет, но они все еще существуют.

...