Цепь CompletableFuture и остановка на первом успехе - PullRequest
0 голосов
/ 24 мая 2018

Я использую API, который возвращает CompletableFuture s для запроса устройств (аналогично digitalpetri modbus ).

Мне нужно вызвать этот API с несколькими вариантами запросаустройство и выяснить, что это такое - это в основном методом проб и ошибок, пока не получится.Это протоколы встроенных устройств, которые я не могу изменить, но вы можете думать, что процесс работает примерно так:

  1. Вы яблоко?
  2. Если нет, то выананас?
  3. Если нет, то вы ручка?
  4. ...

Хотя API использует фьючерсы, на самом деле коммуникации являются последовательными (проходя через один и тот же физический кусок провода), поэтому они никогда не будут выполняться синхронно.Как только я знаю, что это такое, я хочу прекратить попытки и сообщить вызывающей стороне, что это такое.

Я уже знаю, что могу получить результат только одного из фьючерсов с any (см. ниже), но это может привести к дополнительным попыткам, которых следует избегать.

Существует ли схема цепочки фьючерсов, в которой вы останавливаетесь, когда один из них завершается успешно?

Аналогично, но расточительноочень ограниченные ресурсы.

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "attempt 1"),
    CompletableFuture.supplyAsync(() -> "attempt 2"),
    CompletableFuture.supplyAsync(() -> "attempt 3"));

CompletableFuture<String>[] futuresArray = (CompletableFuture<String>[]) futures.toArray();
CompletableFuture<Object> c = CompletableFuture.anyOf(futuresArray);

Ответы [ 2 ]

0 голосов
/ 25 мая 2018

Предположим, что у вас есть метод, который является "псевдо-асинхронным", как вы описываете, то есть он имеет асинхронный API, но требует некоторой блокировки для выполнения:

private final static Object lock = new Object();

private static CompletableFuture<Boolean> pseudoAsyncCall(int input) {
    return CompletableFuture.supplyAsync(() -> {
                synchronized (lock) {
                    System.out.println("Executing for " + input);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return input > 3;
                }
            });
}

и List<Integer> входов, которыевы хотите проверить этот метод, вы можете проверить каждый из них в последовательности с рекурсивной композицией:

public static CompletableFuture<Integer> findMatch(List<Integer> inputs) {
    return findMatch(inputs, 0);
}

private static CompletableFuture<Integer> findMatch(List<Integer> inputs, int startIndex) {
    if (startIndex >= inputs.size()) {
        // no match found -- an exception could be thrown here if preferred
        return CompletableFuture.completedFuture(null);
    }
    return pseudoAsyncCall(inputs.get(startIndex))
            .thenCompose(result -> {
                if (result) {
                    return CompletableFuture.completedFuture(inputs.get(startIndex));
                } else {
                    return findMatch(inputs, startIndex + 1);
                }
            });
}

Это будет использоваться так:

public static void main(String[] args) {
    List<Integer> inputs = Arrays.asList(0, 1, 2, 3, 4, 5);
    CompletableFuture<Integer> matching = findMatch(inputs);

    System.out.println("Found match: " + matching.join());
}

Вывод:

Executing for 0
Executing for 1
Executing for 2
Executing for 3
Executing for 4
Found match: 4

Как видите, он не вызывается для ввода 5, в то время как ваш API (findMatch()) остается асинхронным.

0 голосов
/ 24 мая 2018

Я думаю, что лучшее, что вы можете сделать, это после того, как вы получите результат,

futures.forEach(f -> f.cancel(true));

Это не повлияет на того, кто дал результат, и постарается остановить остальных.Поскольку IIUC вы получаете их из внешнего источника, нет гарантии, что он фактически прервет их работу.

Однако, поскольку

этот класс не имеет прямого контроля над вычислениями, которые его вызываютЧтобы быть завершенным, отмена рассматривается как еще одна форма исключительного завершения

(из CompletableFuture документ), я сомневаюсь, что это будет делать то, что вы на самом деле хотите.

...