Я запускаю приложение Spring Boot и набрал compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')
Я пытаюсь пойти против установки Cloudera с этой версией:
Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50
Мой класс KafkaProducerConfig довольно прост:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
}
Когда я запускаю приложение, я получаю:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
Затем я отправляю полезную нагрузку. Он отображается в инструменте Кафки против темы. Но в журналах на стороне Кафки, когда данные пытаются попасть внутрь, я получаю:
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
Я пробовал следующие вещи со стороны приложения Producer:
- Понижение до Spring Kafka 2.0.4. Я надеялся, что переход на версию 0.11.0 Kafka поможет избавиться от проблемы, но это не дало никаких результатов.
- Проверено, что все узлы имеют одинаковую версию. По словам моего администратора, они есть.
- Проверено у моего администратора, что у нас нет смешанной установки. Опять мне сказали, что у нас нет.
- Основываясь на аналогичном вопросе переполнения стека, я вернулся к версии 2.1.5 и попытался вставить в JsonSerializer.ADD_TYPE_INFO_HEADERS значение false. Я подумал, что, возможно, это удалит заголовки, на которые ссылается журнал. Опять не пошел, и была зарегистрирована та же ошибка.
Я надеюсь, что упускаю что-то очевидное. Есть ли дополнительные заголовки, которые мне нужно включить / выключить, чтобы помочь решить проблему Magic v0, о которой кто-либо знает?
У нас есть другие приложения, которые успешно пишут на другие темы в той же среде, но это старые приложения, которые вручную создают необходимые компоненты Spring. Кроме того, эти приложения также используют гораздо более старый клиент (0.8.2.2) и используют StringSerializer для значения Producer вместо JSON. Мне нужно, чтобы мои данные были в JSON, и я действительно не хочу переходить на 0.8.2.2, когда мы находимся в системе, которая должна поддерживать 0.11.x.