Подтверждение функционального руководства Spring Cloud Stream - KafkaHeaders.ACKNOWLEDGMENT недоступно - PullRequest
0 голосов
/ 22 апреля 2020

Я использую функциональные интерфейсы Spring Cloud Webflux и Spring Cloud Streams для обработки моей кафки.

Если я выполняю обработку последовательно и если я убиваю приложение, он возвращает сообщение для обработки, которая работает как и ожидалось, так как нет пропадания сообщений. Однако, если я пытаюсь сделать параллельное, это кажется подтверждением для Кафки, что понятно, поскольку теперь это отдельный поток, поэтому я хочу обратиться к ручному подтверждению.

Мой код:

  1. application.yml (соответствующие части)
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          minPartitionCount: 2         
      bindings:
          receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              consumer:
                  autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
Код получателя
public Consumer<Flux<Message<String>>> receiver() throws IOException {
        return (sink -> {
            sink
            .onBackpressureBuffer()
            .parallel(4)
            .runOn(Schedulers.parallel())
            .subscribe((record)->{  
                Flux<Action> executor = new 
                           //Internal code which does transformation and provides a flux for execution (names changed)
 IncomingMessage().process(record);

                if(executor != null) {
                    Disposable disposable=null;
                    disposable= executor.subscribe(
                            (action)->{

                                try {
                                   //Process execute does the processing on the modified data (names changed)
                                    Process.execute(action);
                                    Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                                    if(acknowledgment !=null) {
                                        acknowledgment.acknowledge();
                                    }
                                }
                                catch(Exception e) {

                                log.fatal(e.getMsg());
                                }
                            },
                            (e)->{

                                log.fatal(e.getMsg());
                                }
                            });
                    if(disposable != null) {
                        disposable.dispose();
                    }

                }

            });

        });
    }

Здесь строка record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); всегда дает ноль, поэтому я предполагаю, что autoCommitOffset: false не работает, я попытался поместить приведенную ниже конфигурацию также в привязку раздел, но безрезультатно.

 receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              autoCommitOffset: false

Мое требование состоит в том, что, если я убью приложение, даже в параллельном сценарии оно должно продолжать читать сообщения из первого неподтвержденного сообщения.

1 Ответ

0 голосов
/ 22 апреля 2020

Проблема в том, что заголовок подтверждения не получен из-за неправильной позиции тегов. Так как это чисто кафка связующее свойство. Добавлено следующее свойство

spring:
  cloud:
    stream:
      kafka:
        bindings:
           receiver-in-0:
              consumer:
                   autoCommitOffset: false

Имя канала должно совпадать с именем канала в вызове функции. в качестве альтернативы может быть установлено значение по умолчанию.

 spring:
    cloud:
      stream:
        kafka:
           default:
            consumer:
               autoCommitOffset: false

Однако параллельная обработка входящего потока может не быть хорошей идеей, поскольку сообщения могут быть отброшены, поскольку некоторые более поздние сообщения могут быть подтверждены. Для подтверждения потребуется больше логи c, чем просто настройка подтверждения параметра.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...