Буфер FIFO / Queue, специализирующийся на потоках байтов - PullRequest
15 голосов
/ 22 ноября 2011

Существует ли какая-либо структура данных .NET / комбинация классов, которая позволяет добавлять байтовые данные в конец буфера, но все операции просмотра и чтения выполняются с самого начала, сокращая буфер при чтении?

Класс MemoryStream, кажется, выполняет часть этого, но мне нужно поддерживать отдельные места для чтения и записи, и он не сбрасывает данные автоматически при запуске после их чтения.

Ответ был опубликован в ответ на этот вопрос , что я и пытаюсь сделать, но я бы предпочел, чтобы я мог выполнять асинхронный ввод-вывод в разных компонентах одного и того же процесса. как обычный канал или даже сетевой поток (мне нужно сначала отфильтровать / обработать данные).

Ответы [ 3 ]

10 голосов
/ 22 ноября 2011

Я выложу раздетую копию какой-то логики, которую я написал для проекта на работе однажды. Преимущество этой версии в том, что она работает со связанным списком буферизованных данных, и, следовательно, вам не нужно кэшировать огромные объемы памяти и / или копировать память при чтении. кроме того, его поток является безопасным и ведет себя как сетевой поток, то есть: при чтении, когда нет доступных данных: подождите, пока есть доступные данные или тайм-аут. Кроме того, при чтении x байтов и при наличии y байтов, возвращайте после прочтения всех байтов. Надеюсь, это поможет!

    public class SlidingStream : Stream
{
    #region Other stream member implementations

    ...

    #endregion Other stream member implementations

    public SlidingStream()
    {
        ReadTimeout = -1;
    }

    private readonly object _writeSyncRoot = new object();
    private readonly object _readSyncRoot = new object();
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();

    public int ReadTimeout { get; set; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_dataAvailableResetEvent.Wait(ReadTimeout))
            throw new TimeoutException("No data available");

        lock (_readSyncRoot)
        {
            int currentCount = 0;
            int currentOffset = 0;

            while (currentCount != count)
            {
                ArraySegment<byte> segment = _pendingSegments.First.Value;
                _pendingSegments.RemoveFirst();

                int index = segment.Offset;
                for (; index < segment.Count; index++)
                {
                    if (currentOffset < offset)
                    {
                        currentOffset++;
                    }
                    else
                    {
                        buffer[currentCount] = segment.Array[index];
                        currentCount++;
                    }
                }

                if (currentCount == count)
                {
                    if (index < segment.Offset + segment.Count)
                    {
                        _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index));
                    }
                }

                if (_pendingSegments.Count == 0)
                {
                    _dataAvailableResetEvent.Reset();

                    return currentCount;
                }
            }

            return currentCount;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_writeSyncRoot)
        {
            byte[] copy = new byte[count];
            Array.Copy(buffer, offset, copy, 0, count);

            _pendingSegments.AddLast(new ArraySegment<byte>(copy));

            _dataAvailableResetEvent.Set();
        }   
    }
}
1 голос
/ 01 сентября 2017

Код может быть проще, чем в принятом ответе.Нет необходимости использовать петлю for.:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}
0 голосов
/ 20 марта 2019

Я пытался полировать код Polity. Это далеко от оптимизации, но, возможно, просто работает.

public class SlidingStream : Stream {
  public SlidingStream() {
    ReadTimeout = -1;
  }

  private readonly object ReadSync = new object();
  private readonly object WriteSync = new object();
  private readonly ConcurrentQueue<ArraySegment<byte>> PendingSegments
    = new ConcurrentQueue<ArraySegment<byte>>();
  private readonly ManualResetEventSlim DataAvailable = new ManualResetEventSlim(false);
  private ArraySegment<byte>? PartialSegment;

  public new int ReadTimeout;

  public override bool CanRead => true;

  public override bool CanSeek => false;

  public override bool CanWrite => true;

  public override long Length => throw new NotImplementedException();

  public override long Position {
    get => throw new NotImplementedException();
    set => throw new NotImplementedException();
  }

  private bool Closed;

  public override void Close() {
    Closed = true;
    DataAvailable.Set();
    base.Close();
  }

  public override int Read(byte[] buffer, int offset, int count) {
    int msStart = Environment.TickCount;

    lock (ReadSync) {
      int read = 0;

      while (read < count) {
        ArraySegment<byte>? seg = TryDequeue(msStart);
        if (seg == null) {
          return read;
        }

        ArraySegment<byte> segment = seg.GetValueOrDefault();
        int bite = Math.Min(count - read, segment.Count);
        if (bite < segment.Count) {
          PartialSegment = new ArraySegment<byte>(
            segment.Array,
            segment.Offset + bite,
            segment.Count - bite
          );
        }

        Array.Copy(segment.Array, segment.Offset, buffer, offset + read, bite);
        read += bite;
      }

      return read;
    }
  }

  private ArraySegment<byte>? TryDequeue(int msStart) {
    ArraySegment<byte>? ps = PartialSegment;
    if (ps.HasValue) {
      PartialSegment = null;
      return ps;
    }

    DataAvailable.Reset();

    ArraySegment<byte> segment;
    while (!PendingSegments.TryDequeue(out segment)) {
      if (Closed) {
        return null;
      }
      WaitDataOrTimeout(msStart);
    }

    return segment;
  }

  private void WaitDataOrTimeout(int msStart) {
    int timeout;
    if (ReadTimeout == -1) {
      timeout = -1;
    }
    else {
      timeout = msStart + ReadTimeout - Environment.TickCount;
    }

    if (!DataAvailable.Wait(timeout)) {
      throw new TimeoutException("No data available");
    }
  }

  public override void Write(byte[] buffer, int offset, int count) {
    lock (WriteSync) {
      byte[] copy = new byte[count];
      Array.Copy(buffer, offset, copy, 0, count);

      PendingSegments.Enqueue(new ArraySegment<byte>(copy));

      DataAvailable.Set();
    }
  }

  public override void Flush() {
    throw new NotImplementedException();
  }

  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotImplementedException();
  }

  public override void SetLength(long value) {
    throw new NotImplementedException();
  }
}
...