У меня есть одна программа со странной ошибкой одновременно.
Что эта программа делает:
- Выполнение цикла событий каждый
EVENT_LOOP_PAUSE_DURATION_IN_MS
. - Для каждой задачиВыполнить процессор
TaskProcessor
- Каждый
500 ms
печатает размер очереди моего исполнителя.
Я хочу иметь не более одной задачи в очереди на taskId
.Поэтому, когда я добавляю задачу в очередь, я проверяю, существуют ли уже задачи или нет.Если нет задачи, я добавляю ее.В конце обработки задачи я удаляю задачу из карты activeTasks
.
Если вы запустите программу, то увидите следующий вывод:
ERROR: 50
ERROR: 70
ERROR: 80
ERROR: 90
ERROR: 110
ERROR: 120
ERROR: 120
ERROR: 140
Итак, есть ошибка.Я не знаю почему, но размер очереди пула потоков бесконечно увеличивается.
Вы видите, что я удаляю активные задачи в 2 точках программы:
- В
finally
блоке TaskProcessor
, когда задача обработана. - Я удаляю устаревшие задачи в цикле событий.
Итак, если я удаляю код, который удаляет задачи в точке (2), то ошибка исчезает.Я не понимаю это поведение.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Program {
private static final int NUMBER_OF_TASKS = 40;
private static final int NUMBER_OF_THREADS = 10;
private static final long EVENT_LOOP_PAUSE_DURATION_IN_MS = 40L;
class QueueSizePrinter extends Thread {
private final LinkedBlockingQueue<Runnable> workQueue;
public QueueSizePrinter(LinkedBlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
}
@Override
public void run() {
while (true) {
int qSize = workQueue.size();
if (qSize > NUMBER_OF_TASKS) {
System.out.println("ERROR: " + qSize);
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TaskProcessor implements Runnable {
private final String currentTaskId;
private final ConcurrentHashMap<String, Long> activeTasks;
public TaskProcessor(String currentTaskId, ConcurrentHashMap<String, Long> activeTasks) {
this.currentTaskId = currentTaskId;
this.activeTasks = activeTasks;
}
@Override
public void run() {
try {
// emulate of useful work
Thread.sleep(300L);
} catch (Exception e) {
System.out.println("error: " + e.toString());
} finally {
activeTasks.remove(currentTaskId); // (1)
}
}
}
public void program() {
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ExecutorService executor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, 0L, TimeUnit.MILLISECONDS, workQueue);
Set<String> initialTasks = ConcurrentHashMap.newKeySet();
for (int currentTaskIndex = 0; currentTaskIndex < NUMBER_OF_TASKS; currentTaskIndex++) {
initialTasks.add(String.valueOf(currentTaskIndex));
}
new QueueSizePrinter(workQueue).start();
ConcurrentHashMap<String, Long> activeTasks = new ConcurrentHashMap<>();
while (true) {
initialTasks.forEach((currentTaskId) -> {
if (!activeTasks.containsKey(currentTaskId)) {
activeTasks.put(currentTaskId, System.currentTimeMillis());
executor.submit(new TaskProcessor(currentTaskId, activeTasks));
}
});
// (2)
activeTasks.entrySet().removeIf(entry -> {
boolean hasDelete = System.currentTimeMillis() - entry.getValue() > 1000;
if (hasDelete) {
//System.out.println("DELETE id=" + entry.getKey());
}
return hasDelete;
});
try {
Thread.sleep(EVENT_LOOP_PAUSE_DURATION_IN_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Program main = new Program();
main.program();
}
}