Как использовать параллельную обработку наиболее эффективным и элегантным способом в Java - PullRequest
4 голосов
/ 07 июня 2019

У меня есть разные источники данных, из которых я хочу запрашивать параллельно (так как каждый из этих запросов является HTTP-вызовом и может занимать довольно много времени).Но я собираюсь использовать только 1 ответ на эти запросы.Так что я как бы расставил приоритеты.Если первый ответ недействителен, я проверю второй.Если он также недействителен, я хочу использовать третий и т. Д. Но я хочу прекратить обработку и вернуть результат, как только получу первый правильный ответ.

Чтобы смоделировать проблему, я создал следующий код, где я пытаюсь использовать параллельную потоковую передачу Java.Но проблема в том, что я получаю окончательные результаты только после обработки всех запросов.

public class ParallelExecution {

    private static Supplier<Optional<Integer>> testMethod(String strInt) {
        return () -> {
            Optional<Integer> result = Optional.empty();
            try {
                result = Optional.of(Integer.valueOf(strInt));
                System.out.printf("converted string %s to int %d\n",
                        strInt,
                        result.orElse(null));
            } catch (NumberFormatException ex) {
                System.out.printf("CANNOT CONVERT %s to int\n", strInt);
            }

            try {
                int randomValue = result.orElse(10000);
                TimeUnit.MILLISECONDS.sleep(randomValue);
                System.out.printf("converted string %s to int %d in %d milliseconds\n",
                        strInt,
                        result.orElse(null), randomValue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        System.out.println("Starting program: " + start.toString());
        List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList();
        for (String arg: args) {
            listOfFunctions.add(testMethod(arg));
        }
        Integer value = listOfFunctions.parallelStream()
                .map(function -> function.get())
                .filter(optValue -> optValue.isPresent()).map(val-> {
                    System.out.println("************** VAL: " + val);
                    return val;
                }).findFirst().orElse(null).get();
        Instant end = Instant.now();
        Long diff = end.toEpochMilli() - start.toEpochMilli();
        System.out.println("final value:" + value + ", worked during " + diff + "ms");
    }
}

Поэтому, когда я выполняю программу, используя следующую команду:

$java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767

Я хочу получить результат«34» как можно скорее (примерно через 34 миллисекунды), но на самом деле я жду более 10 секунд.

Не могли бы вы помочь найти наиболее эффективное решение этой проблемы?

Ответы [ 3 ]

2 голосов
/ 07 июня 2019

ExecutorService#invokeAny выглядит хорошим вариантом.

List<Callable<Optional<Integer>>> tasks = listOfFunctions
    .stream()
    .<Callable<Optional<Integer>>>map(f -> f::get)
    .collect(Collectors.toList());

ExecutorService service = Executors.newCachedThreadPool();
Optional<Integer> value = service.invokeAny(tasks);

service.shutdown();

Я преобразовал ваш List<Supplier<Optional<Integer>>> в List<Callable<Optional<Integer>>>, чтобы можно было передать его в invokeAny. Вы можете построить Callable с изначально. Затем я создал ExecutorService и отправил задачи.

Результат первой успешно выполненной задачи будет возвращен, как только этот результат будет возвращен из задачи. Другие задачи будут прерваны.

Вы также можете посмотреть CompletionService.

List<Callable<Optional<Integer>>> tasks = Arrays
    .stream(args)
    .<Callable<Optional<Integer>>>map(arg -> () -> testMethod(arg).get())
    .collect(Collectors.toList());

final ExecutorService underlyingService = Executors.newCachedThreadPool();
final ExecutorCompletionService<Optional<Integer>> service = new ExecutorCompletionService<>(underlyingService);
tasks.forEach(service::submit);

Optional<Integer> value = service.take().get();
underlyingService.shutdownNow();
0 голосов
/ 07 июня 2019

Я пробовал это с помощью CompettableFutures и методом anyOf.Он вернется, когда будет завершено любое будущее.Теперь ключом к остановке других задач является предоставление собственного сервиса исполнителя для completetableFuture (s) и его выключение при необходимости.

  public static void main(String[] args) {
    Instant start = Instant.now();
    System.out.println("Starting program: " + start.toString());
    CompletableFuture<Optional<Integer>> completableFutures[] = new CompletableFuture[args.length];
    ExecutorService es = Executors.newFixedThreadPool(args.length,r -> {
            Thread t = new Thread(r);
            t.setDaemon(false);
            return t;
    });

    for (int i = 0;i < args.length; i++) {
        completableFutures[i] = CompletableFuture.supplyAsync(testMethod(args[i]),es);
    }
    CompletableFuture.anyOf(completableFutures).
            thenAccept(res-> {
                System.out.println("Result - " + res + ", Time Taken : " + (Instant.now().toEpochMilli()-start.toEpochMilli()));
                es.shutdownNow();
            });
}

PS: он будет генерировать прерванные исключения, которые вы можете игнорировать в try catchблокировать, а не печатать трассировку стека. Кроме того, в идеале размер пула потоков должен быть равен длине массива args.

0 голосов
/ 07 июня 2019

Вы можете использовать очередь, чтобы поместить свои результаты в:

private static void testMethod(String strInt, BlockingQueue<Integer> queue) {
    // your code, but instead of returning anything:
    result.ifPresent(queue::add);
}

, а затем позвоните с

for (String s : args) {
    CompletableFuture.runAsync(() -> testMethod(s, queue));
}
Integer result = queue.take();

Обратите внимание, что это будет обрабатывать только первый результат, как в вашем примере.

...