Запуск Mono в фоновом режиме при возврате ответа при использовании Spring Webflux - PullRequest
0 голосов
/ 09 апреля 2019

Этот вопрос относится к Немедленно вернитесь в весенний веб-поток , но я не думаю, что это то же самое (по крайней мере, ответ там меня не устраивает).

У меня есть функция, возвращающая Mono, которая при вызове запускает длительное задание. Эта функция вызывается при вызове HTTP-API Spring Webflux. Вот пример:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}

Проблема с кодом выше заключается в том, что «201 Created» создается после длительного выполнения задания. Я хочу сбросить longRunningJob в фоновом режиме и сразу же вернуть «201 Created».

Возможно, я мог бы сделать что-то вроде этого:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}

Но для меня не очень-то идиотично вызывать subscribe() вручную (например, intellij жалуется, что я звоню subscribe() в неблокируемой области). Нет ли лучшего способа объединить два «потока» без использования явного subscribe? Если да, то как мне изменить вышеуказанную функцию startNewJob для достижения этой цели?

1 Ответ

2 голосов
/ 13 апреля 2019

AFAIK, использование одного из методов subscribe - единственный способ действительно запустить задание в фоновом режиме с собственным жизненным циклом (не привязанным к возвращенному издателю).

Если бы вы использовали один из операторов для объединения издателя задания и издателя ответа (например, zip или merge), тогда жизненный цикл издателя задания будет привязан к издателю ответа, который не является что вы хотите для фоновой работы.

Одна вещь, которую вы могли бы рассмотреть, это запуск фонового задания в потоке издателя ответа, а не непосредственно в теле метода. например через doOnSubscibe или от оператора перед ответом.

Это связало бы начало фонового задания с событиями onSubscribe издателя ответа, но все же позволило бы завершить его в фоновом режиме.

Также обратите внимание, что если вы хотите иметь возможность отменить фоновое задание (например, может быть, во время завершения работы приложения), вам нужно сохранить Disposable, возвращенный из subscribe, чтобы позже вы могли вызвать dispose на Это. Это может быть лучше сделано из некоторого типа BackgroundJobManager, который может отслеживать все запущенные задания.

...