Получение Kafka Key in весны загрузки кафки слушателя - PullRequest
0 голосов
/ 26 сентября 2019

Я новенький в весенней кафке.У меня есть микросервис, который отправляет сообщение с ключом kafka, который является пользовательским объектом.

1) Первый микросервис отправляет сообщение в Kafka с ключом, который является экземпляром объекта MyKey.

2) Мне нужно прослушать эту тему и получить это сообщение с ключоми с помощью этого ключа создайте новый ключ.

Допустим, сообщение отправлено с помощью ключа myKey.И что я хочу сделать в слушателе, это создать новый расширенный ключ как:

     @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
     public void process( @Payload MyMessage myMessage){

        MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
        ....
        ....
        kafkaTemplate.send(TOPIC,  myExtendedKey, message);
      }

Я не знаю, как я могу получить ключ сообщения, которое отправляется в слушателе.

1 Ответ

0 голосов
/ 26 сентября 2019

Пожалуйста прочитайте документацию .

...

Наконец, метаданные о сообщении доступны из заголовков сообщений.Для получения заголовков сообщения можно использовать следующие имена заголовков:

KafkaHeaders.RECEIVED_MESSAGE_KEY

KafkaHeaders.RECEIVED_TOPIC

KafkaHeaders.RECEIVED_PARTITIONHeaders *ID. *5.RECEIVED_TIMESTAMP

KafkaHeaders.TIMESTAMP_TYPE

В следующем примере показано, как использовать заголовки:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

Смещение также доступно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...