Spring Cloud @StreamListener не предоставляет заголовок подтверждения, даже если для автоматической фиксации установлено значение false - PullRequest
0 голосов
/ 22 мая 2018

В настоящее время я застрял в простом примере ручного управления смещениями и фиксациями Kafka.У меня есть приложение с Spring Cloud Streams, которое устанавливает enable.auto.commit = false (отображается в журнале запуска при печати ConsumerValues), но все же, когда я анализирую сообщение, оно не предоставляет заголовок подтверждения.

Это мойслушатель:

@StreamListener(Sink.INPUT)
public void handleSchedulerMessage(@Payload SchedulerEvent event, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    log.debug("[message={}]", event);
    // todo: processing
    log.debug("Event processed successfully [event={}]", event);
}

YAML для конфигурации также очень прост:

spring:
  application:
    name: scheduler
  cloud:
    stream:
      kafka:
        binder:
          brokers: *kafka-broker*:9092
          zkNodes: *zookeeper*:2181
      bindings:
        input:
          destination: scheduler
          contentType: application/json
          consumer:
            autoCommitOffset: false

И когда я отправляю сообщение, сразу же появляется ошибка:

2018-05-22 11:38:32.470 ERROR 11651 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: Missing header 'kafka_acknowledgment' for method parameter type [interface org.springframework.kafka.support.Acknowledgment], failedMessage=GenericMessage [payload=byte[38], headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@fdac355, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=scheduler, kafka_receivedTimestamp=1526981909241, contentType=application/json}]

Полученное сообщение не содержит требуемого заголовка, в отличие от того, что говорится в документации, когда autoCommit отключен:

Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header will be present in the inbound message. Applications may use this header for acknowledging messages.

Код не сложный, и я не использую какой-либо предварительно созданный проект.Примеры не объясняют больше, чем то, что я сделал, поэтому я не знаю, чего мне не хватает.

1 Ответ

0 голосов
/ 22 мая 2018

Похоже, вы теряете один уровень отступов в вашем YAML.

Согласно документам, свойство должно быть таким:

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

Но ваш пример выглядит так:

spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false

Обратите внимание на дополнительные .kafka. в середине.

Я не знаю, как помочь правильно управлять YAML, но это то, что мы должны сделать, чтобы он работал.

...