Условная логика на потоке реактора - PullRequest
1 голос
/ 20 марта 2019

Я новичок в Reactor.Я пытаюсь разработать следующую логику приложения:

  1. Чтение сообщений из темы Кафки source.
  2. Преобразование сообщений.
  3. Запись подмножествапреобразованные сообщения в новую тему Кафки target.
  4. Явное подтверждение операции чтения для всех сообщений, первоначально прочитанных из темы source.

Единственное решение, которое я нашел, - этопереписать приведенную выше бизнес-логику следующим образом.

  1. Чтение сообщений из темы Кафки source.
  2. Преобразование сообщений.
  3. Немедленно подтвердить, что сообщение не будетзаписано в тему target.
  4. Отфильтруйте все вышеприведенные сообщения.
  5. Запишите остальные преобразованные сообщения в новую тему Kafka target.
  6. Явно подтвердитеоперация чтения этих сообщений

Код, реализующий вторую логику, выглядит следующим образом:

receiver.receive()
        .flatMap(this::processMessage)
        .map(this::acknowledgeMessagesNotToWriteInKafka)
        .filter(this::isMessageToWriteInKafka)
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

Очевидно, receiver тип равен KafkaReceiver, а метод sendToKafka используетKafkaSender.Одна из вещей, которые мне не нравятся, это то, что я использую map для подтверждения некоторых сообщений.

Есть ли лучшее решение для реализации оригинальной логики?

1 Ответ

1 голос
/ 24 марта 2019

Это не точно ваши четыре шага бизнес-логики, но я думаю, что это немного ближе к тому, что вы хотите.

Вы могли бы признать "отброшенные" сообщения, которые не будутзаписывается в .doOnDiscard после .filter ...

receiver.receive()
        .flatMap(this::processMessage)
        .filter(this::isMessageToWriteInKafka)
        .doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

Примечание: вам нужно будет использовать правильный тип объекта, который был отброшен.Я не знаю, какой тип объекта излучает издатель, возвращенный из processMessage, но я предполагаю, что вы можете получить от него ReceiverRecord или ReceiverOffset, чтобы подтвердить его.

В качестве альтернативы, вы могли быобъединить filter / doOnDiscard в один .handle оператор ...

receiver.receive()
        .flatMap(this::processMessage)
        .handle((m, sink) -> {
            if (isMessageToWriteInKafka(m)) {
                sink.next(m);
            } else {
                m.getReceiverRecord().getReceiverOffset().acknowledge();
            }
        })
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());
...