чтение линейно-ориентированных данных с использованием асинхронных сокетов C # - PullRequest
1 голос
/ 20 июня 2011

Я новичок в C #, но много лет занимался программированием TCP и сокетов.Я хотел бы создать программу, которая выполняет асинхронный ввод-вывод (только прием) с нескольких серверов.Данные являются текстовыми и разделены символами новой строки (очень просто).

Мой вопрос о том, могу ли я предположить, что если я использую StreamReader в своей функции обратного вызова асинхронного чтения, могу ли я предположить, что

  1. Он сможет получить хотя бы полную строку данных, и
  2. Что он не будет каким-то образом блокировать.

Как я понимаю, я хочусделать StreamReader ReadLine в моем обратном вызове асинхронного чтения.Но что в BeginRead может знать , что я действительно хочу делать линейно-ориентированные данные?Я, очевидно, не знаю, какую длину читать в этот момент.

Надеюсь, это имеет смысл, и заранее спасибо!

Митч

Ответы [ 3 ]

1 голос
/ 20 июня 2011

Из вашего поста я думаю, что вам нужно создать свой собственный класс и обернуть в него сокет SYNCHRONOUS . Вы Read() сокет блокируете, буферизуете данные, разбиваете строки и запускаете событие для потребителя класса, когда доступна вся строка.

1 голос
/ 20 июня 2011

Исходя из моего понимания вашего вопроса: вы намереваетесь использовать StreamReader как блокирующий поток, но при этом вы используете асинхронные сокеты ... даже если мое понимание неверно, пожалуйста, смотрите предложения ниже, так какони также охватывают случай, когда вы действительно намереваетесь использовать StreamReader для асинхронного чтения.

  1. Он сможет получить хотя бы полную строку данных, а
    Да и нет: он будет считывать всю строку данных, пока в нижележащем буфере есть новая строка, однако это асинхронное событие, поэтому, как только у StreamReader заканчиваются данные (т.е. он достигает последней новой строки)ему больше нечего будет читать, даже если вы получите еще один асинхронный обратный вызов.По сути, вы привязываете свой приемный буфер к SocketAsyncEventArgs, и когда вы получаете обратный вызов, ваш буфер заполняется.Что вы будете делать с буфером после этого, зависит от вас (вы можете передать его в StreamReader для чтения строкового ввода).

  2. что он каким-то образом не будет блокироваться.
    Он не будет блокировать, но он не получит все данные, которые вы ищете, в зависимости от того, как вы реализовали свой асинхронный обратный вызов.Это подводит меня к следующему пункту ...

Мой следующий пункт: Я не знаю, является ли это непрерывным потоком данных или просто какой-то строкой«Пакет» данных с разделителями, однако, у вас есть несколько вариантов:

  1. Использовать 1 поток памяти и поток чтения / записи, связанный с этим потоком памяти.Всякий раз, когда вы получаете обратный вызов, вы записываете буфер, используя StreamWriter, и читаете строки, используя StreamReader, пока вы больше не можете читать новые строки.
  2. Продолжайте записывать буфер в поток, пока не получитебольше не будет байтов из сокета (т. е. вы получите 0 для полученных байтов), а затем используйте этот поток для инициализации StreamReader, а затем прочитайте все строки.Это предполагает, что у вас фиксированный размер входящих данных, а не непрерывный поток «ввода».
  3. Создайте блокирующее считыватель и дайте ему жить в отдельном потоке, он читает только тогда, когда асинхронное чтение вызываетфлаг какой-то (подумайте ManualResetEvent или что-то подобное).Это своего рода побеждает назначение асинхронных сокетов, но все же позволяет вам воспользоваться преимуществами.Это немного лучше, чем чистые блокирующие сокеты, потому что блокировка полностью контролируется вами.

Я бы предположил, что ваш лучший вариант - это # ​​1, но другие 2 могут быть исключены, в зависимости от вопросао том, что вы пытаетесь сделать.

С другой стороны: я создал асинхронный HTTP-клиент , так что не стесняйтесь взглянуть на него и очистить то, что вы считаете полезным,Я хотел бы предположить, что эти две функции будут наиболее важными для вас:

Начните получать:

private void BeginReceive()
{
    if ( _clientState == EClientState.Receiving)
    {
        if (_asyncTask.BytesReceived != 0 && _asyncTask.TotalBytesReceived <= _maxPageSize)
        {
            SocketAsyncEventArgs e = new SocketAsyncEventArgs();
            e.SetBuffer(_asyncTask.ReceiveBuffer, 0, _asyncTask.ReceiveBuffer.Length);
            e.Completed += new EventHandler<SocketAsyncEventArgs>(ReceiveCallback);
            e.UserToken = _asyncTask.Host;

            bool comletedAsync = false;
            try
            {
                comletedAsync = _socket.ReceiveAsync(e);
            }
            catch (SocketException se)
            {
                Console.WriteLine("Error receiving data from: " + _asyncTask.Host);
                Console.WriteLine("SocketException: {0} Error Code: {1}", se.Message, se.NativeErrorCode);

                ChangeState(EClientState.Failed);
            }

            if (!comletedAsync)
            {
                // The call completed synchronously so invoke the callback ourselves
                ReceiveCallback(this, e);
            }
        }
        else
        {
            //Console.WriteLine("Num bytes received: " + _asyncTask.TotalBytesReceived);
            ChangeState(EClientState.ReceiveDone);
        }
    }
}

И ReceiveCallback:

private void ReceiveCallback(object sender, SocketAsyncEventArgs args)
{
    lock (_sync) // re-entrant lock
    {
        // Fast fail: should not be receiving data if the client
        // is not in a receiving state.
        if (_clientState == EClientState.Receiving)
        {
            String host = (String)args.UserToken;

            if (_asyncTask.Host == host && args.SocketError == SocketError.Success)
            {
                try
                {
                    Encoding encoding = Encoding.ASCII;
                    _asyncTask.BytesReceived = args.BytesTransferred;
                    _asyncTask.TotalBytesReceived += _asyncTask.BytesReceived;

                    // ********************************************
                    // You will need to replace the following line:
                    _asyncTask.DocSource += encoding.GetString(_asyncTask.ReceiveBuffer, 0, _asyncTask.BytesReceived);

                    // Write the contents of the ReceiveBuffer to a Stream using a StreamWriter

                    // Use the StreamReader associated with the same Stream to read the next line(s)
                    // ********************************************
                    BeginReceive();
                }
                catch (SocketException e)
                {
                    Console.WriteLine("Error receiving data from: " + host);
                    Console.WriteLine("SocketException: {0} Error Code: {1}", e.Message, e.NativeErrorCode);

                    ChangeState(EClientState.Failed);
                }
            }
            else if (_asyncTask.Host != host)
            {
                Console.WriteLine("Warning: received a callback for {0}, but the client is currently working on {1}.",
                    host, _asyncTask.Host);
            }
            else
            {
                Console.WriteLine("Socket Error: {0} when receiving from {1}",
                   args.SocketError,
                   _asyncTask.Host);
                ChangeState(EClientState.Failed);
            }
        }
    }
} 
0 голосов
/ 20 июня 2011

Насколько я знаю, вы не сможете сделать это с помощью встроенных классов.

Вы можете использовать StreamReader на NetworkStream для чтения целых строк, но вам нужно будет самостоятельно кодировать асинхронную часть, так как StreamReader не предоставляет асинхронный интерфейс. В качестве альтернативы вы можете использовать NetworkStream.BeginRead, который является асинхронным, но тогда вам придется написать код, чтобы он распознавал ввод на основе строки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...