ОШИБКА kafka010.KafkaRDD: Кафка ConsumerRecord не сериализуема. Используйте .map для извлечения полей перед вызовом .persist или .window - PullRequest
0 голосов
/ 18 июня 2019

Я использую режим пряжи и кластера, версия streaming-kafka - 0.10.Тем не менее, следующие ошибки «ОШИБКА kafka010.KafkaRDD: Kafka ConsumerRecord не сериализуем. Использовать .map для извлечения полей перед вызовом .persist или .window» появляется часто?в чем причина?

val topic = Configuration.new_feed
    val groupId = "star_feed_consumer"
    val duration = "1000"
    val kafkaParams = KafkaUtil.initKafkaParams(Configuration.bootstrap_servers_log, groupId, duration)
    val topics = Array(topic)
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    val cachedStream = stream.cache()
    val new_feeds = cachedStream.map(record => parseJson(record.value))

    //new_feeds.foreachRDD(rdd => println(rdd))
    new_feeds.print(10)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...