SprintBoot кафка значение-сериализатор - PullRequest
1 голос
/ 09 апреля 2020

У меня есть проект SpringBoot с apache kafka (программное обеспечение для обработки потоков с открытым исходным кодом). У меня есть этот слушатель

@KafkaListener(topics = "test")
public String consume(Hostel hostel) throws IOException {
}

, этот сериализатор

public class HostelSerializer implements Deserializer<Hostel> {

    private final ObjectMapper objectMapper;

    public InputRequestMessageSerializer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        this.configure(map,b);
    }

    @SneakyThrows
    @Override
    public Hostel deserialize(String s, byte[] bytes) {
        return objectMapper.readValue(bytes, Hostel.class);
    }

    @Override
    public void close() {
        this.close();
    }
}

и в Свойства:

spring:
    kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: group_id
          topics: test
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: com.kafka.config.HostelSerializer

, несмотря на то, что я получаю сообщение, у меня есть эта ошибка

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] 
to [com.message.Hostel] for GenericMessage [.....}]

при просмотре журнала он не получает значения из свойств:

2020-04-09 16:41:30,840 INFO  :  gid: trace= span= [main] o.a.k.c.consumer.ConsumerConfig ConsumerConfig values: 
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

1 Ответ

0 голосов
/ 09 апреля 2020

Вы смешиваете де / сериализацию. Поскольку вы настраиваете потребитель , вам нужно использовать только правильные десериализацию интерфейсы и реализации:

kafka:
consumer:
  bootstrap-servers: localhost:9092
  group-id: group_id
  topics: test
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: com.kafka.config.HostelDeserializer

... и ...

public class HostelDeserializer implements Deserializer<Hostel> { .. }
...