Какая одновременно ошибка в этой программе? - PullRequest
0 голосов
/ 24 февраля 2019

У меня есть одна программа со странной ошибкой одновременно.

Что эта программа делает:

  1. Выполнение цикла событий каждый EVENT_LOOP_PAUSE_DURATION_IN_MS.
  2. Для каждой задачиВыполнить процессор TaskProcessor
  3. Каждый 500 ms печатает размер очереди моего исполнителя.

Я хочу иметь не более одной задачи в очереди на taskId.Поэтому, когда я добавляю задачу в очередь, я проверяю, существуют ли уже задачи или нет.Если нет задачи, я добавляю ее.В конце обработки задачи я удаляю задачу из карты activeTasks.

Если вы запустите программу, то увидите следующий вывод:

ERROR: 50
ERROR: 70
ERROR: 80
ERROR: 90
ERROR: 110
ERROR: 120
ERROR: 120
ERROR: 140

Итак, есть ошибка.Я не знаю почему, но размер очереди пула потоков бесконечно увеличивается.

Вы видите, что я удаляю активные задачи в 2 точках программы:

  1. В finally блоке TaskProcessor, когда задача обработана.
  2. Я удаляю устаревшие задачи в цикле событий.

Итак, если я удаляю код, который удаляет задачи в точке (2), то ошибка исчезает.Я не понимаю это поведение.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Program {

    private static final int NUMBER_OF_TASKS = 40;
    private static final int NUMBER_OF_THREADS = 10;
    private static final long EVENT_LOOP_PAUSE_DURATION_IN_MS = 40L;

    class QueueSizePrinter extends Thread {

        private final LinkedBlockingQueue<Runnable> workQueue;

        public QueueSizePrinter(LinkedBlockingQueue<Runnable> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (true) {
                int qSize = workQueue.size();
                if (qSize > NUMBER_OF_TASKS) {
                    System.out.println("ERROR: " + qSize);
                }

                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class TaskProcessor implements Runnable {
        private final String currentTaskId;
        private final ConcurrentHashMap<String, Long> activeTasks;

        public TaskProcessor(String currentTaskId, ConcurrentHashMap<String, Long> activeTasks) {
            this.currentTaskId = currentTaskId;
            this.activeTasks = activeTasks;
        }

        @Override
        public void run() {
            try {
                // emulate of useful work
                Thread.sleep(300L);
            } catch (Exception e) {
                System.out.println("error: " + e.toString());
            } finally {
                activeTasks.remove(currentTaskId); // (1)
            }
        }
    }

    public void program() {

        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ExecutorService executor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, 0L, TimeUnit.MILLISECONDS, workQueue);

        Set<String> initialTasks = ConcurrentHashMap.newKeySet();
        for (int currentTaskIndex = 0; currentTaskIndex < NUMBER_OF_TASKS; currentTaskIndex++) {
            initialTasks.add(String.valueOf(currentTaskIndex));
        }

        new QueueSizePrinter(workQueue).start();

        ConcurrentHashMap<String, Long> activeTasks = new ConcurrentHashMap<>();

        while (true) {

            initialTasks.forEach((currentTaskId) -> {
                if (!activeTasks.containsKey(currentTaskId)) {
                    activeTasks.put(currentTaskId, System.currentTimeMillis());

                    executor.submit(new TaskProcessor(currentTaskId, activeTasks));
                }
            });

            // (2)
            activeTasks.entrySet().removeIf(entry -> {
                boolean hasDelete = System.currentTimeMillis() - entry.getValue() > 1000;
                if (hasDelete) {
                    //System.out.println("DELETE id=" + entry.getKey());
                }
                return hasDelete;
            });

            try {
                Thread.sleep(EVENT_LOOP_PAUSE_DURATION_IN_MS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Program main = new Program();
        main.program();
    }
}

1 Ответ

0 голосов
/ 24 февраля 2019

Проблема в точке (2), вы удаляете устаревшие задачи из карты активных заданий.Но они все еще передаются в ExecutorService.Поскольку Вы удалили его из карты, когда цикл while выполнит другой цикл, эта же задача будет повторно отправлена ​​в ExecutorService.Это приводит к увеличению номеров задач.

...