Лучшая буферная архитектура для обработки массива входящего массива байтов - PullRequest
0 голосов
/ 03 марта 2012

Я ищу совет, как лучше всего спроектировать структуру буфера, которая может обрабатывать огромное количество входящих данных, которые обрабатываются с меньшей скоростью, чем входящие данные.

Я запрограммировал настраиваемое двоичное устройство чтения, которое можетпотоковая передача до 12 миллионов байтовых массивов в секунду в одном потоке и поиск обработки потока байтового массива в отдельной структуре на одной и той же машине и в другом потоке.Проблема в том, что структура потребления не может соответствовать количеству входящих данных производителя, и поэтому я считаю, что мне нужен какой-то буфер для правильной обработки.Меня больше всего интересуют советы относительно общей архитектуры, а не примеры кода.Я нацеливаюсь на .Net 4.0.Вот больше информации о моих текущих настройках и требованиях.

Producer: работает в выделенном потоке и считывает байтовые массивы из файлов на физическом носителе (SSD, OCZ Vertex 3 Max IOPS).Приблизительная пропускная способность составляет 12 миллионов байтовых массивов в секунду.Каждый массив имеет размер всего 16 байт.Полностью реализовано

Потребитель. Предполагается, что он работает в отдельном потоке, отличном от производителя. Создает байтовые массивы, но должен обрабатывать несколько примитивных типов данных перед обработкой данных, поэтому скорость обработки значительно ниже скорости публикации производителя.Структура потребителя полностью реализована.

Между: стремление установить буферизованную структуру, которая предоставляет производителю возможность публикации, а потребитель - ну, в общем, потребляет.Не реализовано.

Я был бы рад, если бы некоторые из вас могли прокомментировать из вашего собственного опыта или опыта, что лучше всего рассмотреть, чтобы справиться с такой структурой.Должен ли буфер реализовывать алгоритм регулирования, который запрашивает новые данные только у производителя, когда буфер / очередь наполовину пуст или около того?Как обрабатывается блокировка и блокировка?Извините, у меня очень ограниченный опыт в этом пространстве, и я до сих пор справлялся с ним посредством реализации шины обмена сообщениями, но любая технология шины обмена сообщениями, на которую я смотрел, определенно не может справиться с требуемой пропускной способностью.Любые комментарии очень приветствуются !!!

Редактировать: Забыл упомянуть, данные используются только одним потребителем.Также имеет значение порядок, в котором публикуются массивы;заказ должен быть сохранен таким образом, чтобы потребитель потреблял в том же порядке.

Ответы [ 4 ]

1 голос
/ 03 марта 2012

16 байт (назовите это 16B) слишком мало для эффективной связи между потоками. Постановка в очередь таких небольших буферов приведет к увеличению затрат ЦП на связь между потоками, чем на фактическую полезную обработку данных.

Итак, разбейте их на куски.

Объявите некоторый буферный класс (скажем, C16B), который содержит хороший, большой массив этих 16B - по крайней мере, 4 КБ, и значение 'count' int, чтобы показать, сколько загружено (последний буфер, загруженный из файл, вероятно, не будет полным). Это поможет, если вы поместите пустой байтовый массив размером с строку кэша прямо перед этим массивом 16B - это поможет избежать ложного разделения. Возможно, вы можете поместить код, который обрабатывает 16B, как метод, Process16B, sya и, возможно, код, который загружает массив тоже - принимает дескриптор файла в качестве параметра. Этот класс теперь может быть эффективно загружен в очередь в другие потоки.

Вам нужен класс очереди производитель-потребитель - в C # он уже есть в классах BlockingCollection.

Вам нужно управление потоком в этом приложении. Я бы сделал это, создав пул C16B - создайте очередь блокировки и создайте / добавьте большую кучу C16B в цикле. 1024 - хорошее круглое число. Теперь у вас есть «очередь пула», которая обеспечивает управление потоком, избавляет от необходимости использовать new () любых C16B, и вам не нужно, чтобы они постоянно собирались мусором.

Если у вас есть это, все остальное легко. В потоке загрузчика постоянно исключайте C16B из очереди пула, загружайте их данными из файлов и добавляйте () их в потоки обработки в очереди блокировки '16Bprocess'. В потоках обработки возьмите () из очереди 16Bprocess и обработайте каждый экземпляр C16B, вызвав его метод Process16B. После обработки 16B добавьте () C16B обратно в очередь пула для повторного использования.

Переработка C16B через очередь пула обеспечивает сквозное управление потоком. Если производитель является самой быстрой ссылкой, пул в конечном итоге будет пустым, и производитель будет блокировать его до тех пор, пока потребитель не вернет немного C16B.

Если обработка занимает так много времени, вы всегда можете добавить другой поток обработки, если у вас есть запасные ядра. Проблема с такими схемами в том, что данные будут обрабатываться не по порядку. Это может иметь или не иметь значение. Если это произойдет, поток данных может потребоваться «уладить» позже, например. используя порядковые номера и список буферов.

Я бы посоветовал сбросить счетчик очереди пула (и, возможно, счетчик очереди 16B) в компонент состояния или командную строку с таймером. Это обеспечивает полезный снимок того, где находятся экземпляры C16B, и вы можете увидеть узкие места и любые утечки C16B без сторонних инструментов (те, которые замедляют работу всего приложения до обхода и выдают ложные отчеты об утечках при завершении работы).

1 голос
/ 03 марта 2012

Вы можете использовать BlockingCollection , чтобы запретить производителю добавлять элементы в коллекцию, если потребитель не потребляет достаточно элементов.

Существуют и другие классы одновременной коллекции, такие какну например ConcurrentQueue

0 голосов
/ 03 марта 2012

Зачем вообще беспокоиться о буфере? Используйте файлы на диске в качестве буфера. Когда потребитель начинает обрабатывать байтовый массив, пусть читатель прочитает следующий и все.

РЕДАКТИРОВАТЬ: После запроса на разъединение потребителя и производителя.

У вас может быть координатор, который сообщает производителю о создании X-байтовых массивов и предоставляет X-байтовые массивы потребителю. Три части могут действовать так:

Координатор приказывает производителю создавать массивы байтов X. Производитель производит X байтовых массивов

А теперь сделайте это в цикле: Координатор сообщает потребителю X-байтовые массивы Координатор говорит продюсеру создать массивы байтов X Потребитель сообщает координатору, что он потребляет Цикл, пока больше нет байтовых массивов

Производитель и координатор могут работать в одном потоке. Потребитель должен иметь свою собственную нить.

У вас почти не будет блокировок (я думаю, что вы можете сделать это вообще без блокировок, просто с помощью одной ручки ожидания, которую потребитель использует, чтобы уведомить координатора, что это сделано), и ваш координатор очень прост.

REEDIT: еще один действительно отделенный вариант

Используйте ZeroMQ для обработки сообщений. Производитель читает байтовые массивы и отправляет каждый массив в сокет ZeroMQ. Потребитель читает массивы из сокета ZeroMQ.

ZeroMQ очень эффективен и быстр, и обрабатывает все технические аспекты (синхронизация потоков, буферизация и т. Д.) Внутри. При использовании на одном компьютере вы также не потеряете данные (что может произойти при использовании UDP на двух разных компьютерах).

0 голосов
/ 03 марта 2012

IMO какой-либо очереди блокировки может решить вашу проблему.По сути поток источника будет блокироваться, если в очереди больше нет места.Посмотрите на это Создание очереди блокировки в .NET?

...