Я создаю приложение Kafka, используя сжатие журналов по теме, но не могу отправить значение Tombstone (KafkaNull)
Я попытался использовать конфигурацию по умолчанию для сериализатора, и когда это не сработало, я использовал предложенные изменения из " Опубликовать сообщение null / tombstone с необработанными заголовками " Чтобы установить для application.properties значение:
spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer
Код для отправки сообщения в поток:
this.stockTopics.compactedStocks().send(MessageBuilder
.withPayload(KafkaNull.INSTANCE)
.setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
.build())
this.stopTopics.compactedStocks () возвращает messageStream, на которое я могу отправлять сообщения.
Каждый раз, когда я пытаюсь отправить это сообщение с экземпляром KafkaNull в качестве полезной нагрузки, я получаю сообщение об ошибке Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.
Я ожидаю, что сообщение будет просто отправлено потребителю с нулевым значением, но очевидно, что оно содержит ошибки.