Как использовать CompletableFuture, не рискуя StackOverflowError? - PullRequest
0 голосов
/ 24 апреля 2018

Я хочу пройтись по пространству поиска асинхронной функции.Я закодировал логику следующим образом:

/**
 * Assuming that a function maps a range of inputs to the same output value, minimizes the input value while
 * maintaining the output value.
 *
 * @param previousInput the last input known to return {@code target}
 * @param currentInput  the new input value to evaluate
 * @param function      maps an input to an output value
 * @param target        the expected output value
 * @return the minimum input value that results in the {@code target} output value
 * <br>{@code @throws NullPointerException} if any argument is null
 * <br>{@code @throws IllegalArgumentException} if {@code stepSize} is zero}
 */
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                         BigDecimal currentInput,
                                                         BigDecimal stepSize,
                                                         Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                         BigDecimal target)
{
    return function.apply(currentInput).thenCompose(output ->
    {
        assertThat("stepSize", stepSize).isNotZero();
        int outputMinusTarget = output.compareTo(target);
        if (outputMinusTarget != 0)
            return CompletableFuture.completedFuture(previousInput);
        BigDecimal nextInput = currentInput.add(stepSize);
        if (nextInput.compareTo(BigDecimal.ZERO) < 0)
            return CompletableFuture.completedFuture(previousInput);
        return optimizeInput(currentInput, nextInput, stepSize, function, target);
    });
}

К сожалению, если функция имеет большое пространство поиска, после некоторых итераций возникает ошибка StackoverflowError.Можно ли итеративно обходить пространство поиска с помощью стека фиксированного размера?

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Ответ dfogni должен работать нормально, но для полноты можно избежать передачи обслуживания исполнителю в случае, когда метод является синхронным с использованием техники типа trampolining .

ToЧтобы упростить процесс, я представил класс, который фиксирует состояние, которое меняется между итерациями, и вводит методы, которые реализуют ваши проверки завершения и генерируют следующее состояние.Я считаю, что это так же, как ваша оригинальная логика, но вы можете проверить трижды.

private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    class State {
        BigDecimal prev;
        BigDecimal curr;
        BigDecimal output;

        State(BigDecimal prev, BigDecimal curr, BigDecimal output) {
            this.prev = prev;
            this.curr = curr;
            this.output = output;
        }

        boolean shouldContinue() {
            return output.compareTo(target) == 0 && curr.add(stepSize).compareTo(BigDecimal.ZERO) >= 0;
        }

        CompletionStage<State> next() {
            BigDecimal nextInput = curr.add(stepSize);
            return function.apply(nextInput).thenApply(nextOutput -> new State(curr, nextInput, nextOutput));
        }
    }

    /* Now it gets complicated... we have to check if we're running on the same thread we were called on. If we
     * were, instead of recursively calling `next()`, we'll use PassBack to pass our new state back 
     * to the stack that called us.
     */
    class Passback {
        State state = null;
        boolean isRunning = true;

        State poll() {
            final State c = this.state;
            this.state = null;
            return c;
        }
    }
    class InputOptimizer extends CompletableFuture<BigDecimal> {
        void optimize(State state, final Thread previousThread, final Passback previousPassback) {
            final Thread currentThread = Thread.currentThread();

            if (currentThread.equals(previousThread) && previousPassback.isRunning) {
                // this is a recursive call, our caller will run it
                previousPassback.state = state;
            } else {
                Passback passback = new Passback();
                State curr = state;
                do {
                    if (curr.shouldContinue()) {
                        curr.next().thenAccept(next -> optimize(next, currentThread, passback));
                    } else {
                        complete(curr.prev);
                        return;
                    }
                // loop as long as we're making synchronous recursive calls
                } while ((curr = passback.poll()) != null);
                passback.isRunning = false;
            }
        }
    }

    InputOptimizer ret = new InputOptimizer();
    function.apply(currentInput)
            .thenAccept(output -> ret.optimize(
                    new State(previousInput, currentInput, output),
                    null, null));
    return ret;
}

Хорошо, это довольно сложно.Также обратите внимание, что для этого требуется, чтобы ваша функция никогда не выдавала исключение или не выполняла его исключительно, что может быть проблематично.Вы можете генерировать это так, что вам нужно написать его только один раз (с правильной обработкой исключений), который можно найти в asyncutil library ( Отказ от ответственности: Я являюсь соавторомэта библиотека).Могут быть другие библиотеки со схожей функциональностью, скорее всего, зрелая реактивная библиотека, такая как Rx.Используя asyncutil,

 private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    // ... State class from before
    return function
            .apply(currentInput)
            .thenCompose(output -> AsyncTrampoline.asyncWhile(
                    State::shouldContinue, 
                    State::next, 
                    new State(previousInput, currentInput, output)))
            .thenApply(state -> state.prev);    
}
0 голосов
/ 26 апреля 2018

у вас есть следующая рекурсивная структура

CompletableFuture<T> compute(...) {
  return asyncTask().thenCompose(t -> {
    if (...)
      return completedFuture(t);
    } else {
      return compute(...);
    }
  }
}

Вы можете переписать ее, избегая завершающей будущей композиции и использования ее стека во время завершения.

CompletableFuture<T> compute(...) {
  CompletableFuture<T> result = new CompletableFuture<>();
  computeHelper(result, ...);
  return result;
}   

void computeHelper(CompletableFuture<T> result, ...) {
  asyncTask().thenAccept(t -> {
    if (...) {
      result.complete(t);
    } else {
      computeHelper(result, ...);
    }
  });
}

, если asyncTask() на самом деле не асинхронныйи просто используйте текущий поток, вы должны заменить thenAccept одной из его асинхронных версий, чтобы использовать очередь задач исполнителя вместо стека потока.

...