Я пытаюсь понять, как использовать другой сериализатор / десериализатор ключей в Spring Cloud Stream.Я нашел способ указать глобальные serdes, но я не понимаю, как указать разные serdes для привязки, чтобы разрешить разные типы ключей (Integer, Strings и т. Д.).Например, следующий конфиг определяет глобальные ключи, используя свойства spring.kafka.consumer.keyDeserializer
и spring.kafka.producer.keySerializer
:
spring:
cloud:
stream:
bindings:
input:
contentType: application/*+avro
destination: user
group: my-group
output:
contentType: application/*+avro
destination: user
producer:
partition-count: 2
kafka:
binder:
brokers: default:9092
schemaRegistryClient:
endpoint: http://default:8081
kafka:
consumer:
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
keySerializer: org.apache.kafka.common.serialization.StringSerializer
Я могу использовать и выдавать ключ сообщения, используя следующие фрагменты кода:
// Consumer
@StreamListener(Sink.INPUT)
public void handle(@Payload UserValue user, @Headers Map<String, Object> headers,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Received: " + user + " with key: " + key + " and headers: " + headers);
}
// Producer
UserValue user = UserValue.newBuilder().setName("Alessandro").setSurname("Dionisi").build();
output.send(MessageBuilder.withPayload(user).setHeader(KafkaHeaders.MESSAGE_KEY, "1").build());