передача параметра в threadpoolexecutor - PullRequest
0 голосов
/ 22 февраля 2019

Я рассмотрел многопоточность и попытался реализовать приложение, которое создает отдельные потоки для запуска процесса сбора.Основной метод в этом процессе требует переменной arraylist, и я пытаюсь найти способ передачи массива в каждый поток.

ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("taskExecutor");

    MainTask mxTask = (MainTask) context.getBean("MainTask");
    mxTask.setName("Thread 1");
    taskExecutor.execute(mxTask);

    MainTask mxTask2 = (MainTask) context.getBean("MainTask");
    mxTask2.setName("Thread 2");
    taskExecutor.execute(mxTask2);

Выше приведен метод класса, который объявляет потоки с MainTask каккласс, который выполняет метод run (), который затем вызывает другие основные методы.

@Override
public void run() {
    System.out.println(name + " is running.");
    getConfigurations();
    try {
            mainRun();


    } catch (MessagingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

Метод getconfigurations () считывает все необходимые переменные конфигурации.Ниже приведен основной процесс, который должен выполняться каждым потоком.

public static void mainRun() throws IOException {

    ArrayList<String> filterList = new ArrayList<String>();
    ArrayList<String> brokerList = new ArrayList<String>();


    if(kafkaServers1 != null) {
        brokerList.add(kafkaServers1);
    } else if(!kafkaServers2.isEmpty()) {
        brokerList.add(kafkaServers2);
    } else if(!kafkaServers3.isEmpty()) {
        brokerList.add(kafkaServers3);
    }

    ArrayList<String> serverList = new ArrayList<String>();

    for(int x=0;x<brokerList.size();x++){
        String[] serverBrokers = brokerList.get(x).split(",");
        serverList.add(serverBrokers[0]);
        serverList.add(serverBrokers[1]);
        serverList.add(serverBrokers[2]);

    }


    try {

        while(true){
            for (String temp : serverList) {
                kafkaServer = temp;
                hostName = kafkaServer;
                InetAddress addr = InetAddress.getByName(hostName);
                hostName = addr.getHostName();
                kafkaServer= hostName;
                retrieveData(hostName);

...

Переменная kafkaServers1 содержит список из 3 ips, которые разделяются и каждый добавляется в массив serverList arrayList.То, что я пытаюсь сделать, это назначить только один IP для каждого потока.Это возможно?Может кто-нибудь посоветовать, пожалуйста?

1 Ответ

0 голосов
/ 22 февраля 2019

Вместо того, чтобы ваш объект Runnable был бином, вы можете извлечь конфигурации Kafka, а затем создать новый Runnable для каждого хоста.Отправить его исполнителю.

MyRunnable infoGetter = new MyRunnable(hostInfo);
taskExecutor.execute(infoGetter);

Возможно, лучшим вариантом будет CompletableFuture, если вам нужно что-то сделать с результатами, или разветвление / соединение RecursiveTask<V>, если вам нужно объединить их.

...