Как два потока могут получить доступ к общему массиву буферов с минимальной блокировкой? (C #) - PullRequest
3 голосов
/ 25 марта 2010

Я работаю над приложением для обработки изображений, где у меня есть два потока поверх основного потока:

1 - CameraThread , который захватывает изображения с веб-камеры и записывает их в буфер

2 - ImageProcessingThread , который берет последнее изображение из этого буфера для фильтрации.

Причина, по которой это многопоточность, заключается в том, что скорость критична, и мне нужно иметь CameraThread, чтобы продолжать захватывать изображения и делать последний снимок готовым для захвата ImageProcessingThread, пока он еще обрабатывает предыдущее изображение.

Моя проблема заключается в поиске быстрого и поточно-ориентированного способа доступа к этому общему буферу, и я решил, что в идеале это должен быть тройной буфер (image [3]), так что если ImageProcessingThread работает медленно, то CameraThread можно продолжать писать на двух других изображениях и наоборот.

Какой механизм блокировки наиболее подходит для обеспечения поточной безопасности?

Я посмотрел на оператор блокировки, но кажется, что он будет блокировать поток, ожидающий завершения другого, и это будет против точки тройной буферизации.

Заранее спасибо за любую идею или совет.

J.

Ответы [ 7 ]

8 голосов
/ 25 марта 2010

Это может быть пример из учебника Модель производитель-потребитель .

Если вы собираетесь работать в .NET 4, вы можете использовать IProducerConsumerCollection<T> и соответствующие конкретные классы для обеспечения вашей функциональности.

Если нет, прочитайте эту статью для получения дополнительной информации о шаблоне и этот вопрос для получения руководства по написанию собственной поточно-ориентированной реализации блокировки First-In Структура First-Out.

3 голосов
/ 25 марта 2010

Лично я думаю, что вы могли бы рассмотреть другой подход для этого, вместо того, чтобы записывать в централизованный «буфер», к которому вам нужно управлять доступом, вы могли бы переключиться на подход, который использует события. Как только поток камеры «получил» изображение, он может вызвать событие, которое передало данные изображения в процесс, который фактически обрабатывает обработку изображения.

Альтернативой может быть использование очереди, которая является структурой данных FIFO (первым пришел первым - первым), теперь она не является поточно-ориентированной для доступа, поэтому вам придется заблокировать ее, но время блокировки будет очень минимальный, чтобы поставить элемент в очередь. Существуют также другие классы очереди, которые можно использовать в многопоточных системах.

Используя ваш подход, вам придется столкнуться с рядом проблем. Блокировка при доступе к массиву, ограничения в отношении того, что происходит после исчерпания доступных слотов массива, блокировки и т. Д.

2 голосов
/ 25 марта 2010

Учитывая количество прецессии, необходимой для изображения, я не думаю, что простая схема блокировки будет вашим узким местом. Измерьте, прежде чем начать тратить время на не ту проблему.
Будьте очень осторожны с решениями без блокировки, они всегда сложнее, чем выглядят.

И вам нужна очередь, а не массив.
Если вы можете использовать dotNET4, я бы использовал ConcurrentQuue.

1 голос
/ 25 марта 2010

Вам нужно будет запустить некоторые показатели производительности, но взгляните на lock free queues.

См. этот вопрос и связанные с ним ответы , например.

Однако в вашем конкретном приложении ваш процессор действительно интересуется только самым последним изображением. По сути, это означает, что вы действительно хотите поддерживать очередь из двух элементов (новый элемент и предыдущий элемент), чтобы не было споров между чтением и записью. Например, вы можете попросить производителя удалить старые записи из очереди после записи новой.

Редактировать: сказав все это, я думаю, что в ответе Митчела Селлерса есть много достоинств.

0 голосов
/ 26 марта 2010

Просто идея.

Поскольку речь идет только о двух потоках, мы можем сделать некоторые предположения.

Позволяет использовать вашу идею тройного буфера. Предполагая, что есть только 1 поток записи и 1 поток чтения, мы можем бросить «флаг» туда и обратно в виде целого числа. Оба потока будут непрерывно вращаться, но будут обновлять свои буферы. ВНИМАНИЕ: Это будет работать только для 1 нитей чтения

Псевдокод

Общие переменные:

int Status = 0; // 0 = готов к записи; 1 = готов к чтению

Buffer1 = Новые байты []

Buffer2 = Новые байты []

Buffer3 = Новые байты []

BufferTmp = null

thread1 {

while(true)
{
    WriteData(Buffer1);
    if (Status == 0)
    {
        BufferTmp = Buffer1;
        Buffer1 = Buffer2;
        Buffer2 = BufferTmp;
        Status = 1;
    }
}

}

thread2 {

while(true)
{
    ReadData(Buffer3);
    if (Status == 1)
    {
        BufferTmp = Buffer1;
        Buffer2 = Buffer3;
        Buffer3 = BufferTmp;
        Status = 0;
    }
}

} 1031 * *

просто помните, что вы writedata метод не будет создавать новые объекты байтов, но обновлять текущий. Создание новых объектов стоит дорого.

Кроме того, вы можете захотеть, чтобы thread.sleep (1) в инструкции ELSE сопровождал операторы IF, в противном случае один одноядерный ЦП вращающийся поток увеличит задержку до того, как другой поток будет запланирован. например. Поток записи может запустить вращение 2-3 раза, прежде чем поток чтения будет запланирован, потому что планировщики видят, что поток записи выполняет "работу"

0 голосов
/ 25 марта 2010

Это не прямой ответ на ваш вопрос, но может быть лучше переосмыслить вашу модель параллелизма. Блокировки - ужасный способ синхронизировать что угодно - слишком низкий уровень, подверженность ошибкам и т. Д. Попробуйте переосмыслить свою проблему с точки зрения параллелизма передачи сообщений :

Идея заключается в том, что каждый поток представляет собой свой собственный плотно замкнутый цикл сообщений, и каждый поток имеет «почтовый ящик» для отправки и получения сообщений - мы будем использовать термин MailboxThread, чтобы отличать эти типы объектов от простых Джейн темы.

Таким образом, вместо двух потоков, обращающихся к одному и тому же буферу, вместо этого у вас есть два MailboxThreads, отправляющие и получающие сообщения между собой (псевдокод):

let filter =
    while true
        let image = getNextMsg() // blocks until the next message is recieved
        process image

let camera(filterMailbox) =
    while true
        let image = takePicture()
        filterMailbox.SendMsg(image) // sends a message asyncronous

let filterMailbox = Mailbox.Start(filter)
let cameraMailbox = Mailbox.Start(camera(filterMailbox))

Теперь вы обрабатываете потоки, которые вообще не знают или не заботятся о каких-либо буферах. Они просто ждут сообщений и обрабатывают их, когда они доступны. Если вы отправляете много сообщений для обработки filterMailbox, эти сообщения ставятся в очередь для последующей обработки.

Сложная часть - это реализация вашего объекта MailboxThread. Несмотря на то, что для правильной реализации требуются некоторые творческие способности, вполне возможно реализовать эти типы объектов таким образом, чтобы они удерживали поток открытым при обработке сообщения и высвобождали исполняющий поток обратно в пул потоков, когда не осталось сообщений для обработки (эта реализация позволяет вам завершать работу приложения без висячих потоков).

Преимущество здесь в том, что потоки отправляют и получают сообщения, не беспокоясь о блокировке или синхронизации. За кулисами вам нужно заблокировать свою очередь сообщений между постановкой или снятием очереди сообщения, но эти детали реализации полностью прозрачны для вашего клиентского кода.

0 голосов
/ 25 марта 2010

Я бы посмотрел на использование ReaderWriterLockSlim , которое позволяет быстро читать и обновлять блокировки для записи.

...