Сбой производителя Spring Kafka при отправке сообщения после обновления до spring-boot 2.0 - PullRequest
0 голосов
/ 08 октября 2018

Попытка обновить службу spring-boot для использования spring-boot 2.0.5 и spring-kafka 2.1.10 (из spring-boot 1.5.x и spring-kafka 1.3.x), однако я получаю сообщение об ошибке, когдапытается опубликовать сообщение с помощью JsonSerializer.

Мой конфиг производителя с (JsonSerializer.ADD_TYPE_INFO_HEADERS, false):

@Configuration
public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public ProducerFactory<String, KafkaMessage> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return configProps;
}

@Bean
public KafkaTemplate<String, KafkaMessage> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
}

Версия Kafka (редактировать, используя версию kafka 1.0.0):

[2018-10-08 19:20:53,562] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-08 19:20:53,562] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)

[2018-10-08 18: 24: 40,252] ИНФОРМАЦИЯ Версия Kafka: 2.0.0 (org.apache.kafka.common.utils.AppInfoParser) [2018-10-08 18: 24: 40,252] ИНФОРМАЦИЯ Kafka commitId: 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)

Ошибка приложения:

java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request

Ошибка в журнале кафки:

java.lang.IllegalArgumentException: Magic v1 does not support record headers

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Источник моей проблемы был с настройкой нашей реализации kafka.Следующие свойства были настроены с неверной / более старой версией kafka:

  • inter.broker.protocol.version
  • log.message.format.version

После удаления этих настроек мне удалось создать и потреблять сообщения из приложений весенней загрузки 2.

0 голосов
/ 08 октября 2018

Magic v1 не поддерживает заголовки записей выбрасывается при возникновении проблемы совместимости между версией брокера kafka и несоответствием версии клиента.

spring-kafka 2.1.10 не поддерживает версию kafka 2.0.0.Вам нужно использовать spring-kafka 2.2.x для работы с kafka 2.0.0.

Для совместимости с клиентом Kafka посмотрите этот официальный doc from spring.

...