Я борюсь с лучшим способом реализации моего конвейера обработки.
Мои продюсеры передают работу в BlockingQueue. Что касается потребителя, я опрашиваю очередь, упаковываю то, что получаю, в задачу Runnable и отправляю ее в ExecutorService.
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
Это не идеально; Конечно, у ExecutorService есть своя собственная очередь, поэтому на самом деле происходит то, что я всегда полностью истощаю свою рабочую очередь и заполняю очередь задач, которая медленно очищается по мере выполнения задач.
Я понимаю, что могу ставить задачи в очередь на стороне продюсера, но на самом деле я бы предпочел этого не делать - мне нравится, что моя очередь на работу / изоляция являются глупыми строками; на самом деле это не какое-то дело продюсера. Принуждение производителя поставить в очередь Runnable или Callable нарушает абстракцию, ИМХО.
Но я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки. Я хочу иметь возможность блокировать производителей, если потребители не поспевают за ними.
Я бы хотел использовать Executors, но я чувствую, что борюсь с их дизайном. Могу ли я частично выпить Kool-ade или я должен проглотить его? Я ошибаюсь, сопротивляясь задачам в очереди? (Я подозреваю, что мог бы настроить ThreadPoolExecutor для использования очереди из 1 задачи и переопределить его метод execute для блокировки, а не для отклонения при заполнении очереди, но это выглядит брутто.)
Предложения