У меня есть программа, в которой я инициализировал пул потоков, используя ExecutorService. Из основных потоков приложения я создаю работающий экземпляр с некоторыми данными и передаю его в пул потоков, используя executorservice.submit()
, один из потоков из пула берет данные и отправляет их в Kafka. Тот же код отлично работает на других компьютерах (Машина 1: AIX - внутри WebSphere Application Server, Машина 2: Windows 10, автономная Java JVM).
Когда я запускаю тот же код в автономной JVM в AIX, он не работает (нет журналов из Runnable и данные также не публикуются в Kafka.
Классы:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomExecutorServiceWrapper {
private static ExecutorService[] executorArr = {Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1) } ;
public static void submitToSpecificThread(Runnable r) {
int i = r.hashCode()%10;
ExecutorService executorService = executorArr[Math.abs(i)];
ThreadPoolExecutor tPoolExecutor = (ThreadPoolExecutor)executorService;
tPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task rejected" );
}
});
System.out.println("Thread Name - "+Thread.currentThread().getId() +" " + Thread.currentThread().getName() + " queue size " + tPoolExecutor.getQueue().size() );
Future fObj = executorService.submit(r);
System.out.println("FutureObject Cancelled" + fObj.isCancelled());
System.out.println("FutureObject isDone" + fObj.isDone());
System.out.println("Thread Name - "+Thread.currentThread().getId() +" " + Thread.currentThread().getName() + " after submit queue size " +
tPoolExecutor.getQueue().size() );
}
}
Класс - 2:
import com.common.kafka.StaticKafkaBaseSenderImpl;
public class CustomRunnable implements Runnable{
@Override
public int hashCode() {
return strWorkLoadIdentifier.hashCode();
}
String strWorkLoadIdentifier ;
String strLoadMessage ;
private void log(String str) {
System.out.println("CustomRunnable - "+ Thread.currentThread().getId()+" - "+Thread.currentThread().getName() + " - " + str);
}
public CustomRunnable(String str) {
this.strWorkLoadIdentifier=str;
}
public CustomRunnable(String str, String strMessage) {
this.strWorkLoadIdentifier=str;
this.strLoadMessage=strMessage;
}
@Override
public void run() {
log("Starting work Load" + strWorkLoadIdentifier);
try {
long t1=System.currentTimeMillis();
StaticKafkaBaseSenderImpl.doBackgroundStream(strLoadMessage);
log("Time taken to post message - " + (System.currentTimeMillis() - t1));
} catch (Exception e) {
log("Error Sending Message to Kafka");
e.printStackTrace();
}
}
}
Класс 3: StaticKafkaBaseSenderImpl -> отправляет сообщение в тему Kafka.
Поток приложений:
- Главный поток приложения собирает некоторые данные.
Главный поток приложения выполняет следующую инструкцию:
CustomExecutorServiceWrapper.submitToSpecificThread (новый CustomRunnable («Случайный идентификатор сообщения», «Собранные данные»);
Главный поток приложения продолжает выполнение.
В фоновом режиме внутри метода submitToSpecificThread идентифицируется конкретный экземпляр ExecutorService, и ему передается задача.
Журналы от запуска приложения:
2019-05-15 14:30:58,595:INFO :Thread-31_ApplicationProcess: Thread Name - 69 Thread-31_ApplicationProcess queue size 0 [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO :Thread-31_ApplicationProcess: FutureObject Cancelledfalse [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO :Thread-31_ApplicationProcess: FutureObject isDonefalse [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO :Thread-31_ApplicationProcess: Thread Name - 69 Thread-31_ApplicationProcess after submit queue size 0 [system]: CustomExecutorServiceWrapper
Я добавил проверку на объект будущего, чтобы увидеть, отклоняется ли он или отменяется, но я не вижу журналы от RejectedExecutionHandler
, а isCancelled()
и isDone()
оба возвращают false.