Я использую режим пряжи и кластера, версия streaming-kafka - 0.10.Тем не менее, следующие ошибки «ОШИБКА kafka010.KafkaRDD: Kafka ConsumerRecord не сериализуем. Использовать .map для извлечения полей перед вызовом .persist или .window» появляется часто?в чем причина?
val topic = Configuration.new_feed
val groupId = "star_feed_consumer"
val duration = "1000"
val kafkaParams = KafkaUtil.initKafkaParams(Configuration.bootstrap_servers_log, groupId, duration)
val topics = Array(topic)
val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val cachedStream = stream.cache()
val new_feeds = cachedStream.map(record => parseJson(record.value))
//new_feeds.foreachRDD(rdd => println(rdd))
new_feeds.print(10)