C #: Реализация NetworkStream.Peek? - PullRequest
       27

C #: Реализация NetworkStream.Peek?

16 голосов
/ 04 февраля 2010

В настоящее время в C # нет метода NetworkStream.Peek. Каков наилучший способ реализации такого метода, который функционирует так же, как NetworkStream.ReadByte, за исключением того, что возвращенный byte фактически не удаляется из Stream?

Ответы [ 4 ]

9 голосов
/ 02 сентября 2011

Я столкнулся с тем же «поиском по магическому номеру» и затем решил, к какому потоковому процессору отправить поток », и, к сожалению, не могу уклониться от этой проблемы - как это предлагается в комментариях к ответу Ааронаута - передав уже использованные байты в методы потоковой обработки в отдельных параметрах, так как эти методы являются данными и ожидают System.IO.Stream и ничего больше.

Я решил эту проблему, создав более или менее универсальный PeekableStream класс, который упаковывает поток. Он работает для NetworkStreams, но также и для любого другого потока, при условии, что вы Stream.CanRead it.


Редактировать

В качестве альтернативы, вы можете использовать новый ReadSeekableStream и сделать

var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count);
...
readSeekableStream.Read(..., count);
readSeekableStream.Seek(-count, SeekOrigin.Current);

В любом случае сюда приходит PeekableStream:

/// <summary>
/// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream,
/// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream,
/// but it won't affect the result of subsequent Read() calls.
/// 
/// This is sometimes necessary, e.g. for peeking at the magic number of a stream of bytes and decide which
/// stream processor to hand over the stream.
/// </summary>
public class PeekableStream : Stream
{
    private readonly Stream underlyingStream;
    private readonly byte[] lookAheadBuffer;

    private int lookAheadIndex;

    public PeekableStream(Stream underlyingStream, int maxPeekBytes)
    {
        this.underlyingStream = underlyingStream;
        lookAheadBuffer = new byte[maxPeekBytes];
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            underlyingStream.Dispose();

        base.Dispose(disposing);
    }

    /// <summary>
    /// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read.
    /// 
    /// Calls to this method do not influence subsequent calls to Read() and Peek().
    /// 
    /// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read()
    /// method, which might read less than count bytes, even though the end of the stream has not been reached.
    /// </summary>
    /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and
    /// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source.</param>
    /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream.</param>
    /// <param name="count">The maximum number of bytes to be peeked from the current stream.</param>
    /// <returns>The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached.</returns>
    public virtual int Peek(byte[] buffer, int offset, int count)
    {
        if (count > lookAheadBuffer.Length)
            throw new ArgumentOutOfRangeException("count", "must be smaller than peekable size, which is " + lookAheadBuffer.Length);

        while (lookAheadIndex < count)
        {
            int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex);

            if (bytesRead == 0) // end of stream reached
                break;

            lookAheadIndex += bytesRead;
        }

        int peeked = Math.Min(count, lookAheadIndex);
        Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked);
        return peeked;
    }

    public override bool CanRead { get { return true; } }

    public override long Position
    {
        get
        {
            return underlyingStream.Position - lookAheadIndex;
        }
        set
        {
            underlyingStream.Position = value;
            lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException, 
                                // in which case we don't want to change the lookAhead status
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesTakenFromLookAheadBuffer = 0;
        if (count > 0 && lookAheadIndex > 0)
        {
            bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex);
            Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer);
            count -= bytesTakenFromLookAheadBuffer;
            offset += bytesTakenFromLookAheadBuffer;
            lookAheadIndex -= bytesTakenFromLookAheadBuffer;
            if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
                // copying into same array should be fine, according to http://msdn.microsoft.com/en-us/library/z50k9bft(v=VS.90).aspx :
                // "If sourceArray and destinationArray overlap, this method behaves as if the original values of sourceArray were preserved
                // in a temporary location before destinationArray is overwritten."
                Array.Copy(lookAheadBuffer, lookAheadBuffer.Length - bytesTakenFromLookAheadBuffer + 1, lookAheadBuffer, 0, lookAheadIndex);
        }

        return count > 0
            ? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count)
            : bytesTakenFromLookAheadBuffer;
    }

    public override int ReadByte()
    {
        if (lookAheadIndex > 0)
        {
            lookAheadIndex--;
            byte firstByte = lookAheadBuffer[0];
            if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
                Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex);
            return firstByte;
        }
        else
        {
            return underlyingStream.ReadByte();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long ret = underlyingStream.Seek(offset, origin);
        lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException,
                            // in which case we don't want to change the lookAhead status
        return ret;
    }

    // from here on, only simple delegations to underlyingStream

    public override bool CanSeek { get { return underlyingStream.CanSeek; } }
    public override bool CanWrite { get { return underlyingStream.CanWrite; } }
    public override bool CanTimeout { get { return underlyingStream.CanTimeout; } }
    public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } }
    public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } }
    public override void Flush() { underlyingStream.Flush(); }
    public override long Length { get { return underlyingStream.Length; } }
    public override void SetLength(long value) { underlyingStream.SetLength(value); }
    public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); }
    public override void WriteByte(byte value) { underlyingStream.WriteByte(value); }
}
6 голосов
/ 04 февраля 2010

Если вам фактически не нужно извлекать байт, вы можете обратиться к свойству DataAvailable.

В противном случае вы можете заключить его в StreamReader и вызовите его метод Peek.

Обратите внимание, что ни один из них не особенно надежен для чтения из сетевого потока из-за проблем с задержкой.Данные могут стать доступными (присутствующими в буфере чтения) в самое мгновение после взгляда.

Я не уверен, что вы собираетесь делать с этим, но *Метод 1016 * на NetworkStream является блокирующим вызовом, поэтому вам не нужно проверять состояние, даже если вы получаете порциями.Если вы пытаетесь обеспечить адаптацию приложения во время чтения из потока, вместо этого вы должны использовать поток или асинхронный вызов для получения данных.

Редактировать: Согласно этой записи , StreamReader.Peek глючит в NetworkStream или, по крайней мере, имеет недокументированное поведение, поэтому будьте осторожны, если вы решите пойти по этому пути.


Обновлено - ответ на комментарии

Понятие «взглянуть» на сам реальный поток фактически невозможно;это просто поток, и как только байт получен, он больше не находится в потоке.Некоторые потоки поддерживают поиск, так что вы можете технически перечитать этот байт, но NetworkStream не является одним из них.

Просмотр возможен только при чтении потока в буфер;как только данные находятся в буфере, просмотр легко, потому что вы просто проверяете все, что находится в текущей позиции в буфере.Вот почему StreamReader может это сделать;класс Stream обычно не имеет собственного метода Peek.

Теперь, специально для этой проблемы, я задаюсь вопросом, действительно ли это правильный ответ.Я понимаю идею динамического выбора метода для обработки потока, но действительно ли вам нужно сделать это в необработанном потоке?Не можете ли вы сначала прочитать поток в байтовый массив или даже скопировать его в MemoryStream и обработать его с этого момента?

Основная проблема, которую я вижу, заключается в том, что если что-то плохое случится, когда вы 'читая из сетевого потока, данные исчезли.Но если вы сначала прочитаете его во временное место, вы можете отладить его.Вы можете узнать, что это были за данные, и почему объект, который пытался обработать данные, потерпел неудачу на полпути.

В общем, самое первое, что вы хотите сделать с NetworkStream, это прочитать его влокальный буфер.Единственная причина, по которой я не могу этого сделать, это то, что вы читаете огромное количество данных - и даже тогда я мог бы рассмотреть возможность использования файловой системы в качестве промежуточного буфера, если она не помещается в памяти.

Я не знаю ваших точных требований, но из того, что я узнал до сих пор, мой совет будет таким: не пытайтесь обрабатывать ваши данные напрямую из NetworkStream, если нет веских причин для этого,Попробуйте сначала прочитать данные в память или на диск, а затем обработать копию.

4 голосов
/ 01 июля 2011

Если у вас есть доступ к объекту Socket, вы можете попробовать метод Receive , передав SocketFlags.Peek. Это аналог флага MSG_PEEK, который можно передать вызову recv в сокетах BSD или Winsock.

1 голос
/ 15 февраля 2015

Вот очень простая реализация PeekStream, которая позволяет вам просматривать определенное количество байтов только в начале потока (в отличие от возможности просмотра в любое время). Полученные байты возвращаются как Stream, чтобы минимизировать изменения в существующем коде.

Вот как вы его используете:

Stream nonSeekableStream = ...;
PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes
Stream initialBytesStream = peekStream.GetInitialBytesStream();
ParseHeaders(initialBytesStream);  // Work on initial bytes of nonSeekableStream
peekStream.Read(...) // Read normally, the read will start from the beginning

GetInitialBytesStream() возвращает искомый поток, который содержит до peekSize начальных байтов базового потока (меньше, если поток короче peekSize).

Из-за своей простоты чтение PeekStream должно быть только незначительно медленнее (если вообще), чем чтение основного потока напрямую.

public class PeekStream : Stream
{
    private Stream m_stream;
    private byte[] m_buffer;
    private int m_start;
    private int m_end;

    public PeekStream(Stream stream, int peekSize)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }
        if (!stream.CanRead)
        {
            throw new ArgumentException("Stream is not readable.");
        }
        if (peekSize < 0)
        {
            throw new ArgumentOutOfRangeException("peekSize");
        }
        m_stream = stream;
        m_buffer = new byte[peekSize];
        m_end = stream.Read(m_buffer, 0, peekSize);
    }

    public override bool CanRead
    {
        get
        {
            return true;
        }
    }

    public override bool CanWrite
    {
        get
        {
            return false;
        }
    }

    public override bool CanSeek
    {
        get
        {
            return false;
        }
    }

    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    public MemoryStream GetInitialBytesStream()
    {
        return new MemoryStream(m_buffer, 0, m_end, false);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate arguments
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (offset + count > buffer.Length)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        int totalRead = 0;

        // Read from buffer
        if (m_start < m_end)
        {
            int toRead = Math.Min(m_end - m_start, count);
            Array.Copy(m_buffer, m_start, buffer, offset, toRead);
            m_start += toRead;
            offset += toRead;
            count -= toRead;
            totalRead += toRead;
        }

        // Read from stream
        if (count > 0)
        {
            totalRead += m_stream.Read(buffer, offset, count);
        }

        // Return total bytes read
        return totalRead;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }

    public override int ReadByte()
    {
        if (m_start < m_end)
        {
            return m_buffer[m_start++];
        }
        else
        {
            return m_stream.ReadByte();
        }
    }

    public override void Flush()
    {
        m_stream.Flush();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            m_stream.Dispose();
        }
        base.Dispose(disposing);
    }
}

Отказ от ответственности: вышеприведенный PeekStream взят из работающей программы, но не всесторонне протестирован, поэтому может содержать ошибки. Это работает для меня, но вы можете раскрыть некоторые случаи, когда это не удается.

...