spring-kafka 1.1.x больше не поддерживается; вы должны использовать 1.3.9 с загрузкой 1.5.x.
Текущая версия Boot 1.5.x - 1.5.21.
Вы должны обновить немедленно.
Однако есть гораздо более новые версии всех этих проектов.
Spring Cloud Stream не использует эту фабрику или загрузочные свойства для создания своих контейнеров; он не предоставляет механизм для настройки этого свойства в контейнере.
В Spring Cloud Stream 2.1 добавлен ListenerContainerCustomizer
, который позволяет настроить контейнер привязки, задав для него любые свойства.
Я предлагаю вам перейти на Boot 2.1.6 и Spring Cloud Stream Germantown (2.2.0).
EDIT
Это что-то вроде хака, но оно должно работать до тех пор, пока вы не сможете перейти на более новую версию потока ...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {
public static void main(String[] args) {
SpringApplication.run(So56883620Application.class, args).close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@StreamListener(Sink.INPUT)
public void listen(String in) throws InterruptedException {
this.latch.countDown();
System.out.println(in);
Thread.sleep(6_000);
System.out.println("exiting");
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
// wait for listener to start
this.latch.await(10, TimeUnit.SECONDS);
System.out.println("Shutting down");
};
}
@Bean
public SmartLifecycle bindingFixer(BindingService bindingService) {
return new SmartLifecycle() {
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public void stop() {
// no op
}
@Override
public void start() {
@SuppressWarnings("unchecked")
Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
.getPropertyValue("consumerBindings");
@SuppressWarnings("unchecked")
Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
.getPropertyValue("lifecycle.messageListenerContainer"))
.getContainerProperties().setShutdownTimeout(30_000L);
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void stop(Runnable callback) {
callback.run();
}
@Override
public boolean isAutoStartup() {
return true;
}
};
}
}