В настоящее время я застрял в простом примере ручного управления смещениями и фиксациями Kafka.У меня есть приложение с Spring Cloud Streams, которое устанавливает enable.auto.commit = false
(отображается в журнале запуска при печати ConsumerValues), но все же, когда я анализирую сообщение, оно не предоставляет заголовок подтверждения.
Это мойслушатель:
@StreamListener(Sink.INPUT)
public void handleSchedulerMessage(@Payload SchedulerEvent event, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
log.debug("[message={}]", event);
// todo: processing
log.debug("Event processed successfully [event={}]", event);
}
YAML для конфигурации также очень прост:
spring:
application:
name: scheduler
cloud:
stream:
kafka:
binder:
brokers: *kafka-broker*:9092
zkNodes: *zookeeper*:2181
bindings:
input:
destination: scheduler
contentType: application/json
consumer:
autoCommitOffset: false
И когда я отправляю сообщение, сразу же появляется ошибка:
2018-05-22 11:38:32.470 ERROR 11651 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Missing header 'kafka_acknowledgment' for method parameter type [interface org.springframework.kafka.support.Acknowledgment], failedMessage=GenericMessage [payload=byte[38], headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@fdac355, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=scheduler, kafka_receivedTimestamp=1526981909241, contentType=application/json}]
Полученное сообщение не содержит требуемого заголовка, в отличие от того, что говорится в документации, когда autoCommit отключен:
Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header will be present in the inbound message. Applications may use this header for acknowledging messages.
Код не сложный, и я не использую какой-либо предварительно созданный проект.Примеры не объясняют больше, чем то, что я сделал, поэтому я не знаю, чего мне не хватает.