Что такое хороший метод для обработки линейных сетевых потоков ввода-вывода? - PullRequest
6 голосов
/ 08 февраля 2009

Примечание: позвольте мне извиниться за длину этого вопроса, я должен был поместить в него много информации. Я надеюсь, что это не заставит слишком многих людей просто просматривать и делать предположения. Пожалуйста, прочитайте полностью. Спасибо.

У меня есть поток данных, поступающий через сокет. Эти данные ориентированы на строки.

Я использую APM (метод асинхронного программирования) .NET (BeginRead и т. Д.). Это исключает использование потокового ввода-вывода, потому что асинхронный ввод-вывод основан на буфере. Можно упаковать данные и отправить их в поток, такой как поток памяти, но есть и проблемы там.

Проблема в том, что мой входной поток (который я не контролирую) не дает мне никакой информации о том, как долго этот поток. Это просто поток строк новой строки, выглядящих так:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

Итак, используя APM, и поскольку я не знаю, как долго будет длиться какой-либо конкретный набор данных, вполне вероятно, что блоки данных будут пересекать границы буфера, требующие многократного чтения, но эти многократные чтения также будут охватывать несколько блоков данных .

Пример:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

Моей первой мыслью было использование StringBuilder и просто добавление строк буфера в SB. Это работает в некоторой степени, но мне было трудно извлечь блоки данных. Я пытался использовать StringReader для чтения новых строк, но не было никакого способа узнать, была ли получена полная строка или нет, поскольку StringReader возвращает частичную строку в конце последнего добавленного блока, после чего возвращается ноль после этого. Невозможно узнать, была ли возвращенная строка полностью новой строкой данных.

Пример:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

Что еще хуже, так это то, что если я просто продолжаю добавлять данные, буферы становятся все больше и больше, и, поскольку они могут работать неделями или месяцами, это не очень хорошее решение.

Моей следующей мыслью было удаление блоков данных из СБ, когда я их читал. Это потребовало написания моей собственной функции ReadLine, но затем я застрял, блокируя данные во время чтения и записи. Кроме того, большие блоки данных (которые могут состоять из сотен операций чтения и мегабайтов данных) требуют сканирования всего буфера в поисках новых строк. Это не эффективно и довольно уродливо.

Я ищу что-то, что имеет простоту StreamReader / Writer с удобством асинхронного ввода-вывода.

Моей следующей мыслью было использование MemoryStream и запись блоков данных в поток памяти, затем присоединение StreamReader к потоку и использование ReadLine, но опять же у меня возникают проблемы с определением, является ли последнее чтение в буфере полная линия или нет, плюс еще труднее удалить «устаревшие» данные из потока.

Я также думал об использовании потока с синхронным чтением. Преимущество в том, что при использовании StreamReader он всегда будет возвращать полную строку из ReadLine (), за исключением случаев разрыва соединения. Однако это имеет проблемы с отменой соединения, и определенные виды сетевых проблем могут привести к зависанию блокирующих сокетов в течение продолжительного периода времени. Я использую асинхронный ввод-вывод, потому что я не хочу связывать поток для жизни программы, блокирующей получение данных.

Соединение длительное. И данные будут течь со временем. Во время первоначального соединения существует большой поток данных, и как только этот поток завершен, сокет остается открытым в ожидании обновлений в реальном времени. Я не знаю точно, когда начальный поток «закончил», поскольку единственный способ узнать, что больше никаких данных не отправляется сразу. Это означает, что я не могу дождаться окончания начальной загрузки данных перед обработкой, я в значительной степени застрял в обработке «в режиме реального времени», когда она поступает.

Итак, кто-нибудь может предложить хороший способ справиться с этой ситуацией таким образом, чтобы он не был слишком сложным? Я действительно хочу, чтобы это было максимально просто и элегантно, но я продолжаю придумывать все более и более сложные решения из-за всех крайних случаев. Я предполагаю, что мне нужен какой-то FIFO, в котором я могу легко добавлять новые данные, в то же время извлекая из них данные, которые соответствуют определенным критериям (т. Е. Строки с завершающими символами новой строки).

Ответы [ 2 ]

5 голосов
/ 08 февраля 2009

Это довольно интересный вопрос. В прошлом я решил использовать отдельный поток с синхронными операциями, как вы предлагаете. (Мне удалось обойти большинство проблем с блокировкой сокетов с помощью блокировок и большого количества обработчиков исключений.) Тем не менее, использование встроенных асинхронных операций, как правило, целесообразно, поскольку допускает настоящий асинхронный ввод-вывод на уровне ОС, поэтому я понимаю твоя точка.

Ну, я пошел и написал класс для выполнения того, что, как я считаю, вам нужно (относительно чистым способом, я бы сказал). Дайте мне знать, что вы думаете.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

Экземпляр этого класса должен быть создан для каждого NetworkStream, и функция Process должна вызываться при каждом получении новых данных (в методе обратного вызова для BeginRead перед вызовом следующего BeginRead, который я себе представляю).

Примечание. Я проверял этот код только с тестовыми данными, а не с фактическими данными, передаваемыми по сети. Однако я не ожидал бы никаких отличий ...

Кроме того, предупреждение о том, что класс, конечно, не является потокобезопасным, но до тех пор, пока BeginRead не будет выполнен снова до тех пор, пока не будут обработаны текущие данные (как я полагаю, вы делаете), не должно быть любые проблемы.

Надеюсь, это работает для вас. Дайте мне знать, если остались проблемы, и я постараюсь изменить решение для их устранения. (В этом вопросе, который я пропустил, может быть какая-то тонкость, несмотря на то, что я внимательно его прочитал!)

0 голосов
/ 08 февраля 2009

То, что вы объясняете в своем вопросе, очень напоминает мне строки ASCIZ. ( текст ссылки ). Это может быть полезным началом.

Мне нужно было написать нечто подобное в колледже для проекта, над которым я работал. К сожалению, у меня был контроль над отправляющим сокетом, поэтому я добавил длину поля сообщения как часть протокола. Тем не менее, я думаю, что подобный подход может принести вам пользу.

Как я подошел к своему решению, я отправил что-то вроде 5HELLO, поэтому сначала я увидел 5 и узнал, что у меня длина сообщения 5, а для этого мне нужно было 5 символов. Однако, если при моем асинхронном чтении я получил только 5HE, я увидел бы, что у меня длина сообщения 5, но я смог прочитать только 3 байта без проводов (предположим, символы ASCII). Из-за этого я знал, что мне не хватает нескольких байтов, и хранил то, что у меня было, во фрагментном буфере. У меня был один буфер фрагментов на сокет, чтобы избежать проблем с синхронизацией. Грубый процесс.

  1. Чтение из сокета в байтовый массив, запись количества прочитанных байтов
  2. Сканирование по байтам, пока не будет найден символ новой строки (это становится очень сложным, если вы не получаете символы ascii, но символы, которые могут быть несколькими байтами, вы сами для этого)
  3. Превратите ваш буфер фрагмента в строку и добавьте свой буфер чтения до новой строки. Удалите эту строку как завершенное сообщение в очередь или ее собственный делегат для обработки. (вы можете оптимизировать эти буферы, фактически записав сокет для чтения в тот же байтовый массив, что и фрагмент, но это сложнее объяснить)
  4. Продолжайте цикл, каждый раз, когда мы находим новую строку, создаем строку из байтовой последовательности из записанной начальной / конечной позиции и помещаем в очередь / делегат для обработки.
  5. Как только мы дойдем до конца нашего буфера чтения, скопируйте все, что осталось в буфер фрагмента.
  6. Вызовите BeginRead для сокета, который перейдет к шагу 1., когда данные будут доступны в сокете.

Затем вы используете другой поток для чтения вашей очереди несогласованных сообщений или просто позволяете потоку потоков обрабатывать его с помощью делегатов. И делать любую обработку данных, которую вы должны сделать. Кто-то исправит меня, если я ошибаюсь, но с этим очень мало проблем с синхронизацией потоков, так как вы можете только читать или ждать чтения из сокета в любое время, так что не беспокойтесь о блокировках (кроме случаев, когда вы заполняя очередь, я использовал делегатов в моей реализации). Есть несколько деталей, которые вам нужно будет проработать самостоятельно, например, какой размер буфера фрагмента оставить, если при чтении вы получаете 0 новых строк, все сообщение должно быть добавлено в буфер фрагмента без перезаписи. что-нибудь. Я думаю, что в итоге мне потребовалось около 700 - 800 строк кода, но это включало настройку соединения, согласование для шифрования и некоторые другие вещи.

Эта настройка очень хорошо для меня; Я смог выполнить до 80 Мбит / с по локальной сети 100 Мбит / с, используя эту опцию 1,8 ГГц, включая обработку шифрования. А поскольку вы привязаны к сокету, сервер будет масштабироваться, поскольку одновременно можно работать с несколькими сокетами. Если вам нужно, чтобы элементы обрабатывались по порядку, вам нужно использовать очередь, но если порядок не имеет значения, то делегаты обеспечат вам очень масштабируемую производительность из пула потоков.

Надеюсь, это поможет, не для того, чтобы быть полным решением, а для того, чтобы начать искать.

* Просто обратите внимание, моя реализация не работала на байтовом уровне и поддерживала шифрование, я использовал символы в своем примере, чтобы упростить визуализацию.

...