Неблокирующее CompleteableFuture, которое выполняется до выхода из программы - PullRequest
0 голосов
/ 27 августа 2018

Я выполняю приложение (static void main(String[] args), которое асинхронно делает POST запросов к конечной точке в определенной частоте. Из-за количества запросов и продолжительного времени выполнения приложения удерживать фьючерсы не представляется возможным в списке, а затем обработать их по факту.

Итак, я добавил вызов thenAcceptAsync, который просто регистрирует ответы по мере их поступления. Однако, особенно для более коротких тестовых прогонов, программа завершится и завершится до того, как некоторые ответы вернутся.

Как я могу убедиться, что все ответы зарегистрированы до завершения программы?

public void replayScripts() {
    final String applicationId = getSparkApplicationId();
    LOGGER.debug("applicationId: {}", applicationId);

    final long scriptsToSubmit = this.config.getMaxScripts() == null ? this.config.getScripts().size()
            : this.config.getMaxScripts().intValue();
    long maxWaitTimeMillis = 0L;
    long actualWaitTimeMillis = 0L;

    for (int i = 0; i < scriptsToSubmit; i++) {
        LOGGER.info("submitting script #{} of {}", i + 1, scriptsToSubmit);

        this.dao.submitScript(this.config.getScripts().get(i).getRawScript())
                .thenAcceptAsync(this::logResponse);

        if (i + 1 < this.config.getScripts().size()) {
            final long millisWait = TimeUnit.MILLISECONDS.convert(
                    Duration.between(this.config.getScripts().get(i).getSubmissionTime(),
                            this.config.getScripts().get(i + 1).getSubmissionTime()).get(ChronoUnit.NANOS),
                    TimeUnit.NANOSECONDS);

            maxWaitTimeMillis += millisWait;
            actualWaitTimeMillis += sleep(millisWait, applicationId);

            if (this.config.getClearCache()) {
                this.dao.clearCache();
            }
        }
    }

    LOGGER.info("max wait time: {}s", maxWaitTimeMillis / 1000.0);
    LOGGER.info("actual wait time: {}s", actualWaitTimeMillis / 1000.0);
}

РЕДАКТИРОВАТЬ: решение

Пользуясь предложением @ irahavoi, я на некоторое время добавил решение для счетчиков, добавив этот фрагмент, в котором используется член класса AtomicInteger для отслеживания количества зарегистрированных ответов:

    while (this.config.isLogResponses() && this.loggedResponsesCounter.get() < scriptsToSubmit) {
        try {
            LOGGER.info("sleeping for one second to allow all responses to be logged");
            Thread.sleep(1000);
        } catch (final InterruptedException e) {
            LOGGER.warn("An exception occurred while attempting to wait for all responses to be logged", e);
        }
    }

Ответы [ 2 ]

0 голосов
/ 28 августа 2018

Другое решение, которое может быть чище, - это отключить ваш ExecutorService и дождаться его завершения.

Я бы предположил, что вы уже используете настроенный ExecutorService в своем DAO, так как общий пул, используемый по умолчанию CompletableFuture, не подходит для задач ввода-вывода. В противном случае вы должны изучить фабричные методы в Executors.

Итак, после того, как вы отправили все свои задачи, вы должны вызвать ExecutorService.shutdown(), а затем awaitTermination() в главном потоке.

Это будет ожидать завершения всех задач (включая поставленные в очередь).

0 голосов
/ 27 августа 2018

Я вижу 2 варианта решения этой проблемы (ни один из них не очень элегантный):

  1. Разделите ваши запросы в пакетах (например, каждый пакет составляет 1 тыс. Запросов). И обрабатывать каждую партию отдельно. Таким образом, вы сможете использовать это без риска OOM:

    CompletableFuture.allOf (aBatchOfLogPipelineResponses) .join ();

  2. Как упоминалось в комментарии выше, у вас может быть счетчик полученных ответов (каждый раз, когда приходит ответ, он увеличивается). и периодически проверяйте его в главном потоке: если значение счетчика равно scriptsToSubmit, то можно выйти.

...