Как контролировать отключение реактивной кафки после экспоненциального отката - PullRequest
0 голосов
/ 15 ноября 2018

Я хотел бы использовать сообщения.Если я получу исключение, я попытаюсь повторить экспоненциальный откат.Если это тоже не помогает, я отключаю потребителя и хочу снова запустить его через 6 часов.

У меня есть следующий код:

    Publisher<ConsumerRecord<K, V>> source = RestartSource.onFailuresWithBackoff(
                    Duration.ofSeconds(3),
                    Duration.ofSeconds(5),
                    0.2,
                    5,
                    () -> Consumer.plainSource(
                            consumerSettings,
                            Subscriptions.topics(topic.getName()))
                            .mapMaterializedValue(
                                    c -> {
                                        control.set(c);
                                        return c;
                                    })
            ).runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT),materializer);
    control.get().shutdown();
    return Flowable.fromPublisher(source);

Этот код выполняет экспоненциальный откат, даже если он имеетлюбая проблема.Однако я не знаю, как остановить службу после 5 попыток, выяснить, действительно ли она остановилась, а затем перезапустить клиента через 6 часов.

...