Spring Kafka - ошибка "Magic v0 не поддерживает заголовки записи" - PullRequest
0 голосов
/ 02 мая 2018

Я запускаю приложение Spring Boot и набрал compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')

Я пытаюсь пойти против установки Cloudera с этой версией:

Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50

Мой класс KafkaProducerConfig довольно прост:

@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;

@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;

@Value("${spring.kafka.producer.client-id}")
private String producerClientId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

    return props;
}

@Bean
public ProducerFactory<String, Pdid> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
    KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());

    kafkaTemplate.setDefaultTopic(this.defaultTopicName);

    return kafkaTemplate;
}

@PostConstruct
public void postConstruct() {
    LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
    LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}

}

Когда я запускаю приложение, я получаю:

2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e

Затем я отправляю полезную нагрузку. Он отображается в инструменте Кафки против темы. Но в журналах на стороне Кафки, когда данные пытаются попасть внутрь, я получаю:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

Я пробовал следующие вещи со стороны приложения Producer:

  1. Понижение до Spring Kafka 2.0.4. Я надеялся, что переход на версию 0.11.0 Kafka поможет избавиться от проблемы, но это не дало никаких результатов.
  2. Проверено, что все узлы имеют одинаковую версию. По словам моего администратора, они есть.
  3. Проверено у моего администратора, что у нас нет смешанной установки. Опять мне сказали, что у нас нет.
  4. Основываясь на аналогичном вопросе переполнения стека, я вернулся к версии 2.1.5 и попытался вставить в JsonSerializer.ADD_TYPE_INFO_HEADERS значение false. Я подумал, что, возможно, это удалит заголовки, на которые ссылается журнал. Опять не пошел, и была зарегистрирована та же ошибка.

Я надеюсь, что упускаю что-то очевидное. Есть ли дополнительные заголовки, которые мне нужно включить / выключить, чтобы помочь решить проблему Magic v0, о которой кто-либо знает?

У нас есть другие приложения, которые успешно пишут на другие темы в той же среде, но это старые приложения, которые вручную создают необходимые компоненты Spring. Кроме того, эти приложения также используют гораздо более старый клиент (0.8.2.2) и используют StringSerializer для значения Producer вместо JSON. Мне нужно, чтобы мои данные были в JSON, и я действительно не хочу переходить на 0.8.2.2, когда мы находимся в системе, которая должна поддерживать 0.11.x.

Ответы [ 2 ]

0 голосов
/ 02 мая 2018

Решение проблемы было смесью двух вещей:

  1. Мне нужно было добавить JsonSerializer.ADD_TYPE_INFO_HEADERS to false, как предложил Гари Рассел.
  2. Мне нужно было сбросить все записи, которые были помещены в тему до , когда эта конфигурация была введена в мое приложение. В предыдущих записях были заголовки, которые портили потребителя Flume.
0 голосов
/ 02 мая 2018

но это более старые приложения, которые производят вручную необходимые бобы Spring.

на org.apache.kafka.common.record.FileRecords. downConvert (FileRecords.java:245)

Я не знаком с внутренними компонентами брокера kafka, но это «звучит» так, как будто темы были созданы со старым брокером, и их форматы не поддерживают заголовки, а не саму версию брокера (подсказка: downConvert).

Вы пробовали это с чистым брокером?

Клиент 1.0.x может общаться с более старыми брокерами (до версии 0.10.2.x IIRC), если вы не пытаетесь использовать функции, которые не поддерживаются брокером. Тот факт, что ваш брокер имеет значение 0,11 (которое поддерживает заголовки), является еще одним свидетельством того, что проблема заключается в формате записи темы.

Я успешно протестировал версии брокера / клиента / темы без проблем, если вы используете подмножество общих функций.

JsonSerializer.ADD_TYPE_INFO_HEADERS в false.

Это должно помешать фреймворку устанавливать любые заголовки; вам нужно показать код вашего производителя (и всю конфигурацию).

Вы также можете добавить ProducerInterceptor в конфигурацию производителя и проверить свойство ProducerRecord headers в методе onSend(), чтобы выяснить, какие настройки устанавливаются в выходном сообщении.

Если вы используете сообщения Spring-сообщения (template.setn(Message<?> m), заголовки будут отображаться по умолчанию). Использование raw template.send() методов не установит никаких заголовков (если вы не отправите ProducerRecord с заполненными заголовками.

...