Пользовательские ключевые последовательности для привязки в Spring Cloud Stream - PullRequest
0 голосов
/ 09 мая 2019

Я пытаюсь понять, как использовать другой сериализатор / десериализатор ключей в Spring Cloud Stream.Я нашел способ указать глобальные serdes, но я не понимаю, как указать разные serdes для привязки, чтобы разрешить разные типы ключей (Integer, Strings и т. Д.).Например, следующий конфиг определяет глобальные ключи, используя свойства spring.kafka.consumer.keyDeserializer и spring.kafka.producer.keySerializer:

spring:
  cloud:
    stream:
      bindings:
        input:
          contentType: application/*+avro
          destination: user                    
          group: my-group
        output:
          contentType: application/*+avro
          destination: user     
          producer:
            partition-count: 2      
      kafka:
        binder:
          brokers: default:9092
      schemaRegistryClient:
        endpoint: http://default:8081
  kafka:
    consumer:
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      keySerializer: org.apache.kafka.common.serialization.StringSerializer

Я могу использовать и выдавать ключ сообщения, используя следующие фрагменты кода:

// Consumer
@StreamListener(Sink.INPUT)
    public void handle(@Payload UserValue user, @Headers Map<String, Object> headers,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Received: " + user + " with key: " + key + " and headers: " + headers);
    }

// Producer
UserValue user = UserValue.newBuilder().setName("Alessandro").setSurname("Dionisi").build();
        output.send(MessageBuilder.withPayload(user).setHeader(KafkaHeaders.MESSAGE_KEY, "1").build());

1 Ответ

1 голос
/ 09 мая 2019

Вы можете сделать это через configuration свойство

spring.cloud.stream.kafka.bindings.<bindingName>.consumer.configuration.<whatever-kafka-property-name>=. . .

Подробнее здесь

...