Мне нужно обработать поток сообщений от kafka topi c, используя реактор-kafka. Что я должен достичь, это следующие шаги:
- Считывание сообщений из kafka topi c в виде потока.
- Попробуйте отправить каждое сообщение во внешнюю систему.
- В в случае успеха запишите это событие, в случае сбоя - отправьте сообщение в другой топи c.
- Подтвердите обработанные сообщения вручную (важно: по порядку).
У меня есть находится в документах пример построения реактивного конвейера (просто получение сообщения от одной топи c и отправка его другому) с ручным подтверждением сообщений:
sender.send(KafkaReceiver.create(receiverOptions)
.receive()
.map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset())))
.doOnNext(m -> m.correlationMetadata().acknowledge());
Но как я могу отправить на другой topi c одну часть обработанных сообщений и пропустить другую часть, но в конце сделайте подтверждения для всех обработанных сообщений в порядке?