Как получить уведомление, когда все фьючерсы в цепочке Future.compose успешно работают? - PullRequest
0 голосов
/ 14 февраля 2019

Мое приложение (типичный сервер REST, который вызывает другие службы REST внутри) имеет два основных класса для выполнения процедуры начальной загрузки.

Существует класс Application.kt , который должен сам настраивать экземпляр vertx и регистрировать определенные модули (например, интеграция Джексона Котлина):

class Application(
    private val profileSetting: String? = System.getenv("ACTIVE_PROFILES"),
    private val logger: Logger = LoggerFactory.getLogger(Application::class.java)!!
) {

    fun bootstrap() {
        val profiles = activeProfiles()
        val meterRegistry = configureMeters()
        val vertx = bootstrapVertx(meterRegistry)

        vertx.deployVerticle(ApplicationBootstrapVerticle(profiles)) { startup ->
            if (startup.succeeded()) {
                logger.info("Application startup finished")
            } else {
                logger.error("Application startup failed", startup.cause())
                vertx.close()
            }
        }
    }
}

Кроме того, существует класс ApplicationBootstrapVerticle.kt , который должен развертывать различные вершины в определенном порядке.Некоторые из них в последовательности, некоторые из них параллельно:

class ApplicationBootstrapVerticle(
    private val profiles: List<String>,
    private val logger: Logger = LoggerFactory.getLogger(ApplicationBootstrapVerticle::class.java)
) : AbstractVerticle() {

    override fun start(startFuture: Future<Void>) {
        initializeApplicationConfig().compose {
            logger.info("Application configuration initialized")
            initializeScheduledJobs()
        }.compose {
            logger.info("Scheduled jobs initialized")
            initializeRestEndpoints()
        }.compose {
            logger.info("Http server started")
            startFuture
        }.setHandler { ar ->
            if (ar.succeeded()) {
                startFuture.complete()
            } else {
                startFuture.fail(ar.cause())
            }
        }
    }

    private fun initializeApplicationConfig(): Future<String> {
        return Future.future<String>().also {
            vertx.deployVerticle(
                ApplicationConfigVerticle(profiles),
                it.completer()
            )
        }
    }

    private fun initializeScheduledJobs(): CompositeFuture {
        val stationsJob = Future.future<String>()
        val capabilitiesJob = Future.future<String>()

        return CompositeFuture.all(stationsJob, capabilitiesJob).also {
            vertx.deployVerticle(
                StationQualitiesVerticle(),
                stationsJob.completer()
            )
            vertx.deployVerticle(
                VideoCapabilitiesVerticle(),
                capabilitiesJob.completer()
            )
        }
    }

    private fun initializeRestEndpoints(): Future<String> {
        return Future.future<String>().also {
            vertx.deployVerticle(
                RestEndpointVerticle(dispatcherFactory = RouteDispatcherFactory(vertx)),
                it.completer()
            )
        }
    }
}

Я не уверен, является ли это предполагаемым способом начальной загрузки приложения, если оно есть.Что еще более важно, я не уверен, правильно ли я понимаю механику Future.compose.

Приложение успешно запускается, и я вижу все необходимые сообщения журнала, кроме

Запуск приложения завершен

.Также следующий код никогда не вызывается в случае успеха :

}.setHandler { ar ->
    if (ar.succeeded()) {
       startFuture.complete()
    } else {
       startFuture.fail(ar.cause())
    }
}

В случае сбоя, например, когда мои файлы конфигурации приложения (yaml) не могут быть проанализированы, посколькунеизвестное поле в объекте destination , сообщение журнала

Ошибка запуска приложения

появляется в журналах, а также вызывается приведенный выше код.

Мне любопытно, что не так с моей составленной цепочкой фьючерсов.Я думал, что обработчик будет вызван после успеха предыдущих фьючерсов или одного из них, но я думаю, что он вызывается только в случае успеха.

Обновление

Полагаю, что отсутствует вызов startFuture.complete().Адаптировав метод start, он, наконец, сработал:

override fun start(startFuture: Future<Void>) {
    initializeApplicationConfig().compose {
        logger.info("Application configuration initialized")
        initializeScheduledJobs()
    }.compose {
        logger.info("Scheduled jobs initialized")
        initializeRestEndpoints()
    }.compose {
        logger.info("Http server started")
        startFuture.complete()
        startFuture
    }.setHandler(
        startFuture.completer()
    )
}

Я не уверен, хотя, является ли это предполагаемым способом обработки этой будущей цепочки.

1 Ответ

0 голосов
/ 14 февраля 2019

Решение, которое сработало для меня, выглядит так:

override fun start(startFuture: Future<Void>) {
    initializeApplicationConfig().compose {
        logger.info("Application configuration initialized")
        initializeScheduledJobs()
    }.compose {
        logger.info("Scheduled jobs initialized")
        initializeRestEndpoints()
    }.setHandler { ar ->
        if(ar.succeeded()) {
            logger.info("Http server started")
            startFuture.complete()
        } else {
            startFuture.fail(ar.cause())
        }
    }
}
...