Как остановить выполнение, когда рабочая очередь пуста и все потоки перестали работать? - PullRequest
0 голосов
/ 24 мая 2019

При написании алгоритма поиска в пространстве состояний у меня есть рабочая очередь с элементами узла.У меня есть несколько потоков с доступом к этой очереди, которые извлекают элемент, выполняют некоторые преобразования и проверки для него и могут добавить больше узлов, которые будут посещены в очередь.

Я хочу, чтобы программа остановилась всякий раз, когда очередьпусто, и все потоки перестали работать (так как они могли бы добавить больше элементов, в этом случае нам понадобились бы другие потоки, чтобы помочь обработать эти новые узлы).

Как мне выполнить эту проверку?В настоящее время я думал о том, чтобы сохранить AtomicBitSet, отследить, какие потоки работают, а какие нет, и остановить выполнение, когда битовый набор пуст.Я бы установил и сбросил со следующим, в run методе моих обработчиков

while (!bitset.isAllUnset()) {
    Node node = queue.poll();
    if (node == null) {
        bitset.unset(THREAD_INDEX);
    } else {
        bitset.set(THREAD_INDEX);

        // HANDLE THE NODE
    }
}

Есть ли какой-нибудь рекомендуемый метод для этого?

Ответы [ 3 ]

0 голосов
/ 24 мая 2019

Используйте ExecutorService, для которого вы отправляете Runnable s, которые читают очередь и которые останавливаются, когда очередь пуста.

Затем вызовите метод awaitTermination() службы исполнителя, который будет блокировать до тех пор, пока все потоки не будут завершены.

Или используйте CompletableFuture:

CompleteableFuture.allOf(
    CompleteableFuture.runAsync(
        () -> while(!queue.isEmpty()) handle(queue.poll())
    ));
0 голосов
/ 24 мая 2019

Я думаю, что это на самом деле довольно сложно.Я не знаю, как написать правильную версию, используя подход, основанный на множестве.Например, следующий подход неверен:

public class ThreadsStopWorkingWrong {
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
ConcurrentHashMap activeThreads = new ConcurrentHashMap();
volatile int prozessedCount = 0;
volatile boolean stop = false;
@Interleave(group = ThreadsStopWorkingWrong.class, threadCount = 1)
public void readFromQueue() {
    int prozessAdditionalElements = 1;
    while (!stop) {
        Object element = queue.poll();

        if (element != null) {
            activeThreads.put(Thread.currentThread(), "");
            if (prozessAdditionalElements > 0) {
                prozessAdditionalElements--;
                queue.offer("2");
            }
            prozessedCount++;
        } else {
            activeThreads.remove(Thread.currentThread());
        }
    }
}
@Interleave(group = ThreadsStopWorkingWrong.class, threadCount = 1)
public void waitTillProzessed() throws InterruptedException {
    while (!queue.isEmpty() && !activeThreads.isEmpty()) {
        Thread.sleep(1);
    }
    assertEquals(2, prozessedCount);
}
@Test
public void test() throws InterruptedException {
    queue.offer("1");
    Thread worker = new Thread(() -> readFromQueue());
    worker.start();
    waitTillProzessed();
    worker.join();

}
}

Проблема в том, что при опросе сообщения из очереди вы еще не добавили поток в активированный набор, так что! Queue.isEmpty () &&! activeThreads.isEmpty () становится истинным.Что работает, так это использование счетчика сообщений, как в следующем примере:

public class ThreadsStopWorkingCorrect {
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
AtomicLong messageCount = new AtomicLong();
volatile int prozessedCount = 0;
volatile boolean stop = false;
@Interleave(group = ThreadsStopWorkingCorrect.class, threadCount = 1)
public void readFromQueue() {
    int prozessAdditionalElements = 1;
    while (!stop) {
        Object element = queue.poll();
        if (element != null) {
            if (prozessAdditionalElements > 0) {
                prozessAdditionalElements--;
                queue.offer("2");
                messageCount.incrementAndGet();
            }
            prozessedCount++;
            messageCount.decrementAndGet();
        }
    }
}
@Interleave(group = ThreadsStopWorkingCorrect.class, threadCount = 1)
public void waitTillProzessed() throws InterruptedException {
    while (messageCount.get() > 0) {
        Thread.sleep(1);
    }
    assertEquals(2, prozessedCount);
}
@Test
public void test() throws InterruptedException {
    queue.offer("1");
    messageCount.incrementAndGet();
    Thread worker = new Thread(() -> readFromQueue());
    worker.start();
    waitTillProzessed();
    worker.join();

}
}

Я протестировал оба примера с vmlens , инструментом, который я написал для тестирования многопоточного программного обеспечения.Поэтому чередование аннотаций.

В версии на основе набора некоторые чередования потоков приводят к prozessedCount == 0.В счетной версии prozessedCount всегда равен 2.

0 голосов
/ 24 мая 2019

То, что вы могли бы сделать, это следующий подход:

  1. Создайте ThreadPool и поместите начальную задачу в свою очередь.
  2. Сохраните один поток (ваш основной поток) в качестве мониторав ThreadPool.
  3. Задача этого потока - запускать новые потоки, пока очередь не пуста, у пула потоков еще осталось место и дать им задание.
  4. потокзапущенный выполняет свою работу и записывает результаты обратно в очередь.
  5. После этого он возвращается в пул, и вам нужно будет разбудить ваш монитор.
  6. Ваш главный поток будет затемпопытайтесь запустить новый поток, если пул потоков не достиг предела и очередь задач не пуста.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...