Как заставить Spring Cloud Stream Listener ждать обработки сообщений до полной инициализации приложения при запуске? - PullRequest
0 голосов
/ 21 марта 2019

С помощью приложения Spring Cloud Stream Kafka, как мы можем гарантировать, что потоковый прослушиватель ожидает обработки сообщений до тех пор, пока не будут выполнены некоторые задачи зависимости (например, заполнение справочных данных)? Ниже приложение не может обрабатывать сообщения, потому что сообщения доставляются слишком рано. Как мы можем гарантировать такой порядок в Spring Boot App?

@Service
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {

  @Autowired FooService fooService;

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          // !!! this fails to save
          // messages are delivered too early before foo reference data got loaded into database
          fooService.save(v);
      });         
  }
}
  • spring-cloud-stream: 2.1.0.RELEASE
  • spring-boot: 2.1.2.RELEASE

Я обнаружил, что это не доступно в весеннем облачном потоке с 15 мая 2018 г.

Kafka - Задержка привязки до завершения инициализации сложной службы

Есть ли у нас план / график, когда это поддерживается?

1 Ответ

0 голосов
/ 22 марта 2019

Тем временем я добился того, чего хотел, используя @Ordered и ApplicationRunner.Это грязно, но работает.По сути, потоковый слушатель будет ждать, пока другие работы не будут выполнены.

@Service
@Order(1)
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
@Order(2)
public class MyFooStreamProcessor implements ApplicationRunner {

  @Autowired FooService fooService;
  private final AtomicBoolean ready = new AtomicBoolean(false);

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          while (ready.get() == false) {
            try {
              log.info("sleeping for other dependent components to finish initialization");
              Thread.sleep(10000);
            } catch (InterruptedException e) {
              log.info("woke up");
            }
          }
          fooService.save(v);
      });         
  }

  @Override
  public void run(ApplicationArguments args) throws Exception {
    ready.set(true);
  }
}
...