Spring cloud stream: заголовки kafka_acknowledgment значение равно нулю - PullRequest
0 голосов
/ 03 июля 2018

Я пытаюсь контролировать коммит смещения по теме кафки в соответствии со статусом обработки сообщения в приложении. Если сообщение успешно, значит смещение может быть зафиксировано. Для этого я пытаюсь получить заголовки в моем методе, если это сообщение может быть подтверждено вручную

  spring:
   cloud:
    stream:
      default:
        contentType: application/json
      default-binder: binder1-kafka
      bindings:
        myChannel:
          binder: binder1-kafka
          destination: my_topic
          content-type: text/plain
          consumer:
            autoCommitOffset: false

        outChannel:
          binder: binder2-kafka
          destination: my_topic
          content-type: text/plain
          consumer:
            autoCommitOffset: false    


      binders:
        #Connection config to different clusters
        binder1-kafka:
          type: kafka
          defaultCandidate: true
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: some-url1:9092
        binder2-kafka:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: some-url2:9092 

Но при использовании слушателя как

@StreamListener(target = IBrokerChannel.myChannel )
public void handlePayload(@Payload MyPayload payload, @Headers Map<String, Object> headers) {
  Acknowledgment acknowledgment= (Acknowledgment) headers.get("kafka_acknowledgment"); // acknowledgment object is always null.
  acknowledgment.acknowledge();
}

acknowledgment всегда null. Я использую Кафку продюсер Cli для отправки сообщения в тему spring-boot версия 1.5.10.RELEASE

Ответы [ 2 ]

0 голосов
/ 03 июля 2018

отлично работает у меня ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So51159949Application {

    public static void main(String[] args) {
        SpringApplication.run(So51159949Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("so51159949", "foo".getBytes());
        };
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Headers MessageHeaders headers) {
        System.out.println(in);
        System.out.println(headers);
        Acknowledgment ack = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        ack.acknowledge();
    }

}

и

spring:
  cloud:
    stream:
      bindings:
        input:
          group: so51159949
          destination: so51159949
      kafka:
        bindings:
          input:
            consumer:
              auto-commit-offset: false

и

foo
{kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ad39b5f, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so51159949, kafka_receivedTimestamp=1530643662028, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = so51159949, partition = 0, offset = 2, CreateTime = 1530643662028, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@700c9aa9), contentType=application/json}
0 голосов
/ 03 июля 2018

Вам не хватает ветки kafka в определении вашего свойства конфигурации. Должно быть так:

spring:
   cloud:
    stream:
      default-binder: kafka
      kafka:
         bindings:
           myChannel:
             consumer:
               autoCommitOffset: false 

https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#kafka-consumer-properties

...