Частичное извлечение данных из пакета задач - PullRequest
0 голосов
/ 25 апреля 2018

Я использую ExecutorService для отправки пакета задач. Я делаю это примерно так:

ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
List<ListenableFuture<Whatever>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
  results.add(exec.submit(new MyTask(i)));
}

ListenableFuture<List<Whatever>> listListenableFuture = Futures.successfulAsList(futures);

try {
    List<Whatever> responses = listListenableFuture.get(2000, TimeUnit.MILLISECONDS);
    for (Whatever response : responses) {
      LOG.info("Yay!");
    }
} catch (TimeoutException e) {
  LOG.info("Timeout Exception");
} catch (Exception e) {
  // Nay! 
}

Проблема здесь в том, что если одна из задач занимает более 2000 мс, она выдаст TimeoutException, и я ничего не получу в ответе, хотя некоторые задачи могли быть завершены в этот самый момент.

Поэтому я хочу получить ответ (будь то частичный или полный) о задачах, которые были выполнены до истечения времени ожидания (2000 мс). Например:

(время относительно START_TIME пакетного вызова)
Задача-1: 1000 мс
Задача-2: 3000 мс
Задача-3: 1800 мс


Выход:
Исключение тайм-аута


Требуемый выход:
Ура! <- соответствует заданию-1 <br> Ура! <- соответствует заданию-3 </p>

Единственное решение, о котором я подумал, - это выбрать фьючерсы по отдельности и установить для них время ожидания MAX(0, TIME_OUT - TIME_NOW - START_TIME). Это может сработать, но не кажется мне чистым решением.

Ответы [ 2 ]

0 голосов
/ 26 апреля 2018

Если вы используете Futures.getChecked, исключения тайм-аута будут проглочены, и в будущем будет возвращен ноль. Проверьте следующий код, где одна из задач выдает TimeoutException, а соответствующее будущее возвращает null.

import java.io.IOException;

import com.google.common.util.concurrent.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class App 
{
    public static void main( String[] args ) throws InterruptedException, ExecutionException
    {
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
        ListenableFuture<String> future1 = 
            listeningExecutorService.submit(() -> {
                throw new TimeoutException("Timeout exception");
            });
        ListenableFuture<String> future2 = 
            listeningExecutorService.submit(() -> "Hello World");
        ListenableFuture<List<String>> combined = Futures.successfulAsList(future1, future2);
        try {
            String greeting = Futures.getChecked(combined, IOException.class, 2000l, TimeUnit.MILLISECONDS).stream().collect(Collectors.joining(" "));
            System.out.println(greeting);
        } catch (IOException e) {
            System.out.println("Exception: " + e.getMessage());
        } finally {
            listeningExecutorService.shutdown();
        }
    }
}
0 голосов
/ 25 апреля 2018

Вы можете использовать вызываемую декорацию, которая обрабатывает время ожидания.

Предположим, что это оригинальный вызываемый код:

class OriginalCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "";
    }
}

Вы можете создать декорацию, вызываемую с помощью этого оригинального вызова и исполнителя:

class DecorateCallable implements Callable<String> {
    ExecutorService executorService;
    OriginalCallable callable;

    public DecorateCallable(ExecutorService executorService, OriginalCallable callable) {
        this.executorService = executorService;
        this.callable = callable;
    }

    @Override
    public String call() throws Exception {
        Future<String> future = executorService.submit(callable);
        try {
            return future.get(2000, TimeUnit.SECONDS);
        } catch (TimeoutException | InterruptedException e) {

        }
        return null;
    }
}

Если вы решили использовать это, вам нужно удвоить размер пула:

Executors.newFixedThreadPool(threads * 2);

и добавьте некоторые условия, например if(future.get() != null), перед тем, как поместить их в окончательный набор результатов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...