Это не точно ваши четыре шага бизнес-логики, но я думаю, что это немного ближе к тому, что вы хотите.
Вы могли бы признать "отброшенные" сообщения, которые не будутзаписывается в .doOnDiscard
после .filter
...
receiver.receive()
.flatMap(this::processMessage)
.filter(this::isMessageToWriteInKafka)
.doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
Примечание: вам нужно будет использовать правильный тип объекта, который был отброшен.Я не знаю, какой тип объекта излучает издатель, возвращенный из processMessage
, но я предполагаю, что вы можете получить от него ReceiverRecord
или ReceiverOffset
, чтобы подтвердить его.
В качестве альтернативы, вы могли быобъединить filter
/ doOnDiscard
в один .handle
оператор ...
receiver.receive()
.flatMap(this::processMessage)
.handle((m, sink) -> {
if (isMessageToWriteInKafka(m)) {
sink.next(m);
} else {
m.getReceiverRecord().getReceiverOffset().acknowledge();
}
})
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());