Ниже приведен код, который я имею для компонента, который запускает Flux
и подписывается на него, все в конструкторе класса. Этот конкретный поток происходит от вызова mongoChangeStreams. Оно не прекращается, если не возникает ошибка.
Я хочу, чтобы подписка оставалась в живых постоянно, поэтому я перезапускаю подписку, если событие завершается из-за ошибки.
Это произошло со мной что вызов подписки внутри конструктора может быть плохой идеей. Кроме того, я, вероятно, должен включить способ корректно завершить работу этого приложения, вызвав отмену подписки во время завершения работы.
Я предполагаю, что мне следует реализовать SmartLifeCycle
, но я не уверен, как это сделать. Существует ли стандартный способ реализации SmartLifeCycle для компонента, поддерживаемого подпиской Flux?
@Component
class SubscriptionManager(
private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
private val fooProcessor: FooProcessor
) {
private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor
private fun subscribe() = buildSubscriber().also {
fooFluxProvider.foos().subscribe(it)
}
private fun buildSubscriber(): BaseSubscriber<Foo> {
return object : BaseSubscriber<Foo>() {
override fun hookOnSubscribe(subscription: Subscription) {
subscription.request(1)
}
override fun hookOnNext(value: Foo) {
//process the foo
fooProcessor.process(value)//sync call
//ask for another foo
request(1)
}
override fun hookOnError(throwable: Throwable) {
logger.error("Something went wrong, restarting subscription", throwable)
//restart the subscription. We'll recover if we're lucky
subscription = subscribe()
}
}
}
}