Распараллеливайте и отслеживайте задачи с помощью ExecutorService - PullRequest
0 голосов
/ 04 августа 2020

У меня есть функция / задача void task(), которую нужно вызывать около 4-6 миллионов раз. Я хочу распараллелить эту операцию над потоками в пуле потоков. Меня не волнует возвращаемое значение задач, поэтому я могу не возиться с Future<T>. Я хочу периодически опрашивать статус того, как идут потоки. Статус - это просто, сколько вызовов task() вернулось чисто и сколько сгенерировало исключение.

Вот что я придумал:

class Test {
    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(1_000_000);

    private void start() {
        ExecutorService executorService = Executors.newFixedThreadPool();
        for (int i = 0; i < 1_000_000; i++) {
          executorService.execute(this::task);
        }
        while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== Final tally =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
        executorService.shutdown();
    }

    private void task() {
        try {
           doSomeStuff();
           success.incrementAndGet()
        } catch(Exception e) {
           failed.incrementAndGet();
        }
        countDownLatch.countDown();
    }
}

Два AtomicInteger, которые используют потоки для записи успехов или неудач и CountDownLatch, который поток "монитора" использует для проверки прогресса.

Есть ли более идиоматический c способ сделать это? Что-то, что не связано с отправкой миллионов Runnable лямбд в ExecutorService, возможно?

Я мог бы поместить все это в

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)

, но я не буду умеет следить за прогрессом.

1 Ответ

1 голос
/ 04 августа 2020

Если вы хотите использовать поток, как вы предложили, вы можете переместить часть мониторинга в другой поток, например:

new Thread(()->{  
 while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== Final tally =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
}).start()

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
...