Поглотить сообщение Кафки с пользовательским заголовком - PullRequest
1 голос
/ 15 марта 2019

Я пытаюсь создать Потребитель kafka с Spring Cloud Stream для прослушивания сообщения Kafka, созданного вне любого контекста Spring, с настраиваемым заголовком ( operationType ).

Я использую Spring Boot 1.5.x / Spring Cloud Egdware.SR5 и версии 1.1.1 kafka-client и kafka_2.11.

Класс My Listener содержит этот метод

@StreamListener(value = "dataset-changed", condition = "headers['operationType']=='UPDATE'")
public void onEvent(@Payload DatasetChangedMessage payload) {
  // my code should be execute only if the header operationType == UPDATE
}

Конфигурация Spring Cloud Stream:

spring.cloud.stream:
  bindings:
    dataset-changed:
      group: preparation
      content-type: application/json
      destination: dataset-changed
      consumer:
        headerMode: raw
        configuration:
          key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

Производитель - простой документ Java с клиентом kafka: 1.1.1 библиотека

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<byte[], String> producer = new KafkaProducer<>(producerConfig);

// Headers (to condition the kafka listener)
final List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("operationType", "UPDATE".getBytes()));

ProducerRecord<byte[], String> record =
        new ProducerRecord<>("dataset-changed", 0, "111".getBytes(), getJsonPayload(), headers);
Future<RecordMetadata> future = producer.send(record);
future.get();

producer.close();

Когда я создаю сообщение kafka, у меня появляются такие предупреждения

2019-03-15 14:48:32.103  WARN [tdp-preparation,1e24b9764ef9bb14,1e24b9764ef9bb14,false] 34760 --- [           -L-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: ea27a446-69da-7b8d-1b94-50b46a40dfde

, тогда как присутствует заголовок operationType

enter image description here

1 Ответ

0 голосов
/ 15 марта 2019

Я считаю, что вы теряете headers на стороне потребителя, потому что вы используете headerMode: raw. По сути это означает, что none - отображение без заголовков.

Попробуйте вместо этого использовать headerMode: headers.

См. Документацию для получения дополнительной информации: https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#_consumer_properties

headerMode

Если установлено значение none, отключает анализ заголовка при вводе. Действует только для промежуточного программного обеспечения для обмена сообщениями, которое не поддерживает заголовки сообщений и требует встраивания заголовков. Этот параметр полезен при использовании данных из приложений, отличных от Spring Cloud Stream, когда собственные заголовки не поддерживаются. Если установлено значение headers, используется механизм заголовка промежуточного программного обеспечения. При значении embeddedHeaders заголовки включаются в полезную нагрузку сообщения.

По умолчанию: зависит от реализации связующего.

...