Я новичок в реактивном программировании и пытаюсь реализовать очень простой c сценарий. Я хочу отправлять сообщение kafka каждый раз, когда файл сбрасывается в указанную папку c. Я думаю, что не очень хорошо разбираюсь в базовых вещах ... так что, пожалуйста, не могли бы вы мне помочь?
Итак, у меня есть несколько вопросов: в чем разница между сообщениями smallrye-реактивно-ответными и smallrye-реактивными- streams-operator?
У меня есть этот простой код:
@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
if(Objects.isNull(currentMessage)){
//currentMessage is an instance variable which is null when I start the application
return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
}
else {
//currentMessage has been correctly set with the file information
LOGGER.info(currentMessage);
return ReactiveStreams.of(currentMessage).map(Message::of);
}
}
Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта, которая будет иметь значение null ценности. Однако я не понимаю, почему, когда мой код переходит к оператору else, ничего не идет к topi c? Кажется, что .of инструкции оператора if сломали потоки или что-то в этом роде ...
Как сохранить непрерывные потоки, которые "реагируют" на новые удаленные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...
Если я не верну экземпляр PublisherBuilder, а, например, Integer, тогда моя kafka topi c будет заполнена очень огромный поток целочисленных значений. Вот почему примеры используют некоторые интервалы при отправке сообщений ...
Должен ли я использовать некоторые CompletationStage или CompletableFuture? RxJAva2? Это немного сбивает с толку, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)
В чем различия между:
- ReactiveStreams.fromCompletionStage
- ReactiveStreams.fromProcessor
- ReactiveStreams.fromPublisher
- ReactiveStreams.fromSubscriber
Какой из них использовать в каком сценарии?
Большое спасибо!