ExecutorService - параллельное выполнение задач и сохранение результатов - PullRequest
2 голосов
/ 27 января 2020

Я хочу отправить пинг для 10 пользователей одновременно и обновить пользовательский объект с результатом, как только пинг завершен.

Для этого я пытаюсь использовать ExecutorService .

Я начал с такого кода:

private void pingUsers(List<User> userList) throws ExecutionException, InterruptedException {
    final int NUM_THREADS = 10;
    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

    for (User user : userList) {
        SnmpPingDevice pingUser = new PingUser(user);
        Future<Boolean> isUserActive = executor.submit(pingUser);
        user.isActive = isUserActive.get() ; // -- I guess it will block other pings and i'm back to my starting point where I need to run the pings in parallel.
    }

    executor.shutdown();
    try {
        executor.awaitTermination(30, TimeUnit.SECONDS);


    } catch (InterruptedException e) {
        logger.error("Failed to terminate executor");
    }
}

Вот так выглядит мой класс PingUser:

@Override
    public Boolean call() {
        ping = new CmdRunner(toolDir,outDir,
                new UserOidWorkerPing(version,community,ip,logger));

        return this.isActive();
    }

public boolean isActive(){
        String cmd = ping.getCmdNoRedirect(); 
        String rc = this.cmdRunner.runShellCmd(cmd,this.outDir +"/dummy",false);
        logger.debug("PING was sent with cmd:" + cmd + ",rc:" + rc);
        return rc != null && !rc.contains("Timeout:") && !rc.isEmpty();
    }

И Вернемся к той же проблеме: пинги не будут выполняться параллельно (как только l oop ожидает окончания isUserActive.get())

Есть идеи, что мне не хватает? Как я могу заставить эти эхо-запросы выполняться параллельно и сохранить результат для каждого пользователя в моем List<User> userList?

Ответы [ 3 ]

3 голосов
/ 27 января 2020

Future::get является операцией блокировки, поэтому вызывающий поток будет заблокирован до завершения вызова. Таким образом, вы отправляете новую задачу только после того, как предыдущая была закончена.

Рассмотрите возможность использования ExecutorService::invokeAll, которая вернет список Future s:

List<PingUser> pingUsers = userList.stream().map(PingUser::new).collect(Collectors.toList());
List<Future<Boolean>> results = executor.invokeAll(pingUsers);
2 голосов
/ 27 января 2020

Вы блокируете свое выполнение для каждого вызова с помощью этой строки:

user.isActive = isUserActive.get() ;

Это эффективно ожидает завершения вызова и делает это для каждого вызова, включенного в время.

Вы должны отправить все задачи и составить список из Future с, чтобы ждать результатов только после того, как все задачи были отправлены. Как то так:

List<Future<Boolean>> tasks = new ArrayList<>();
for (User user : userList) {
    SnmpPingDevice pingUser = new PingUser(user);
    tasks.add(executor.submit(pingUser));
}

for(Future<Boolean> task: tasks) {
    //use the result... OK to get() here.
}
0 голосов
/ 27 января 2020

Что вы можете сделать, это добавить user.isActive = future.get() в Runnable, который вы отправляете.

for (User user : userList) {
    SnmpPingDevice pingUser = new PingUser(user);
    executor.submit(() -> user.isActive = pingUser.call());
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...