Как выполнить корректное завершение работы приложения при использовании Spring Cloud Stream с Kafka? - PullRequest
3 голосов
/ 04 июля 2019

У меня есть приложение весенней загрузки (v.1.57), которое использует Spring Cloud Stream (v1.3.0) и Kafka (v1.1.6). Я хочу иметь возможность корректно завершить его, т. Е. При завершении работы все потоковые прослушиватели (т. Е. Помеченные @StreamListener) должны:

  1. Прекратить опрос новых сообщений
  2. Завершить свою работу
  3. Передать смещение Кафке

Я заметил, что в ContainerProperties есть свойство shutdownTimeout (по умолчанию 10000 мс), поэтому я попытался изменить его до 30000, расширив класс ConcurrentKafkaListenerContainerFactoryConfigurer (поскольку он имеет аннотацию @ConditionalOnMissingBean) посредством отражения вот так:

@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {

    @Autowired
    private KafkaProperties kproperties;

    @Override
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                          ConsumerFactory<Object, Object> consumerFactory) {
        PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
        myAccessor.setPropertyValue("properties", kproperties);

        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        super.configure(listenerContainerFactory, consumerFactory);
        containerProperties.setShutdownTimeout(30000);
    }
}

Но это не удалось. Также попытался поместить его (shutdownTimeout: 30000) в application.yml в настройках связывателя весеннего облака, но опять-таки это не помогло.

Есть ли способ контролировать процесс отключения и достигать моих целей?

1 Ответ

2 голосов
/ 04 июля 2019

spring-kafka 1.1.x больше не поддерживается; вы должны использовать 1.3.9 с загрузкой 1.5.x.

Текущая версия Boot 1.5.x - 1.5.21.

Вы должны обновить немедленно.

Однако есть гораздо более новые версии всех этих проектов.

Spring Cloud Stream не использует эту фабрику или загрузочные свойства для создания своих контейнеров; он не предоставляет механизм для настройки этого свойства в контейнере.

В Spring Cloud Stream 2.1 добавлен ListenerContainerCustomizer, который позволяет настроить контейнер привязки, задав для него любые свойства.

Я предлагаю вам перейти на Boot 2.1.6 и Spring Cloud Stream Germantown (2.2.0).

EDIT

Это что-то вроде хака, но оно должно работать до тех пор, пока вы не сможете перейти на более новую версию потока ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {

    public static void main(String[] args) {
        SpringApplication.run(So56883620Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        this.latch.countDown();
        System.out.println(in);
        Thread.sleep(6_000);
        System.out.println("exiting");
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
            // wait for listener to start
            this.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Shutting down");
        };
    }

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                @SuppressWarnings("unchecked")
                Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                @SuppressWarnings("unchecked")
                Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                        .getPropertyValue("lifecycle.messageListenerContainer"))
                                .getContainerProperties().setShutdownTimeout(30_000L);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...