Доступ к самому молодому элементу ConcurrentQueue при наличии нескольких потоков, работающих в очереди - PullRequest
2 голосов
/ 15 мая 2011

У нас есть ConcurrentQueue, который используется для обмена данными между 3 потоками.Поток A непрерывно заполняет очередь данными.Поток B предназначен для записи этих данных в файл.Предполагается, что поток C извлекает самую младшую запись в очереди (или как можно ближе к самой молодой), выполняет с ней некоторые операции и отображает результаты на экране.

Поток B, чтобы кластеризовать запись файлаОперации во времени, делает что-то вроде этого:

if (cq.Count > 100)
{
    while (cq.Count > 1)
    {
        qElement = PopFromCq(cq); // PopFromCq uses cq.TryDequeue()
        bw.Write(qElement.data); // bw is a binary writer
    }
}
else
{
    System.Threading.Thread.Sleep(10);
}

, то есть он ожидает, как минимум 100 элементов в очереди, а затем записывает их на диск.Он всегда поддерживает хотя бы один элемент в очереди, и причина в том, что мы хотим, чтобы поток C всегда имел доступ хотя бы к одному элементу.

Цикл в потоке C выглядит следующим образом:

while (threadsRunning) 
{
    System.Threading.Thread.Sleep(500); // Update twice per second
    ProcessDataAndUpdateScreen(cq.ElementAt(cq.Count - 1)); // our terrible attempt at looking at the latest (or close to latest) entry in the queue
}

В этом цикле мы иногда получаем исключение из-за гонки между потоком, записывающим данные на диск, и вызовом cq.ElementAt (cq.Count-1).Я считаю, что происходит следующее:

  1. cq.Count рассчитывается, скажем, 90.
  2. К тому времени поток B уже запустил свой цикл, и он удаляет данныеиз очереди для записи на диск
  3. К тому времени, когда вызывается cq.ElementAt (), поток B потребляет такое количество элементов, что (cq.Count - 1) больше не указывает на действительную запись вочередь.

Есть идеи, как можно получить доступ к самой молодой записи в очереди при наличии нескольких потоков, работающих в очереди?

С уважением,

1 Ответ

2 голосов
/ 15 мая 2011

Необходимо ли, чтобы связь AB и связь AC проходили через очередь?Что, если у вас есть поток A, записывающий каждую запись в очередь (для чтения и записи B), а также сохраняющий запись, которую он просто помещает в очередь в энергозависимом свойстве где-нибудь.Каждый раз, когда С хочет получить самый младший элемент, он может просто читать непосредственно из этого свойства.

РЕДАКТИРОВАТЬ: вместо того, чтобы просто полагаться на изменяемое свойство, вы должны использовать Interlocked.CompareExchange<T>(T, T) для установки и чтения «самого молодого»запись "собственность.

...