Флюс блокирующий запуск Netty - PullRequest
0 голосов
/ 14 марта 2019

У меня есть @Repository чтение из источника данных в реальном времени.Я делаю данные доступными, используя Flux.create() { sink->sink.next() }

A @Service выполняет следующее:

@Autowired MyRepository myRepository;

@PostConstruct() public void startUp() {
  ConnectableFlux<Object> cf = myRepository.flux.publish();
  cf.subscribe(System.out::println);
  cf.connect();
}

Это работает и печатает данные, но я не получить "Нетти запущен" в журналах и @Controllers не отвечают.Если я опущу cf.connect(), Netty запускается.Поэтому я предполагаю, что cf.connect() блокирует Netty.

В идеале, я хочу, чтобы подписка запускалась автоматически.Использование connect() в @PostConstuct слишком рано?Должен ли я прослушивать событие «Netty Started», тогда connect(), или моя подписка просто неверна?

Редактировать: Если connect запущен в пределах Thread, Netty запускается и подпискаработает.

1 Ответ

0 голосов
/ 17 марта 2019

Помещение @EnableAsync в основной класс приложения Spring Boot и @Async в приведенном выше методе, похоже, сработало.

Редактировать: Я нашел лучшее решение здесь Подключаемые блоки Flux на toIterator.forEach, а Flux - нет.# 1549 .Мой код теперь выглядит так:

Flux<MyClass> flux = Flux.create(
   sink -> {
       while(condition) {
          sink.next(nextValueFromDataSource);
       }
       sink.complete();
    }
)
.publish()
.autoConnect(1);

Также из-за autoConnect(1), нет необходимости в @Async.

...