Условные логи c по потоку сообщений от реактора-кафки - PullRequest
0 голосов
/ 06 февраля 2020

Мне нужно обработать поток сообщений от kafka topi c, используя реактор-kafka. Что я должен достичь, это следующие шаги:

  1. Считывание сообщений из kafka topi c в виде потока.
  2. Попробуйте отправить каждое сообщение во внешнюю систему.
  3. В в случае успеха запишите это событие, в случае сбоя - отправьте сообщение в другой топи c.
  4. Подтвердите обработанные сообщения вручную (важно: по порядку).

У меня есть находится в документах пример построения реактивного конвейера (просто получение сообщения от одной топи c и отправка его другому) с ручным подтверждением сообщений:

sender.send(KafkaReceiver.create(receiverOptions)
                         .receive()
                         .map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset())))  
      .doOnNext(m -> m.correlationMetadata().acknowledge());

Но как я могу отправить на другой topi c одну часть обработанных сообщений и пропустить другую часть, но в конце сделайте подтверждения для всех обработанных сообщений в порядке?

...