Что происходит с возвратом IEnumerable, если используется с async / await (потоковая передача данных с SQL Server с помощью Dapper)? - PullRequest
7 голосов
/ 05 апреля 2019

Я использую Dapper для потоковой передачи данных из очень большого набора в SQL Server. Он прекрасно работает с возвратом IEnumerable и вызовом Query(), но когда я переключаюсь на QueryAsync(), кажется, что программа пытается прочитать все данные с SQL Server вместо потоковой передачи.

Согласно этому вопросу , он должен нормально работать с buffered: false, что я делаю, но вопрос ничего не говорит о async/await.

Теперь, согласно этому вопросу , нелегко делать то, что я хочу, с QueryAsync().

Правильно ли я понимаю, что перечислимые элементы повторяются, когда контекст переключается на async/await?

Другой вопрос, возможно ли это сделать, когда будет доступна новая асинхронная потоковая передача C # 8?

Ответы [ 3 ]

8 голосов
/ 05 апреля 2019

Если вы проверите исходный код , вы увидите, что ваши подозрения почти верны. Когда buffered ложно, QueryAsync будет передавать синхронно .

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

Как поясняется в комментарии, невозможно использовать ReadAsync, когда ожидается, что тип возвращаемого значения будет IEnumerable. Вот почему должны были быть введены асинхронные перечислители C # 8.

Код для ExecuteReaderSync:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

Используется Read вместо ReadAsync.

C # 8 асинхронные потоки позволят переписать это, чтобы вернуть IAsyncEnumerable. Простое изменение языковой версии не решит проблему.

С учетом текущей документации по асинхронным потокам это может выглядеть следующим образом:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (await reader.ReadAsync())
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
         command.OnCompleted();
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

Buuuuuut асинхронные потоки - это одна из вещей, которая может работать только на .NET Core и, вероятно, еще не реализована. Когда я пытался написать один в Sharplab.io, Kaboom. [connection lost, reconnecting…]

7 голосов
/ 05 апреля 2019

В контексте dapper , в частности, , да: ему нужен другой API, как объясняет отличный ответ @Panagiotis. Далее следует не ответ как таковой, но дополнительный контекст, который разработчики, сталкивающиеся с теми же проблемами, возможно, пожелают рассмотреть.

У меня еще не было "шипов" для багровых (хотя у меня есть для SE.Redis), и я разрываюсь между различными вариантами:

  1. добавить новый API для .NET Core * только 1012 * , возвращая соответствующий асинхронно-перечислимый тип
  2. полностью разбивает существующий API как критическое изменение («основное» и т. Д.), Изменяя его так, чтобы оно возвращало асинхронно перечислимый тип

Возможно, мы пойдем с "1", но я должен сказать, что второй вариант необычайно заманчив, по уважительным причинам:

  • существующий API, вероятно, не делает того, чего ожидают от него
  • мы бы хотели, чтобы новый код начал его использовать

Но странная вещь - это .NET Core 3.0-ness IAsyncEnumerable<T> - так как очевидно, что Dapper не просто нацелен на .NET Core 3.0; мы могли бы:

  1. ограничить функцию до .NET Core 3.0 и вернуть IAsyncEnumerable<T>
  2. ограничить библиотеку до .NET Core 3.0 и вернуть IAsyncEnumerable<T>
  3. взять зависимость от System.Linq.Async (которая не является "официальной", но достаточно официальной для наших целей) для предыдущих платформ и вернуть IAsyncEnumerable<T>
  4. возвращает пользовательский перечислимый тип, который не на самом деле IAsyncEnumerable<T> (но который реализует IAsyncEnumerable<T> при наличии), и вручную реализует конечный автомат - характер утки foreach означает это будет работать нормально, если наш пользовательский перечислимый тип предоставляет правильные методы

Я думаю, что мы , вероятно, перейдем к варианту 3, но повторюсь: да, что-то должно измениться.

1 голос
/ 06 апреля 2019

( Предполагается, что это комментарий // недостаточно репутации, пока )

Марк Гравелл упоминает в своем ответе , что IAsyncEnumerable<T> будетпредпочтительнее, но из-за зависимости от NET Core 3.0 может быть лучше взять зависимость от System.Linq.Async (что можно считать «достаточно официальным») ...

В этом контексте https://github.com/Dasync/AsyncEnumerable пришло мне в голову (лицензия MIT): оно направлено на то, чтобы помочь

... (a) создать провайдера элемента, где создание элемента может занять много времени из-зак зависимости от других асинхронных событий (например, дескрипторов ожидания, сетевых потоков) и (b) потребителя, который обрабатывает эти элементы, как только они готовы, без блокировки потока (вместо этого обработка запланирована в рабочем потоке).

Еще один комментарий, RE: "Что происходит, когда выходит C # 8.0?"( FAQ )

C # 8.0 должен иметь функцию Async Streams.Когда версия языка будет наконец выпущена, это будет простой путь обновления для вашего приложения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...