Spring cloud stream, кафка, пауза / возобновление связывания - PullRequest
0 голосов
/ 26 ноября 2018

Мы используем пружинный поток облаков 2.0 & Kafka в качестве посредника сообщений.
Мы внедрили автоматический выключатель, который останавливает контекст приложения, для случаев, когда целевая система (БД или сторонняя)API) недоступен, как предлагается здесь: Остановить Spring Cloud Stream @StreamListener от прослушивания, когда целевая система не работает

Теперь в весеннем облачном потоке 2.0 есть способ управленияжизненный цикл связующего с использованием исполнительного механизма: Визуализация и управление связыванием


Возможно ли управлять жизненным циклом связующего из кода, т. е. в случае, если целевой сервер не работает, до pauseсвязующего, а когда он вверх, до resume?

1 Ответ

0 голосов
/ 26 ноября 2018

Извините, я неправильно прочитал ваш вопрос.

Вы можете автоматически подключить BindingsEndpoint, но, к сожалению, его State перечисление является закрытым, поэтому вы не можете вызвать changeState() программно.

У меня открыта проблема для этого .

РЕДАКТИРОВАТЬ

Вы можете сделать это с помощью отражения, но это немного уродливо ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {

    public static void main(String[] args) {
        SpringApplication.run(So53476384Application.class, args);
    }

    @Autowired
    BindingsEndpoint binding;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
                    So53476384Application.class.getClassLoader());
            ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
                try {
                    method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
                }
                catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }, method -> method.getName().equals("changeState"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}
...