ExecutorService и AtomicInteger: RejectedExecutionException - PullRequest
0 голосов
/ 14 ноября 2018

Я хочу, чтобы atomicInteger имел значение 100, после чего программа завершается

 public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        do {
            executor.submit(() -> {
                System.out.println(atomicInteger.getAndAdd(10));
                if (atomicInteger.get() == 100) {
                    //executor.shutdownNown();
                }
            });
        } while (true);
    }

У меня ошибка

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1d8d10a rejected from java.util.concurrent.ThreadPoolExecutor@9e54c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 10]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1374)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)

Как мне это реализовать.

Ответы [ 2 ]

0 голосов
/ 15 ноября 2018

Нет необходимости использовать AtomicInteger здесь, так как ваши вызовы лямбда-функции Runnable гарантированно будут выполняться последовательно (новым SingleThreadExecutor).Кроме того, ваш лямбда-код Runnable должен был выполняться в любое время (например, 2 мс), ваш основной цикл поставит в очередь намного более 10 задач, необходимых для достижения вашего лимита.Это может произойти, если вы добавите режим ожидания 2 мс внутри своей лямбда-функции Runnable, а также добавите счетчик в цикл do / while и выведите значение счетчика в конце, чтобы увидеть, сколько экземпляров Runnables вы поставили в очередь.

Предполагая, что вы хотите протестировать этот код с параллельными потоками, вам необходимо заменить вызов newSingleThreadPool на newFixedThreadPool.Подход, используемый в вашем коде, проблематичен при использовании параллельных потоков.В следующем коде я переключился на newFixedThreadPool, добавил счетчик, чтобы мы могли видеть, сколько задач поставлено в очередь и добавлено в короткие паузы в вашей лямбда-функции Runnable, просто для представления небольшого объема работы.Когда я выполняю эту программу, atomicInteger становится больше 13000, и программа аварийно завершает работу с java.lang.OutOfMemoryError: превышен предел издержек GC Это связано с тем, что ваша выполняемая функция всегда добавляет 10 к atomicInteger независимо от его текущего значения.Кроме того, код ставит в очередь больше задач, чем нужно.Вот код с этими небольшими изменениями, которые иллюстрируют проблему.

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    int i=0;
    do {
        executor.submit(() -> {
            pause(2); // simulates some small amount of work.
            System.out.println("atomicInt="+atomicInteger.getAndAdd(10));
            pause(2); // simulates some small amount of work.
            if (atomicInteger.get() == 100) {
                System.out.println("executor.shutdownNow()");
                System.out.flush();
                executor.shutdownNow();
            }
        });
        if (atomicInteger.get() == 100) {
            break;
        }
    } while (true);
    System.out.println("final atomicInt="+atomicInteger.get());
    System.out.println("final tasks queued="+i);
}
public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {
    }
}

Вот версия, которая устраняет проблемы параллелизма и перемещает управление исполнителем из рабочих потоков, где оно на самом деле не принадлежит:

private static int LIMIT = 100;
private static int INCREMENT = 10;

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    for (int i=0; i < LIMIT/INCREMENT; i++) {
            executor.submit(() -> {
                pause(2);
                System.out.println("atomicInt=" + atomicInteger.getAndAdd(INCREMENT));
                System.out.flush();
                pause(2);
            });
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
        System.out.println("Executor not yet terminated");
        System.out.flush();
        pause(4);
    }
    System.out.println("final atomicInt=" + atomicInteger.get());
}

public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {

    }
}
0 голосов
/ 14 ноября 2018

Вы должны просто изменить цикл while, чтобы проверить, в каком состоянии вы нуждаетесь, и после этого завершить работу исполнителя

...