С помощью приложения Spring Cloud Stream Kafka, как мы можем гарантировать, что потоковый прослушиватель ожидает обработки сообщений до тех пор, пока не будут выполнены некоторые задачи зависимости (например, заполнение справочных данных)? Ниже приложение не может обрабатывать сообщения, потому что сообщения доставляются слишком рано. Как мы можем гарантировать такой порядок в Spring Boot App?
@Service
public class ApplicationStartupService implements ApplicationRunner {
private final FooReferenceDataService fooReferenceDataService;
@Override
public void run(ApplicationArguments args) throws Exception {
fooReferenceDataService.loadData();
}
}
@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {
@Autowired FooService fooService;
@StreamListener("my-input")
public void process(KStream<String, Foo> input) {
input.foreach((k,v)-> {
// !!! this fails to save
// messages are delivered too early before foo reference data got loaded into database
fooService.save(v);
});
}
}
- spring-cloud-stream: 2.1.0.RELEASE
- spring-boot: 2.1.2.RELEASE
Я обнаружил, что это не доступно в весеннем облачном потоке с 15 мая 2018 г.
Kafka - Задержка привязки до завершения инициализации сложной службы
Есть ли у нас план / график, когда это поддерживается?