Реализация асинхронной операции «чтение всех доступных в данный момент данных из потока» - PullRequest
7 голосов
/ 22 декабря 2010

Я недавно дал ответ на этот вопрос: C # - перенаправление вывода консоли реального времени .

Как часто случается, объясняя вещи (здесь «вещи» было, как я решил подобную проблему)приводит вас к большему пониманию и / или, как в данном случае, «упс» моментам.Я понял, что в моем решении, как реализовано, есть ошибка.Ошибка имеет мало практического значения, но она имеет чрезвычайно большое значение для меня как для разработчика: я не могу успокоиться, зная, что мой код может взорваться.

Устранение ошибки - цельэтот вопрос.Я прошу прощения за длинное вступление, так что давайте запачкаемся.

Я хотел создать класс, который позволит мне получать входные данные от стандартного вывода консоли Stream.Консольные выходные потоки имеют тип FileStream;реализация может привести к этому, если это необходимо.Существует также связанный StreamReader, уже присутствующий для использования.

Существует только одна вещь, которую мне нужно реализовать в этом классе для достижения желаемой функциональности: асинхронная операция «прочитать все данные, доступные в данный момент».Чтение до конца потока не является жизнеспособным, потому что поток не завершится, пока процесс не закроет дескриптор вывода консоли, и он не сделает этого, потому что он является интерактивным и ожидает ввода перед продолжением.

Я будуиспользуя эту гипотетическую асинхронную операцию для реализации уведомлений на основе событий, что будет более удобным для моих абонентов.

Открытый интерфейс класса такой:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

StartSendingEvents и StopSendingEvents делать то, что они рекламируют;для целей этого обсуждения мы можем предположить, что события всегда отправляются без потери общности.

Класс использует эти два поля внутренне:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

Функциональность классареализовано в методах ниже.Чтобы получить шарик:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

Чтобы прочитать данные из Stream без блокировки, а также без использования символа возврата каретки, BeginRead называется:

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

Сложная часть:

BeginRead требует использования буфера.Это означает, что при чтении из потока возможно, что байты, доступные для чтения («входящий блок»), больше, чем буфер.Помните, что цель здесь - - прочитать все фрагменты и вызвать абонентов событий ровно один раз для каждого фрагмента .

. С этой целью, если буфер заполнен послеEndRead, мы не отправляем его содержимое подписчикам сразу, а вместо этого добавляем его к StringBuilder.Содержимое StringBuilder отправляется обратно только тогда, когда больше нет информации для чтения из потока.

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

После любого чтения, которого недостаточно для заполнения буфера (в этом случае мы знаем, чтобольше не было данных для чтения во время последней операции чтения), все накопленные данные отправляются подписчикам:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(я знаю, что, пока нет подписчиков, данные накапливаются навсегда.является осознанным решением).

Хорошо

Эта схема работает почти отлично:

  • Асинхронная функциональность безпорождение любых потоков
  • Очень удобно для вызывающего кода (просто подписаться на событие)
  • Никогда не более одного события на каждый раз, когда доступны данные для чтения
  • Почтине зависит от размера буфера

плохой

последний почти очень большой.Рассмотрим, что происходит, когда есть входящий кусок с длиной, точно равной размеру буфера.Чанк будет прочитан и помещен в буфер, но событие не будет запущено.За этим последует BeginRead, который ожидает найти больше данных, принадлежащих текущему чанку, чтобы отправить их обратно целиком, но ... в потоке больше не будет данных.

Фактически, пока данные помещаются в поток порциями с длиной, точно равной размеру буфера, данные будут буферизироваться, и событие никогда не будет инициировано.

Этот сценарий может быть маловероятным на практике, тем более что мы можем выбрать любое число для размера буфера, но проблема есть.

Решение

К сожалению, после проверки доступных методов на FileStream и StreamReader я не могу найти ничего, что позволило бы мне заглянуть в поток, а также разрешить использование асинхронных методов на нем.

Одним из "решений" было бы ожидание потока на ManualResetEvent после обнаружения условия "заполнение буфера". Если событие не сообщается (посредством асинхронного обратного вызова) в течение небольшого промежутка времени, больше данных из потока не будет поступать, и данные, накопленные до сих пор, должны быть отправлены подписчикам. Однако это создает необходимость в другом потоке, требует синхронизации потоков и просто неэлегатно.

Также достаточно указать тайм-аут для BeginRead (время от времени перезванивайте в мой код, чтобы я мог проверить, есть ли данные для отправки; в большинстве случаев делать нечего, поэтому я ожидаю хит производительности должен быть незначительным). Но похоже, что таймауты не поддерживаются в FileStream.

Так как я представляю, что асинхронные вызовы с тайм-аутами являются опцией в чистом Win32, другой подход мог бы заключаться в том, чтобы PInvoke выпал из проблемы. Но это также нежелательно, так как это создаст сложность и просто вызовет боль в коде.

Есть ли элегантный способ обойти проблему?

Спасибо, что проявили терпение, чтобы прочитать все это.

Обновление:

Я определенно плохо описал сценарий в моей первоначальной записи. С тех пор я немного пересмотрел рецензию, но для большей уверенности:

Вопрос заключается в том, как реализовать асинхронную операцию «чтение всех данных, доступных в данный момент».

Мои извинения людям, которые нашли время, чтобы прочитать и ответить без меня, чтобы мои намерения были достаточно ясны.

Ответы [ 3 ]

2 голосов
/ 22 декабря 2010

Теоретически я согласен с Джейсоном; Ваша реализация имеет большие проблемы, чем наличие логической дыры в случае, когда кусок данных равномерно делится вашим буфером. Самая большая проблема, которую я вижу, состоит в том, что ваш читатель должен иметь достаточно знаний о типе файла, чтобы знать, как он может разделить данные на «куски», с которыми ваши подписчики знают, как обращаться.

Потоки не имеют внутренних знаний о том, что они получают или отправляют; только механизм, с помощью которого они транспортируют данные. NetworkStream может отправлять HTML или ZIP-файл; FileStream может читать текстовый файл или MP3. Это средство чтения (XmlReader, TextReader, Image.FromStream () и т. Д.) Обладает этими знаниями. Следовательно, ваш асинхронный читатель должен знать хотя бы что-то о данных, но было бы полезно не иметь эти знания в жестком коде.

Для работы с «потоковыми» данными инкрементные отправки должны быть индивидуально полезны; вы должны знать достаточно о том, что вы узнаете, что то, что вы получили, - это «кусок», который может обрабатываться индивидуально. Мое предложение состоит в том, чтобы предоставить эту информацию вашему асинхронному считывателю инкапсулированным способом, либо попросив подписчиков сообщить вам, либо предоставив некоторый специфичный для формата «чанкатор» отдельно от слушателей (поскольку этот читатель прослушивает вывод консоли и все слушатели должны относиться к этому так же, этот второй план может быть лучше).

Логическая реализация:

public class MyStreamManager {
    public delegate bool ValidChunkTester(StringBuilder builder);

    private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
    public event ValidChunkTester IsValidChunk
    { add{validators.Add(value);} remove {validators.Remove(value);}}

    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;


    public void StartSendingEvents();
    public void StopSendingEvents();
}

...

private void ReadHappened(IAsyncResult asyncResult)
{
    var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
    if (bytesRead == 0) {
        this.OnAutomationStopped();
        return;
    }

    var input = this.StandardOutput.CurrentEncoding.GetString(
        this.buffer, 0, bytesRead);
    this.inputAccumulator.Append(input);

    if (validators.Any() && StandardOutputRead !-= null 
            && validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
        this.OnInputRead(); // send when all listeners can work with the buffer contents
    }

    this.BeginReadAsync(); // continue "looping" with BeginRead
}

...

Эта модель требует, чтобы подписчики не модифицировали StringBuilder; вы можете предоставить им что-то неизменное, чтобы вы могли их изучить. Пример слушателя может быть:

public bool IsACompleteLine(StringBuilder builder)
{
    return builder.Contains(Environment.NewLine);
}

или

public bool Contains256Bytes(StringBuilder builder)
{
    return builder.Length >= 256;
}

... вы поняли идею. Событие, определяющее ценность текущего буфера, который должен быть освобожден для слушателей, концептуально отделено от самих слушателей, но не обязательно должно быть таким конкретным, поэтому оно будет поддерживать либо один выходной тест, либо несколько тестов, основанных на слушателе.

1 голос
/ 22 декабря 2010

Если вы читаете из FileStream описанным способом, тогда будет прочитано все содержимое базового файла.Таким образом, у вас будет только один «кусок» данных, который вы будете считывать в StringBuilder (несколько неэффективно) небольшими кусочками.Ничто в вашей реализации не дает никакого способа разбить данные на более мелкие «куски», потому что чтение будет продолжать заполнять ваш буфер, пока файл не будет исчерпан.На этом уровне абстракции только клиент знает, каким должен быть размер этих чанков, поэтому вам придется передать им данные, которые будут декодированы в чанки.Что противоречит первоначальной цели вашего буфера.

Если у вас есть какой-то другой вид потока, который доставляет данные в пакетах (например, вывод на консоль или пакеты comms), то вы получите пакеты данных, но вы все равно можетене гарантирует, что чтение, заканчивающееся менее чем буфером данных, означает, что вы достигли конца пакета, просто что в передаче есть пауза.Обычно в этих случаях вам нужно буферизовать данные и обработать их (то есть обладатель информации о формате данных), чтобы определить, когда был получен полный кусок / пакет.В этом сценарии у вас всегда будет «незавершенный чанк», ожидающий в вашем буфере, пока не будут получены дополнительные данные, чтобы завершить чанк или начать новый чанк, и «вытолкнуть его» из вашего буфера.Это может быть проблемой при обмене данными, когда следующий пакет может не прибыть в течение длительного времени.

Таким образом, в конечном счете, вам нужно будет проинформировать читателя о том, как данные должны быть разделены на куски, что означает, что вынужен клиент для декодирования, поэтому классы базового потока не доставляют данные так, как вы пытаетесь реализовать.

Итак, добавив этот промежуточный класс, что вы получите?В лучшем случае это добавит дополнительный уровень сложности и накладных расходов к вашему вводу / выводу (давайте посмотрим правде в глаза, то, что вы пытаетесь абстрагировать из своего клиентского кода, это всего лишь несколько строк кода).В худшем случае он не сможет разбить данные на части по мере необходимости, поэтому он вообще не будет полезен.

Осторожно "This scenario may be highly unlikely to occur in practice": при потоковой передаче больших объемов данных вы можете быть уверены, что даже "маловероятные "события будут происходить со значительной регулярностью - конечно, часто из-за того, что вы не можете предположить, что они никогда не произойдут.

[править - добавлено]

Если выне пытаясь обобщить ваше решение, вы можете добавить логику в класс, который легко решит проблему.

Два возможных решения:

  • Если вы знаете максимальный пределиз строк консоли, которые будут выводиться вам, вы можете просто использовать достаточно большой буфер, чтобы гарантировать, что ваш пограничный случай никогда не произойдет.(Например, команды CreateProcess ограничены 32 КБ, cmd.exe ограничивает команды 8 КБ. Вы можете найти схожие ограничения в отношении полученных «кусков» данных)

  • Если ваши куски всегдастроки (завершенные блоки текста новой строки), затем просто проверьте, выглядит ли последний символ в вашем буфере как терминатор (0x0a или 0x0d).Если нет, то есть еще данные для чтения.

0 голосов
/ 22 декабря 2010

Я был бы склонен удалить «двойную буферизацию» (часть, где вы заполняете StringBuilder, а затем пропускаете данные, когда он заполнен) и возвращаете данные, полученные из буфера потока, всякий раз, когда считываются байты. Так что в ReadHappened у вас будет:

if (bytesRead > 0) {
    this.OnInputRead(); // only send back if we 're sure we got it all
}

Как уже говорили другие, подписчик должен будет что-то знать о сообщении / порции данных и о том, как объединить несколько частей в одно целое. Поэтому вы также можете вернуть каждую часть по мере ее получения. Если подписчик является «тупым подписчиком», который просто действует как проводник, это тоже будет работать.

...