C # Begin / EndReceive - как читать большие данные? - PullRequest
12 голосов
/ 24 февраля 2009

При чтении данных, скажем, 1024, как мне продолжить чтение из сокета, который получает сообщение размером более 1024 байтов, пока не останется данных? Должен ли я просто использовать BeginReceive для чтения только префикса длины пакета, а затем, как только он будет получен, использовать Receive () (в асинхронном потоке), чтобы прочитать остальную часть пакета? Или есть другой способ?

редактирование:

Я думал, что у ссылки Джона Скита есть решение, но в этом коде есть небольшая ошибка Код, который я использовал:

public class StateObject
{
    public Socket workSocket = null;
    public const int BUFFER_SIZE = 1024;
    public byte[] buffer = new byte[BUFFER_SIZE];
    public StringBuilder sb = new StringBuilder();
}

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);

    if (read > 0) 
    {
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (read == StateObject.BUFFER_SIZE)
        {
            s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                    new AyncCallback(Async_Send_Receive.Read_Callback), so);
            return;
        }
    }

    if (so.sb.Length > 0)
    {
        //All of the data has been read, so displays it to the console
        string strContent;
        strContent = so.sb.ToString();
        Console.WriteLine(String.Format("Read {0} byte from socket" + 
        "data = {1} ", strContent.Length, strContent));
    }
    s.Close();
}

Теперь это исправленное исправно работает большую часть времени, но не работает, когда размер пакета кратен буферу . Причина этого в том, что если буфер заполняется при чтении, предполагается, что данных больше; но проблема та же, что и раньше. Например, 2-байтовый буфер заполняется дважды в 4-байтовом пакете и предполагает, что данных больше. Затем он блокируется, потому что больше нечего читать. Проблема в том, что функция приема не знает, когда конец пакета.


Это заставило меня задуматься о двух возможных решениях: у меня мог быть либо разделитель конца пакета, либо я мог прочитать заголовок пакета, чтобы найти длину, а затем получить именно эту сумму (как я изначально предлагал).

Хотя есть проблемы с каждым из них. Мне не нравится идея использования разделителя, поскольку пользователь может каким-то образом преобразовать его в пакет во входной строке из приложения и испортить его. Это также кажется мне немного неаккуратным.

Заголовок длины звучит нормально, но я планирую использовать буфер протокола - я не знаю формат данных. Есть ли длина заголовка? Сколько это байтов? Будет ли это то, что я реализую сам? Etc ..

Что мне делать?

Ответы [ 7 ]

14 голосов
/ 24 февраля 2009

Нет - снова вызывать BeginReceive из обработчика обратного вызова, пока EndReceive не вернет 0. По сути, вы должны продолжать получать асинхронно, предполагая, что вы хотите максимально использовать асинхронный ввод-вывод.

Если вы посмотрите на страницу MSDN для Socket.BeginReceive, вы увидите пример этого. (Следует признать, что не так легко следовать, как могло бы быть.)

6 голосов
/ 01 марта 2009

Dang. Я не решаюсь даже ответить на это, учитывая сановников, которые уже взвешены, но здесь идет. Будьте нежны, о Великие!

Не имея возможности читать блог Марка (он заблокирован здесь из-за корпоративной интернет-политики), я собираюсь предложить «другой путь».

Уловка, на мой взгляд, заключается в , чтобы отделить получение данных от обработки этих данных .

Я использую класс StateObject, определенный следующим образом. Он отличается от реализации MSDN StateObject тем, что он не включает объект StringBuilder, константа BUFFER_SIZE является закрытой и для удобства включает конструктор.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

У меня также есть класс Packet, который является просто оберткой вокруг буфера и отметки времени.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

Моя функция ReceiveCallback () выглядит следующим образом.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

        if (read > 0) {
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();
        }

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

Обратите внимание, что эта реализация абсолютно не обрабатывает полученные данные и не имеет никаких ожиданий относительно того, сколько байтов предполагается получить. Он просто получает все данные, находящиеся в сокете (до 65535 байт), и сохраняет эти данные в списке пакетов, а затем немедленно ставит в очередь еще один асинхронный прием.

Поскольку обработка больше не происходит в потоке, который обрабатывает каждый асинхронный прием, данные, очевидно, будут обрабатываться другим потоком , поэтому операция Add () синхронизируется через оператор блокировки. Кроме того, поток обработки (будь то основной поток или какой-либо другой выделенный поток) должен знать , когда есть данные для обработки. Для этого я обычно использую ManualResetEvent, что я и показал выше.

Вот как работает обработка.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List<Packet> packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List<Packet>();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

Это базовая инфраструктура, которую я использую для всей моей сокетной связи. Это обеспечивает хорошее разделение между получением данных и обработкой этих данных.

Что касается другого возникшего у вас вопроса, важно помнить, что при таком подходе каждый экземпляр пакета не обязательно представляет полное сообщение в контексте вашего приложения. Экземпляр пакета может содержать частичное сообщение, одно сообщение или несколько сообщений, а ваши сообщения могут охватывать несколько экземпляров пакета. Я рассказал, как узнать, когда вы получили полное сообщение в соответствующем вопросе, который вы разместили здесь .

3 голосов
/ 24 февраля 2009

Сначала вы прочитали бы префикс длины. Получив это, вы просто продолжаете читать байты в блоках (и вы можете делать это асинхронно, как вы и предполагали) до тех пор, пока не исчерпаете количество байтов, которые, как вы знаете, поступают с провода.

Обратите внимание, что в какой-то момент при чтении последнего блока вам не захочется читать полные 1024 байта, в зависимости от того, какой префикс длины говорит общее количество, и сколько байтов вы прочитали.

1 голос
/ 08 июня 2011

Также я обеспокоен той же проблемой.

Когда я тестировал несколько раз, я обнаружил, что иногда несколько BeginReceive - EndReceive приводят к потере пакетов. (Этот цикл был окончен неправильно)

В моем случае я использовал два решения.

Сначала я определил достаточный размер пакета, чтобы сделать его только 1 раз BeginReceive() ~ EndReceive();

Во-вторых, когда я получаю большой размер данных, я использовал NetworkStream.Read() вместо BeginReceive() - EndReceive().

Асинхронный сокет не прост в использовании и требует большого понимания сокета.

1 голос
/ 08 ноября 2010

Вокруг этого, похоже, много путаницы. Примеры на сайте MSDN для асинхронной связи через сокет с использованием TCP вводят в заблуждение и не очень хорошо объяснены. Вызов EndReceive действительно блокируется, если размер сообщения точно кратен принимающему буферу. Это приведет к тому, что вы никогда не получите сообщение и приложение зависнет.

Просто чтобы прояснить ситуацию - вы ДОЛЖНЫ предоставить свой собственный разделитель для данных, если вы используете TCP. Прочитайте следующее (это из ОЧЕНЬ надежного источника).

Потребность в данных приложения Разграничение

Другое влияние лечения TCP входящие данные в виде потока, эти данные получено приложением с использованием TCP неструктурирован. Для передачи поток данных идет в TCP по одному устройство, и на приеме, поток данные возвращаются в приложение на приемное устройство. Хотя поток разбит на сегменты для передача по TCP, эти сегменты это детали уровня TCP, которые скрыты из приложения. Итак, когда устройство хочет отправить несколько штук данных, TCP не предоставляет механизма для указание, где находится «разделительная линия» находится между кусками, так как TCP не исследует значение данные на всех. Заявка должна предоставить средства для этого.

Рассмотрим, например, приложение это отправка записей базы данных. Это необходимо передать запись № 579 из Таблица базы данных сотрудников, затем запись № 581 и запись № 611. Отправляет эти записи в TCP, который лечит они все вместе как поток байт. TCP будет упаковывать эти байты на сегменты, но таким образом, приложение не может предсказать. это Возможно, что каждый в конечном итоге в другой сегмент, но более вероятно все они будут в одном сегменте, или часть каждого закончится в разных сегменты, в зависимости от их длины. Сами записи должны иметь некоторые вроде явных маркеров, поэтому приемное устройство может сказать, где один запись заканчивается и начинается следующая.

Источник: http://www.tcpipguide.com/free/t_TCPDataHandlingandProcessingStreamsSegmentsandSequ-3.htm

Большинство примеров использования EndReceive, которые я вижу в Интернете, неверны или вводят в заблуждение. Обычно это не вызывает проблем в примерах, потому что отправляется только одно предопределенное сообщение, а затем соединение закрывается.

0 голосов
/ 03 февраля 2019

Это очень старая тема, но я нашел здесь что-то еще и нашел:

Теперь это исправленное исправно работает большую часть времени, , но оно терпит неудачу, когда размер пакета кратен размеру буфера. Причина этого в том, что если буфер заполняется при чтении, предполагается, что там больше данных; но проблема та же, что и раньше. Например, 2-байтовый буфер заполняется дважды в 4-байтовом пакете и предполагает, что данных больше. Затем он блокируется, потому что больше нечего читать. Проблема в том, что функция приема не знает, когда конец пакета.

У меня была такая же проблема, и, поскольку ни один из ответов, похоже, не решил эту проблему, я использовал способ Socket.Available

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);    
    if (read > 0) 
    {
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (s.Available == 0)
        {
            // All data received, process it as you wish
        }
    }
    // Listen for more data
    s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                new AyncCallback(Async_Send_Receive.Read_Callback), so);
}

Надеюсь, что это помогает другим, ТАК помогало мне много раз, спасибо всем!

0 голосов
/ 24 февраля 2009

Для получения информации (общее начало / конец использования) вы можете просмотреть это сообщение в блоге ; этот подход работает нормально для меня и избавляет от боли ...

...