Как использовать текущий поток для обработки рабочей очереди? - PullRequest
3 голосов
/ 11 июля 2011

Какие структуры данных я использую для реализации следующей логики?

  1. read () - это асинхронный метод, который ставит в очередь некоторую рабочую нагрузку
  2. Одновременно может выполняться только одна рабочая нагрузка.
  3. Первый поток, ставящий очередь рабочей нагрузки, становится рабочим потоком.Он обрабатывает всю работу в очереди перед возвратом.Следующий поток, который вызывает read (), становится новым рабочим потоком и т. Д. ...
  4. Если другие потоки вызывают read (), когда рабочий поток обрабатывает очередь, они просто добавляют в конецОчередь и немедленно возвращаюсь.

Я знаю, как реализовать это, используя ConcurrentLinkedQueue и AtomicBoolean, но я чувствую, что есть лучший способ.

УТОЧНЕНИЕ :Рабочая нагрузка состоит из вызова другого асинхронного метода с именем read2 ().read2 () асинхронный, но не потокобезопасный.Когда я говорю, что рабочий поток «обрабатывает рабочую нагрузку», он просто запускает первую операцию чтения и сразу же возвращается.Когда read2 () завершается, он вызывает следующую операцию в очереди и так далее.Весь API является асинхронным.Поэтому я хотел бы избежать выделенного потока пользователя (в этом нет реальной необходимости, и это плохо для масштабируемости).

Ответы [ 3 ]

1 голос
/ 12 июля 2011

Из-за неопределенности определения точного состояния элементов очереди обработки потока или нет, я предлагаю, чтобы каждый поток, независимо от того, является ли пользовательский поток или какой-либо поток ядра, запускающий асинхронный обратный вызов, поступающий в эту систему, ставит свою рабочую нагрузку в очередь.и затем пытается обработать все элементы в очереди.Если некоторые рабочие нагрузки запрашивают проблему вызова без использования потока, защитите этот вызов только с помощью CS / счетчика - вы говорите, что эти вызовы без поддержки потока в любом случае являются короткими, поэтому блокировка CS / прокрутки обойдется вам очень дешево.

Дамп AtomicBoolean.Несмотря на то, что определение ясно, это означает, что ни один поток не обрабатывает рабочие нагрузки в очереди, обнаружение того, что он установлен, не означает, что поток обрабатывает элементы в очереди: существует третье состояние - завершено с элементами в очереди, но не совсем обошлосьочистить логическое значение еще. '

Rgds, Мартин

0 голосов
/ 12 июля 2011

Как насчет простого создания метода read synchronized?

Вы просто вызываете read напрямую, вместо того, чтобы помещать рабочую нагрузку ("call read") в синхронизированную очередь.Возможно, вашим потокам придется немного подождать, чтобы получить блокировку синхронизации, но, как вы сказали, read выполняется очень быстро, поэтому это не должно быть проблемой.

(я не уверен на 100%, что понимаю вашепроблема еще, но вот что, я думаю, я понимаю: все, что read делает, это запускает метод read2 в новом потоке. Однако кажется, что есть некоторая проблема безопасности потока с read, так как вы не хотите (длянеобъяснимые причины) много read для параллельного выполнения.)

EDIT : я понял, что не знаю, является ли read методом одного общего объекта, статическим методомили если он присутствует во многих объектах.Если это последнее, то просто сделать метод synchronized не очень полезным;вам нужно получить некоторую глобальную блокировку (возможно, блокировку класса , содержащую read) либо до вызова read, либо в пределах read.

0 голосов
/ 12 июля 2011

Я думаю, что ваш дизайн может нарушать какое-то важное правило в разработке программного обеспечения, имя которого ускользает от меня в данный момент.В идеале у вас должен быть один класс, ответственный за одну вещь, поэтому просьба к классам изменить то, за что они отвечают в зависимости от внешнего состояния, вызовет у вас некоторые проблемы.

Я бы порекомендовал вам взять Производитель / Потребительская Скороговорка во внимание.Ваш поток (ы) продюсера добавит работу в очередь, а поток потребителя обработает ее.Это гарантирует, что вашим потокам-производителям не нужно заботиться о том, когда они отвечают за получение работы из очереди, а когда они отвечают за размещение работы в очереди, все, о чем они заботятся, - это помещает работу в очередь.Ваш потребительский поток будет посвящен извлечению работы из очереди и ее выполнению, и это все, что он сделает!

Обновите базу на основе вашего комментария: производитель не должен быть выделенным,все, что вам нужно сделать - это сделать так, чтобы ваш обратный вызов read поставил в очередь рабочий элемент (я полагаю, это что-то вроде обратного вызова, но поправьте меня, если я ошибаюсь).Вот и снова:

private static BlockingQueue q;
// Asynchronous callback
public void read()
{
    q.enqueue(new WorkItem());
}

Возможно, потребителю нужно быть преданным, но концептуально существует очень мало различий между выделением потребителя и тем, чтобы несколько производителей играли исключительную роль в качестве потребителя:

public class Consumer imlpements Runnable {
    private BlockingQueue _queue;
    public Consumer(BlockingQueue q){
        _queue = q;
    }

    public void run(){
        while(true){
            // block on dequeue until there is work in the queue (i.e. never exits the loop)
            WorkItem w = _queue.dequeue();
            w.DoWork();
        }
    }
}

Теперь вы можете запустить и Потребителя и Производителя (ей) в пуле потоков (ExecutorService), и он будет выполнять именно то, что вы хотите.Обратите внимание, что я не включил все навороты безопасности потоков, такие как перехват исключений прерываний и корректное завершение, но это то, что вы можете (и должны) добавить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...