Я работаю над улучшением для существующего приложения Java. Приложение представляет собой процессор сообщений, который ежедневно обрабатывает несколько миллионов сообщений. Он написан с использованием Core Java с потоками, а очереди реализованы с использованием классов Collection.
В этом приложении некоторые типы сообщений выполняются в одном потоке. Передо мной была поставлена задача сделать эту конкретную часть приложения многопоточной, чтобы быстрее обрабатывать сообщения, поскольку у нас есть два процессора.
Поскольку мы используем Java 5, я использовал подход ThreadPoolExcecutor. Я создал процессорные потоки для каждого клиента, чтобы сообщения для определенных потоков могли обрабатываться в своем собственном потоке. Потоки процессора реализуют интерфейс Callable, поскольку это позволит мне проверить будущий объект, завершено ли предыдущее задание или нет.
В процессе инициализации я перейду по всем клиентам, создаю потоки процессора для каждого и сохраню их в карте, используя их идентификатор в качестве уникального ключа. Чтобы отслеживать ранее отправленную работу, я снова сохраняю будущий объект на другой карте, используя тот же идентификатор, что и уникальный ключ.
Ниже приведен фрагмент кода, который я использовал: В основном классе -
ThreadPoolExecutor threadPool = null;
int poolSize = 20;
int maxPoolSize = 50;
long keepAliveTime = 10;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000);
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,keepAliveTime, TimeUnit.SECONDS, queue);
....
....
for (each client...) {
id = getId()..
future = futuremap.get(id);
if(!future.isDone())
continue;
if(future == null || future.isDone()) {
processor = processormap.get(id);
if(processor == null) {
processor = new Processor(.....);
//add to the map
processormap.put(id,processor);
}
//submit the processor
future = threadPool.submit(processor );
futuremap.put(id,future);
}
}
Процессорная нить
public class MyProcessor implements Callable<String> {
.....
.....
public String call() {
....
....
}
}
Выпуск
Приведенная выше реализация работает хорошо в моей тестовой среде. Однако в производственной среде ( Edit # 1 - Ubuntu , Linux Slackware, Java - 1.6.0_18) мы заметили, что другие потоки приложения, которые не управляются с помощью этого нового ThreadpoolExecutor страдают. их задачи откладываются на несколько часов. Это потому, что потоки, созданные ThreadPoolExecutors, берут все ресурсы или что-то еще и не дают возможности другим потокам.
Новые потоки, созданные с помощью ThreadPoolExceutor, выполняют самостоятельную задачу и не конфликтуют с другими потоками за ресурсы. то есть, нет сценария состояния гонки.
В журнале, для новых потоков, я вижу, что работает максимум 20 потоков (corepoolsize), и нет никаких исключений отклонения, т. Е. Количество отправок находится в пределах очереди.
Есть идеи, почему это происходит?
Спасибо заранее.