Гибкий CountDownLatch? - PullRequest
       28

Гибкий CountDownLatch?

23 голосов
/ 28 октября 2009

Я дважды сталкивался с проблемой, когда поток-производитель создает N рабочих элементов, отправляет их в ExecutorService, а затем должен дождаться обработки всех N элементов.

Предостережения

  • N заранее неизвестно . Если бы это было так, я просто создал бы CountDownLatch, а затем имел бы поток производителя await(), пока вся работа не была бы завершена.
  • Использование CompletionService неуместно, потому что, хотя мой поток производителя должен блокировать (т. Е. Путем вызова take()), нет никакого способа сообщить, что вся работа завершена , чтобы заставить поток производителя остановиться ожидание.

Мое текущее предпочтительное решение - использовать счетчик целых чисел, и увеличивать это всякий раз, когда передается элемент работы, и уменьшать его при обработке рабочего элемента. После отправки всех N задач моему производственному потоку нужно будет ждать блокировки, проверяя, counter == 0 всякий раз, когда он получает уведомление. Поток (и) потребителя должен будет уведомить производителя, если он уменьшил счетчик и новое значение равно 0.

Есть ли лучший подход к этой проблеме или в java.util.concurrent есть подходящая конструкция, которую я должен использовать, а не "катиться по-своему"?

Заранее спасибо.

Ответы [ 6 ]

26 голосов
/ 28 октября 2009

java.util.concurrent.Phaser похоже, это будет хорошо для вас. Его планируется выпустить в Java 7, но наиболее стабильную версию можно найти на веб-сайте группы интересов jsr166 .

Фазер - это прославленный циклический барьер. Вы можете зарегистрировать N партий и, когда будете готовы, дождаться их продвижения на конкретном этапе.

Быстрый пример того, как это будет работать:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
2 голосов
/ 28 октября 2009

Конечно, вы можете использовать CountDownLatch, защищенный AtomicReference, чтобы ваши задачи были обернуты следующим образом:

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

Обратите внимание , что задачи выполняют свою работу, а затем вращайте , пока не будет установлен обратный отсчет (т. Е. Общее число задач известно). Как только обратный отсчет установлен, они начинают обратный отсчет и выходят. Они представлены следующим образом:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

После точки создания / представления вашей работы , когда вы знаете, сколько задач вы создали :

latch.set(new CountDownLatch(nTasks));
latch.get().await();
1 голос
/ 28 октября 2009

Я использовал ExecutorCompletionService для чего-то вроде этого:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

Это включает в себя сохранение очереди задач, которые были отправлены, поэтому, если ваш список произвольно длинный, ваш метод может быть лучше.

Но обязательно используйте AtomicInteger для координации, чтобы вы могли увеличивать его в одном потоке и уменьшать его в рабочих потоках.

0 голосов
/ 08 августа 2018

Автономный метод Java 8+, который делает именно это для Stream с использованием однопроходного Phaser. Iterator / Iterable варианты абсолютно одинаковы.

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
            phaser.register(); // new task
            executor.execute(() -> {
                handleItem.accept(item);
                phaser.arrive(); // completed task
            });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor)

FYI. Это хорошо для особых событий, но, если это вызывается одновременно, решение, включающее ForkJoinPool, остановит блокировку родительского потока.

0 голосов
/ 18 мая 2017

То, что вы описали, очень похоже на использование стандартного семафора, но используется «назад».

  • Ваш семафор начинается с 0 разрешений
  • Каждое рабочее подразделение выдает одно разрешение по завершении
  • Вы блокируете, ожидая, чтобы получить N разрешений

Кроме того, у вас есть возможность приобретать только разрешения M

0 голосов
/ 28 октября 2009

Полагаю, вашему производителю не нужно знать, когда очередь пуста, но нужно знать, когда было выполнено последнее задание.

Я бы добавил waitforWorkDone(producer) метод к потребителю. Производитель может добавить свои N задач и вызвать метод wait. Метод wait блокирует входящий поток, если рабочая очередь не пуста и в данный момент не выполняется никаких задач.

Пользовательские потоки notifyAll() в ожидании блокировки, если его задача завершена, очередь пуста и никакая другая задача не выполняется.

...