Заголовок KafkaHeaders.RECEIVED_MESSAGE_KEY против заголовка KafkaHeaders.MESSAGE_KEY в Spring Cloud - PullRequest
0 голосов
/ 23 марта 2020

Я использую весенний облачный поток. Мне было интересно, в чем разница между KafkaHeaders.RECEIVED_MESSAGE_KEY и KafkaHeaders.MESSAGE_KEY

У меня есть 2 проекта, первый производит сообщение с использованием KafkaHeaders.MESSAGE_KEY в качестве заголовка:

    public void sendResponse(ThirdPartyResponse thirdPartyResponse) {

        log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
        integrations.send(
                withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
                        .setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
                        .build());

    }

и второй один потребляет используя KafkaHeaders.RECEIVED_MESSAGE_KEY

@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {

...
}

однако я получил эту ошибку

    2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : 
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
 headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, 
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, 
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]

В нем отсутствует заголовок

Missing header 'kafka_receivedMessageKey'

Как я могу это исправить?

1 Ответ

0 голосов
/ 23 марта 2020

RECEIVED... устанавливается для входящих сообщений; другой - для приложения, чтобы указать значение ключа для исходящих сообщений.

Они отличаются, чтобы избежать случайного распространения, когда приложение получает сообщение, выполняет некоторую работу и повторно публикует сообщение, скажем, в другой теме. c.

При использовании Spring Integration заголовки автоматически копируются при прохождении сообщения через поток.

Устройство отображения исходящих сообщений не отображает заголовки RECEIVED..., поэтому они не появляются в ProducerRecord.

... kafka_receivedMessageKey=null ...

Означает, что ключ был нулевым во входящей записи.

Чтобы получить нулевые ключи, используйте

@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
...