обработка потока в реальном времени .net - необходим большой и быстрый буфер оперативной памяти - PullRequest
0 голосов
/ 30 марта 2010

Разрабатываемое мной приложение связывается с цифровым аудиоустройством, которое способно отправлять 24 различных голосовых потока одновременно. Устройство подключено через USB, используя устройство FTDI (эмулятор последовательного порта) и драйверы D2XX (основной драйвер COM должен замедлять обработку 4,5 Мбит).

В основном приложение состоит из 3 потоков:

Основной поток - GUI, контроль, т. Д. Bus reader - в этом потоке данные постоянно считываются с устройства и сохраняются в файловом буфере (в этом потоке нет логики) Интерпретатор данных - этот поток считывает данные из файлового буфера, конвертирует их в сэмплы, выполняет простую обработку сэмплов и сохраняет сэмплы в отдельные файлы WAV.

Причина, по которой я использовал файловый буфер, заключается в том, что я хотел быть уверен, что не потеряю ни одного семпла. Приложение не использует запись все время, поэтому я выбрал это решение, потому что оно было безопасным.

Приложение работает нормально, за исключением того, что генератор буферных волновых файлов работает довольно медленно. Для 24 параллельных записей продолжительностью 1 минута требуется около 4 минут для завершения записи. Я уверен, что исключение использования жесткого диска в этом процессе значительно увеличит скорость.

Вторая проблема заключается в том, что файловый буфер действительно тяжел для длинных записей, и я не могу это очистить до конца обработки данных (это еще больше замедлит процесс).

Для буфера ОЗУ мне нужно по крайней мере 1 ГБ, чтобы он работал правильно
Каков наилучший способ выделить такой большой объем памяти в .NET? Я собираюсь использовать эту память в двух потоках, поэтому нужен быстрый механизм синхронизации. Я думаю о буфере цикла: один большой массив, Bus Reader сохраняет данные, Data Interpreter читает их. Что вы думаете об этом?

[править] Теперь для буферизации я использую классы BinaryReader и BinaryWriter на основе файла.

1 Ответ

2 голосов
/ 30 марта 2010

Вы должны быть в состоянии собрать класс-оболочку, которая управляет одним буфером [] и распределяет временные блокировки для диапазонов данных, используя потоки памяти фиксированного размера. По сути, вы определяете свой буфер один раз, и каждый раз, когда вы хотите с ним работать, вы запрашиваете объект блокировки из класса ConcurrentBuffer. Блокировка содержит MemoryStream, который гарантированно будет иметь эксклюзивный доступ к диапазонам индексов, которые вы указали, когда запрашивали блокировку, пока не снимите ее.

Я собрал простой пример, который должен дать вам хорошую отправную точку:

public class ConcurrentBuffer
{
    private readonly byte[] buffer;

    internal byte[] GetBuffer() { return buffer; }

    private List<BufferLock> locks = new List<BufferLock>();

    public ConcurrentBuffer(int bufferSize)
    {
        buffer = new byte[bufferSize];
    }

    public BufferLock AcquireLock(int startIndex, int endIndex)
    {
        if (startIndex < 0) throw new ArgumentOutOfRangeException("startIndex");
        if (startIndex > endIndex || endIndex >= buffer.Length) throw new ArgumentOutOfRangeException("endIndex");
        lock (buffer)
        {
            foreach (var l in locks)
            {
                if (!(endIndex < l.StartIndex || startIndex > l.EndIndex))
                {
                    return null;
                }
            }
            var bl = new BufferLock(startIndex, endIndex, this);
            locks.Add(bl);
            return bl;
        }
    }

    public void ReleaseLock(BufferLock lck)
    {
        lock (buffer)
        {
            locks.Remove(lck);
        }
    }

    public class BufferLock
    {
        public int StartIndex { get; private set; }
        public int EndIndex { get; private set; }
        public ConcurrentBuffer TargetBuffer { get; private set; }
        public MemoryStream Stream { get; private set; }

        internal BufferLock(int startIndex, int endIndex, ConcurrentBuffer buffer)
        {
            StartIndex = startIndex;
            EndIndex = endIndex;
            TargetBuffer = buffer;
            Stream = new MemoryStream(buffer.GetBuffer(), startIndex, endIndex - startIndex, true);
        }

        public void Release()
        {
            Stream.Dispose();
            TargetBuffer.ReleaseLock(this);
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        ConcurrentBuffer cb = new ConcurrentBuffer(32000);

        byte[] myData = { 32, 13, 53, 29, 50 };

        // l1 will acquire a lock
        var l1 = cb.AcquireLock(0, 50); 
        if (l1 != null) l1.Stream.Write(myData, 0, myData.Length);

        // l2 will fail because l1 has part of its range locked
        var l2 = cb.AcquireLock(30, 70);
        if (l2 != null) l2.Stream.Write(myData, 0, myData.Length);

        l1.Release();

        // l3 will succeed at locking because l1 has been released
        var l3 = cb.AcquireLock(40, 5000);
        if (l3 != null)
        {
            while (l3.Stream.Position + myData.Length <= l3.Stream.Length)
            {
                l3.Stream.Write(myData, 0, myData.Length);
            }
        }

        l3.Release();

        Console.ReadLine();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...