Как отправить сообщение за сообщение Кафке - PullRequest
0 голосов
/ 19 февраля 2020

Я новичок в реактивном программировании и пытаюсь реализовать очень простой c сценарий. Я хочу отправлять сообщение kafka каждый раз, когда файл сбрасывается в указанную папку c. Я думаю, что не очень хорошо разбираюсь в базовых вещах ... так что, пожалуйста, не могли бы вы мне помочь?

Итак, у меня есть несколько вопросов: в чем разница между сообщениями smallrye-реактивно-ответными и smallrye-реактивными- streams-operator?

У меня есть этот простой код:

@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
     if(Objects.isNull(currentMessage)){
          //currentMessage is an instance variable which is null when I start the application
          return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
     }
     else {
          //currentMessage has been correctly set with the file information
          LOGGER.info(currentMessage);
          return ReactiveStreams.of(currentMessage).map(Message::of);
     }
}

Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта, которая будет иметь значение null ценности. Однако я не понимаю, почему, когда мой код переходит к оператору else, ничего не идет к topi c? Кажется, что .of инструкции оператора if сломали потоки или что-то в этом роде ...

Как сохранить непрерывные потоки, которые "реагируют" на новые удаленные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...

Если я не верну экземпляр PublisherBuilder, а, например, Integer, тогда моя kafka topi c будет заполнена очень огромный поток целочисленных значений. Вот почему примеры используют некоторые интервалы при отправке сообщений ...

Должен ли я использовать некоторые CompletationStage или CompletableFuture? RxJAva2? Это немного сбивает с толку, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)

В чем различия между:

  • ReactiveStreams.fromCompletionStage
  • ReactiveStreams.fromProcessor
  • ReactiveStreams.fromPublisher
  • ReactiveStreams.fromSubscriber

Какой из них использовать в каком сценарии?

Большое спасибо!

1 Ответ

0 голосов
/ 19 февраля 2020

Давайте начнем с разницы между операторами smallrye-реактивного обмена сообщениями и smallrye-реактивными потоками: операторы smallrye реактивного потока - то же самое, что и сообщения smallrye-реактивного обмена сообщениями, но, кроме того, он поддерживает микропрофиль-контекст распространение . Поскольку большинство провайдеров реактивных сообщений используют Vert.x за кулисами, оно обработает ваше сообщение в стиле event-l oop, что означает, что оно будет выполняться в отдельном потоке. Иногда вам нужно распространять некоторые ctx из вашего базового потока в новый поток (например: заполнение контекста CDI и Tx для выполнения некоторой логики JPA Entity Manager c). Вот где справка по распространению ctx.

Для сигнатур методов. Вы можете взглянуть на официальную документацию SmallRye-реактивных потоков разделы 3,4 и 5. Каждый из них имеет свой вариант использования. Какой вкус вы хотите использовать, зависит от вас.

Когда использовать что? Если вы не работаете в реактивном контексте, вы можете использовать ниже для отправки сообщений:

@ Inject @Channel ("my-channel") Emitter emitter;

Для использования сообщения вы можете использовать подпись метода следующим образом:

@ Incoming ("channel-2") publi c CompletionStage doSomething (Message anEvent)

Или

@ Incoming ("channel-2") publi c void doSomething (String anEvent)

Надеюсь, что поможет.

...