Как остановить параллельный поток, как только найдет совпадение - PullRequest
0 голосов
/ 19 сентября 2018

Я пытаюсь найти первого (любого) члена списка, который соответствует данному предикату, например:

Item item = items.parallelStream()
  .map(i -> i.doSomethingExpensive())
  .filter(predicate)
  .findAny()
  .orElse(null);

Я ожидаю, что как только findAny() получит совпадение, он немедленно вернется, но это не так.Вместо этого он, похоже, ждет завершения метода map для большинства элементов, прежде чем вернуться.Как я могу немедленно вернуть первый результат и отменить другие параллельные потоки?Есть ли лучший способ сделать это, чем использование потоков, таких как CompletableFuture?

Вот простой пример, демонстрирующий поведение:

private static void log(String msg) {
    private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " " + msg);
}

Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
  .map(n -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try { Thread.sleep(delay); }
    catch (InterruptedException e) { System.err.println("Interruption error"); }
    return n * n;
  })
  .filter(n -> n < 30)
  .peek(n -> log("Found match: " + n))
  .findAny();

log("First match: " + num);

Вывод журнала:

14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]

Как только совпадение найдено (в данном случае 16), findAny() не возвращается немедленно, а вместо этого блокируется, пока не закончатся оставшиеся потоки.В этом случае вызывающая сторона ждет дополнительные 5 секунд, прежде чем вернуться после того, как совпадение уже найдено.

Ответы [ 3 ]

0 голосов
/ 20 сентября 2018

Вместо этого, кажется, нужно дождаться завершения метода карты для большинства элементов, прежде чем возвращать.

Это не правильно.

Когда мы говорим об элементахкоторые уже обрабатываются, он будет ожидать завершения всех из них , так как Stream API позволяет одновременно обрабатывать структуры данных, которые не являются поточно-ориентированными.Он должен убедиться, что весь потенциальный параллельный доступ был завершен, прежде чем вернуться из операции терминала.

Говоря обо всем потоке, просто нечестно тестировать поток из 14 элементов на 8-ядерном компьютере.Конечно, будет запущено как минимум 8 одновременных операций, вот в чем суть.Вы добавляете топливо в огонь, используя findFirst() вместо findAny(), так как это не означает, что вы возвращаете первый найденный элемент в порядке обработки, но первый элемент в порядке встречи, т. Е. Точно ноль в вашем примере, поэтому потокиОбработка других фрагментов, отличных от первого, не может предположить, что их результат является правильным ответом, и они даже больше готовы помочь в обработке других кандидатов, чем с помощью findAny().

Когда вы используете

List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
        .map(n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        })
        .filter(n -> n < 40_000)
        .peek(n -> log("Found match: " + n))
        .findAny();

log("First match: " + num);

Вы получите такое же количество задач, которые будут выполнены до завершения, несмотря на гораздо большее количество элементов потока.

Обратите внимание, что CompletableFuture также не поддерживает прерывание, поэтому единственная встроенная функция для возврата любого результата иОтмена других заданий, которые мне приходят в голову, - это старый ExecutorService.invokeAny.

. Для создания функции сопоставления и фильтрации для него мы можем использовать следующую вспомогательную функцию:

static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
    return () -> {
        R r = f.apply(t);
        if(!p.test(r)) throw new NoSuchElementException();
        return r;
    };
}

К сожалению, есть только возможность дополнить значением или в порядке исключения, поэтому мы должны использовать исключение fили несовпадающие элементы.

Тогда мы можем использовать его как

ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
    .mapToObj(i -> mapAndfilter(i,
        n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        },
        n -> n < 10_000))
    .collect(Collectors.toList()));

log("result: "+result);

, и он не только отменит отложенные задачи, но и вернется, не дожидаясь их завершения.

Конечно, это означает, что исходные данные, на которых выполняются задания, должны быть либо неизменяемыми, либо поточно-ориентированными.

0 голосов
/ 20 сентября 2018

Здесь есть несколько вещей.Во-первых, parallelStream() по умолчанию использует общий ForkJoinPool, что также позволяет вызывающему потоку участвовать.Это означает, что если в вызывающем потоке в настоящий момент выполняется одна из медленных задач, она должна завершиться, прежде чем вызывающая сторона вернет элемент управления.

Это можно увидеть, немного изменив код для регистрации потокаимена и журнал, когда закончите wating:

private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + " " + msg);
}

public static void main(String[] args) {
    Random random = new Random();
    List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
    Optional<Integer> num = nums.parallelStream()
            .map(n -> {
                long delay = Math.abs(random.nextLong()) % 10000;
                log("Waiting on " + n + " for " + delay + " ms");
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    System.err.println("Interruption error");
                }
                log("finished waiting");
                return n * n;
            })
            .filter(n -> n < 30)
            .peek(n -> log("Found match: " + n))
            .findAny();

    log("First match: " + num);
}

Пример вывода:

13:56:52.954 [main]  Waiting on 9 for 9936 ms
13:56:52.956 [ForkJoinPool.commonPool-worker-1]  Waiting on 4 for 7436 ms
13:56:52.970 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 6523 ms
13:56:52.983 [ForkJoinPool.commonPool-worker-3]  Waiting on 6 for 7488 ms
13:56:59.494 [ForkJoinPool.commonPool-worker-2]  finished waiting
13:56:59.496 [ForkJoinPool.commonPool-worker-2]  Found match: 1
13:57:00.392 [ForkJoinPool.commonPool-worker-1]  finished waiting
13:57:00.392 [ForkJoinPool.commonPool-worker-1]  Found match: 16
13:57:00.471 [ForkJoinPool.commonPool-worker-3]  finished waiting
13:57:02.892 [main]  finished waiting
13:57:02.894 [main]  First match: Optional[1]

Здесь, как вы можете видеть, найдено 2 совпадения, но основной поток все еще занят, так чтоне может вернуть совпадение сейчас.

Это не всегда объясняет все случаи, хотя:

13:58:52.116 [main]  Waiting on 9 for 5256 ms
13:58:52.143 [ForkJoinPool.commonPool-worker-1]  Waiting on 4 for 4220 ms
13:58:52.148 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 2136 ms
13:58:52.158 [ForkJoinPool.commonPool-worker-3]  Waiting on 6 for 7262 ms
13:58:54.294 [ForkJoinPool.commonPool-worker-2]  finished waiting
13:58:54.295 [ForkJoinPool.commonPool-worker-2]  Found match: 1
13:58:56.364 [ForkJoinPool.commonPool-worker-1]  finished waiting
13:58:56.364 [ForkJoinPool.commonPool-worker-1]  Found match: 16
13:58:57.399 [main]  finished waiting
13:58:59.422 [ForkJoinPool.commonPool-worker-3]  finished waiting
13:58:59.424 [main]  First match: Optional[1]

Это может быть объяснено тем, как пул объединения вилок объединяет результаты.Кажется, что некоторые улучшения возможны.

В качестве альтернативы, вы действительно могли бы сделать это, используя CompletableFuture:

// you should probably also pass your own executor to supplyAsync()
List<CompletableFuture<Integer>> futures = nums.stream().map(n -> CompletableFuture.supplyAsync(() -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        System.err.println("Interruption error");
    }
    log("finished waiting");
    return n * n;
})).collect(Collectors.toList());
CompletableFuture<Integer> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(unused -> futures.stream().map(CompletableFuture::join).filter(n -> n < 30).findAny().orElse(null));
// shortcircuiting
futures.forEach(f -> f.thenAccept(r -> {
    if (r < 30) {
        log("Found match: " + r);
        result.complete(r);
    }
}));
// cancelling remaining tasks
result.whenComplete((r, t) -> futures.forEach(f -> f.cancel(true)));

log("First match: " + result.join());

Выход:

14:57:39.815 [ForkJoinPool.commonPool-worker-1]  Waiting on 0 for 7964 ms
14:57:39.815 [ForkJoinPool.commonPool-worker-3]  Waiting on 2 for 5743 ms
14:57:39.817 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 9179 ms
14:57:45.562 [ForkJoinPool.commonPool-worker-3]  finished waiting
14:57:45.563 [ForkJoinPool.commonPool-worker-3]  Found match: 4
14:57:45.564 [ForkJoinPool.commonPool-worker-3]  Waiting on 3 for 7320 ms
14:57:45.566 [main]  First match: 4

Обратите внимание, чтоcancel(true) на самом деле не отменяет текущие задачи (например, прерывание не произойдет), но предотвращает запуск дальнейших задач (вы даже можете видеть, что это может быть не сразу, поскольку работник 3 все еще начал выполнять следующую).

Вы также должны использовать своего собственного исполнителя, с соответствующим размером, зависящим от того, является ли он более интенсивным использованием ЦП или ввода-вывода.Как вы можете видеть, по умолчанию используется общий пул, и поэтому он не использует все ядра.

allOf() в основном необходим, если совпадение не найдено.Если вы можете гарантировать, что есть хотя бы одно совпадение, вы можете просто использовать `new CompletableFuture () вместо этого.

Наконец, в качестве простого подхода я повторил проверку filter, но ее легко переместитьЛогика внутри основной логики, вернуть null или маркер, а затем проверить это в обоих местах.

См. также Как создать будущее, которое будет завершено, когда будет завершено любое из заданных CompletableFuturesс результатом, который соответствует определенному предикату?

0 голосов
/ 19 сентября 2018

Вы можете использовать этот код, чтобы проиллюстрировать, как работает parallelStream:

final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");

    String result = list.parallelStream()
                        .map(s -> {
                            System.out.println("map: " + s);
                            return s;
                        })
                        .filter(s -> {
                            System.out.println("fiter: " + s);
                            return s.equals("8th");
                        })
                        .findFirst()
                        .orElse(null);

    System.out.println("result=" + result);

Есть два варианта для достижения того, что вы ищете, чтобы остановить дорогостоящую работу с фильтром:

  1. Ни в коем случае не используйте потоки, используйте простой для или расширенный для
  2. Сначала отфильтруйте, затем сопоставьте с дорогой операцией
...