Ошибка потребителя Kafka - PullRequest
0 голосов
/ 25 июня 2018

Я использую производителя kafka и потребителя Spring kafka. Я использую сериализатор и десериализатор Json. Всякий раз, когда я пытаюсь прочитать сообщения получателя из темы, я получаю следующую ошибку:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

Я не настроил ничего о заголовках ни у производителя, ни у потребителя. Что мне здесь не хватает?

Ответы [ 2 ]

0 голосов
/ 11 апреля 2019

просто добавив к ответу выше,

Следующие изменения решены для меня.

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

добавление

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));

вместо

return new DefaultKafkaConsumerFactory<String, String>(config);

Для справки:

метод ниже deserialize ожидание заголовков и бросков "Assert.state.." IllegalStateException

 @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            JavaType javaType = this.typeMapper.toJavaType(headers);
            if (javaType == null) {
                Assert.state(this.targetType != null, "No type information in headers and no default type provided");
                return deserialize(topic, data);
            }
            else {
                try {
                    return this.objectMapper.readerFor(javaType).readValue(data);
                }
                catch (IOException e) {
                    throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                            "] from topic [" + topic + "]", e);
                }
            }
        }
0 голосов
/ 25 июня 2018

Я полагаю, что вы упускаете тот факт, что JsonDeserializer должен быть настроен на ConsumerFactory с соответствующим типом по умолчанию для десериализации, но не в свойствах Kafka.

Вся информация представленав документах: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...