Реализация SmartLifeCycle с подпиской реактора - PullRequest
0 голосов
/ 14 января 2020

Ниже приведен код, который я имею для компонента, который запускает Flux и подписывается на него, все в конструкторе класса. Этот конкретный поток происходит от вызова mongoChangeStreams. Оно не прекращается, если не возникает ошибка.

Я хочу, чтобы подписка оставалась в живых постоянно, поэтому я перезапускаю подписку, если событие завершается из-за ошибки.

Это произошло со мной что вызов подписки внутри конструктора может быть плохой идеей. Кроме того, я, вероятно, должен включить способ корректно завершить работу этого приложения, вызвав отмену подписки во время завершения работы.

Я предполагаю, что мне следует реализовать SmartLifeCycle, но я не уверен, как это сделать. Существует ли стандартный способ реализации SmartLifeCycle для компонента, поддерживаемого подпиской Flux?

@Component
class SubscriptionManager(
    private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
    private val fooProcessor: FooProcessor
)  {

    private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor

    private fun subscribe() = buildSubscriber().also {
        fooFluxProvider.foos().subscribe(it)
    }

    private fun buildSubscriber(): BaseSubscriber<Foo> {
        return object : BaseSubscriber<Foo>() {
            override fun hookOnSubscribe(subscription: Subscription) {

                subscription.request(1)
            }

            override fun hookOnNext(value: Foo) {
                //process the foo
                fooProcessor.process(value)//sync call
                //ask for another foo
                request(1)
            }

            override fun hookOnError(throwable: Throwable) {
                logger.error("Something went wrong, restarting subscription", throwable)
                //restart the subscription. We'll recover if we're lucky
               subscription = subscribe()
            }
        }
    }




}

1 Ответ

0 голосов
/ 23 января 2020
  1. Вместо создания подкласса подписчика, который повторно подписывается на исключение, соедините один из операторов retry* в Flux перед подпиской. Операторы повтора перепишутся на последующий поток, если он завершится с исключением. Например, fooFluxProvider.foos().retry() будет повторяться бесконечно. Существуют другие варианты retry* для более продвинутого поведения, включая чрезвычайно настраиваемый retryWhen, который можно использовать с классом reactor.retry.Retry от reactor-extra.
  2. Вместо передачи подписчика на subscribe(subscriber) вызовите один из subscribe методов, который возвращает Disposable. Это дает вам объект, для которого вы можете вызвать dispose() позже во время выключения, чтобы отменить подписку.
  3. Для реализации SmartLifecycle:
    • В конструкторе (или в start()), создайте Flux (но не подписывайтесь на него в конструкторе)
    • В start(), вызовите flux.subscribe() и сохраните возвращенное Disposable в поле члена. Метод start() гораздо лучше подходит для запуска фоновых заданий, чем конструктор. Рассмотрите также цепочку .subscribeOn(Scheduler) до .subscribe(), если вы хотите, чтобы это работало в фоновом режиме (по умолчанию подписка происходит в потоке, в котором вызывался subscribe).
    • В stop(), позвоните disposable.dispose()

Возможно, что-то вроде этого:

class SubscriptionManager(
        fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
        fooProcessor: FooProcessor
) : SmartLifecycle {
    private val logger = LoggerFactory.getLogger(javaClass)

    private val fooFlux = fooFluxProvider.foos()
            // Subscribe on a parallel scheduler to run in the background
            .subscribeOn(Schedulers.parallel())
            // Publish on a boundedElastic scheduler if fooProcessor.process blocks
            .publishOn(Schedulers.boundedElastic())
            // Use .doOnNext to send the foo to your processor
            // Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
            // Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
            .doOnNext(fooProcessor::process)
            // Log if an exception occurred
            .doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
            // Resubscribe if an exception occurred
            .retry()
            // Repeat if you want to resubscribe if the upstream flux ever completes successfully
            .repeat()

    private var disposable: Disposable? = null

    @Synchronized
    override fun start() {
        if (!isRunning) {
            disposable = fooFlux.subscribe()
        }
    }

    @Synchronized
    override fun stop() {
        disposable?.dispose()
        disposable = null
    }

    @Synchronized
    override fun isRunning(): Boolean {
        return disposable != null
    }

}
...