Проблема потребителя / производителя: приостановить производство на медленном потреблении - PullRequest
0 голосов
/ 04 октября 2018

У меня есть продюсер , который читает блоки текста с диска.Несколько потребителей выполняют вычисления на этих блоках.

Я бы хотел, чтобы производитель приостановил чтение данных с диска, если в данный момент вычисляется более чем n блоков.

Поместите его в псевдокод, чтобы проиллюстрировать, чего я хотел бы достичь.

// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
  // (!) if activeCounter exceeds a THRESHOLD, then pause

  executorService.submit(() -> { 
     activeCounter.incrementAndGet();

     // do some work

     activeCounter.decrementAndGet();
  });
});

Ответы [ 2 ]

0 голосов
/ 05 октября 2018

«Я бы хотел, чтобы производитель приостановил чтение данных с диска, если в данный момент вычисляется более n блоков».Реальное описание задачи немного отличается: производитель, прежде чем читать данные с диска, должен получить разрешение на это .Если ваш производитель - нить, то естественным средством управления разрешениями будет Семафор .Изначально он содержит n разрешений.Производитель, чтобы прочитать блок, получает 1 разрешение с Semaphore::aquire.Когда блок обрабатывается потребителем, потребитель выпускает 1 разрешение с Semaphore::release.

. Другой подход состоит в объединении блоков и разрешений.Аналогично очереди вывода от производителя к потребителю, создайте очередь блокировки ввода для блоков.Изначально положил туда n блоков.Производитель, чтобы прочитать блок, сначала берет следующий блок из этой очереди.Потребитель после обработки блока возвращает его в очередь ввода.

0 голосов
/ 04 октября 2018

Я бы использовал очередь фиксированной длины для вашего пула потоков и реализовал бы RejectedExecuptionHandler для запуска в текущем потоке или для приостановки и повторных попыток.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RejectedExecutionHandler.html#rejectedExecution(java.lang.Runnable,%20java.util.concurrent.ThreadPoolExecutor)

например

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Последний вариант, который я эффективно использовал, не требует дополнительного кода после настройки ExecutorService.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...