Невозможно отправить сообщение с KafkaNull в качестве значения - PullRequest
0 голосов
/ 18 января 2019

Я создаю приложение Kafka, используя сжатие журналов по теме, но не могу отправить значение Tombstone (KafkaNull)

Я попытался использовать конфигурацию по умолчанию для сериализатора, и когда это не сработало, я использовал предложенные изменения из " Опубликовать сообщение null / tombstone с необработанными заголовками " Чтобы установить для application.properties значение:

spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

Код для отправки сообщения в поток:

this.stockTopics.compactedStocks().send(MessageBuilder
    .withPayload(KafkaNull.INSTANCE)
    .setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
    .build())

this.stopTopics.compactedStocks () возвращает messageStream, на которое я могу отправлять сообщения.

Каждый раз, когда я пытаюсь отправить это сообщение с экземпляром KafkaNull в качестве полезной нагрузки, я получаю сообщение об ошибке Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.

Я ожидаю, что сообщение будет просто отправлено потребителю с нулевым значением, но очевидно, что оно содержит ошибки.

1 Ответ

0 голосов
/ 18 января 2019

Я открыл для этого выпуск GitHub .

EDIT

Обходной путь - это работает ...

@SpringBootApplication
@EnableBinding(Source.class)
public class So54257687Application {

    public static void main(String[] args) {
        SpringApplication.run(So54257687Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> output.send(new GenericMessage<>(KafkaNull.INSTANCE));
    }

    @KafkaListener(id = "foo", topics = "output")
    public void listen(@Payload(required = false) byte[] in) {
        System.out.println(in);
    }

    @Bean
    @StreamMessageConverter
    public MessageConverter kafkaNullConverter() {
        class KafkaNullConverter extends AbstractMessageConverter {

            KafkaNullConverter() {
                super(Collections.emptyList());
            }

            @Override
            protected boolean supports(Class<?> clazz) {
                return KafkaNull.class.equals(clazz);
            }

            @Override
            protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
                return message.getPayload();
            }

            @Override
            protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
                return payload;
            }

        }
        return new KafkaNullConverter();
    }

}
...