рабочие очереди производителей / потребителей - PullRequest
10 голосов
/ 10 февраля 2010

Я борюсь с лучшим способом реализации моего конвейера обработки.

Мои продюсеры передают работу в BlockingQueue. Что касается потребителя, я опрашиваю очередь, упаковываю то, что получаю, в задачу Runnable и отправляю ее в ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

Это не идеально; Конечно, у ExecutorService есть своя собственная очередь, поэтому на самом деле происходит то, что я всегда полностью истощаю свою рабочую очередь и заполняю очередь задач, которая медленно очищается по мере выполнения задач.

Я понимаю, что могу ставить задачи в очередь на стороне продюсера, но на самом деле я бы предпочел этого не делать - мне нравится, что моя очередь на работу / изоляция являются глупыми строками; на самом деле это не какое-то дело продюсера. Принуждение производителя поставить в очередь Runnable или Callable нарушает абстракцию, ИМХО.

Но я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки. Я хочу иметь возможность блокировать производителей, если потребители не поспевают за ними.

Я бы хотел использовать Executors, но я чувствую, что борюсь с их дизайном. Могу ли я частично выпить Kool-ade или я должен проглотить его? Я ошибаюсь, сопротивляясь задачам в очереди? (Я подозреваю, что мог бы настроить ThreadPoolExecutor для использования очереди из 1 задачи и переопределить его метод execute для блокировки, а не для отклонения при заполнении очереди, но это выглядит брутто.)

Предложения

Ответы [ 3 ]

18 голосов
/ 10 февраля 2010

Я хочу, чтобы общая рабочая очередь представлять текущую обработку состояние.

Попробуйте использовать общую BlockingQueue и иметь пул рабочих потоков, отбирающих рабочие элементы из очереди.

Я хочу иметь возможность заблокировать производители, если потребители не не отставать.

Обе ArrayBlockingQueue и LinkedBlockingQueue поддерживают ограниченные очереди, так что они будут блокироваться на пути при заполнении. Использование блокирующих методов put () гарантирует блокировку производителей, если очередь заполнена.

Вот грубое начало. Вы можете настроить количество рабочих и размер очереди:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
1 голос
/ 25 октября 2010

"найти доступный существующий рабочий поток, если он существует, создать его, если необходимо, убить их, если они простаивают."

Управление всеми этими рабочими состояниями так же ненужно, как и опасно. Я хотел бы создать один поток мониторинга, который постоянно работает в фоновом режиме, единственная задача которого - заполнить очередь и порождать потребителей ... почему бы не сделать рабочие потоки демонов , чтобы они умирали, как только завершали работу? Если вы присоедините их все к одной группе потоков, вы можете динамически изменить размер пула ... например:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**
0 голосов
/ 10 февраля 2010

Ваш потребитель может выполнить Runnable::run напрямую, вместо того, чтобы начинать новый поток. Объедините это с очередью блокировки с максимальным размером, и я думаю, что вы получите то, что вы хотите. Ваш потребитель становится работником, который выполняет задачи в режиме реального времени на основе рабочих элементов в очереди. Они будут снимать с производства только те вещи, которые обрабатываются, и ваш производитель, когда ваши потребители перестают потреблять.

...