Метод Run на экземпляре Java Runnable, не вызванный после ExecutorService.submit - PullRequest
0 голосов
/ 15 мая 2019

У меня есть программа, в которой я инициализировал пул потоков, используя ExecutorService. Из основных потоков приложения я создаю работающий экземпляр с некоторыми данными и передаю его в пул потоков, используя executorservice.submit(), один из потоков из пула берет данные и отправляет их в Kafka. Тот же код отлично работает на других компьютерах (Машина 1: AIX - внутри WebSphere Application Server, Машина 2: Windows 10, автономная Java JVM). Когда я запускаю тот же код в автономной JVM в AIX, он не работает (нет журналов из Runnable и данные также не публикуются в Kafka.

Классы:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;



public class CustomExecutorServiceWrapper {



private static ExecutorService[] executorArr =  {Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1),Executors.newFixedThreadPool(1) } ;

public static void submitToSpecificThread(Runnable r) {
        int i = r.hashCode()%10;
    ExecutorService executorService = executorArr[Math.abs(i)];
    ThreadPoolExecutor tPoolExecutor = (ThreadPoolExecutor)executorService;
     tPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Task rejected" );

        }
    });
    System.out.println("Thread Name - "+Thread.currentThread().getId() +" " +  Thread.currentThread().getName() + " queue size " +  tPoolExecutor.getQueue().size() );
    Future fObj = executorService.submit(r);
    System.out.println("FutureObject Cancelled" + fObj.isCancelled());
    System.out.println("FutureObject isDone" + fObj.isDone());

    System.out.println("Thread Name - "+Thread.currentThread().getId() +" " +  Thread.currentThread().getName() + " after submit queue size " +
            tPoolExecutor.getQueue().size() );

}
}

Класс - 2:


import com.common.kafka.StaticKafkaBaseSenderImpl;

public class CustomRunnable implements Runnable{

    @Override
    public int hashCode() {
        return strWorkLoadIdentifier.hashCode();
    }

    String strWorkLoadIdentifier ;

    String strLoadMessage ;

    private void log(String str) {
        System.out.println("CustomRunnable - "+ Thread.currentThread().getId()+" - "+Thread.currentThread().getName() + " - " + str);
        }   
    public CustomRunnable(String str) {
        this.strWorkLoadIdentifier=str;
    }
    public CustomRunnable(String str, String strMessage) {
        this.strWorkLoadIdentifier=str;
        this.strLoadMessage=strMessage;
    }

    @Override
    public void run() {
        log("Starting work Load" + strWorkLoadIdentifier);      
        try {
            long t1=System.currentTimeMillis();
            StaticKafkaBaseSenderImpl.doBackgroundStream(strLoadMessage);
            log("Time taken to post message - " + (System.currentTimeMillis() - t1));
        } catch (Exception e) {
            log("Error Sending Message to Kafka");
            e.printStackTrace();
        }
    }
}

Класс 3: StaticKafkaBaseSenderImpl -> отправляет сообщение в тему Kafka.

Поток приложений:

  1. Главный поток приложения собирает некоторые данные.
  2. Главный поток приложения выполняет следующую инструкцию:

    CustomExecutorServiceWrapper.submitToSpecificThread (новый CustomRunnable («Случайный идентификатор сообщения», «Собранные данные»);

  3. Главный поток приложения продолжает выполнение.

В фоновом режиме внутри метода submitToSpecificThread идентифицируется конкретный экземпляр ExecutorService, и ему передается задача.

Журналы от запуска приложения:

2019-05-15 14:30:58,595:INFO   :Thread-31_ApplicationProcess: Thread Name - 69 Thread-31_ApplicationProcess queue size 0 [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO   :Thread-31_ApplicationProcess: FutureObject Cancelledfalse                                  [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO   :Thread-31_ApplicationProcess: FutureObject isDonefalse                                     [system]: CustomExecutorServiceWrapper
2019-05-15 14:30:58,595:INFO   :Thread-31_ApplicationProcess: Thread Name - 69 Thread-31_ApplicationProcess after submit queue size 0 [system]: CustomExecutorServiceWrapper

Я добавил проверку на объект будущего, чтобы увидеть, отклоняется ли он или отменяется, но я не вижу журналы от RejectedExecutionHandler, а isCancelled() и isDone() оба возвращают false.

...