У меня есть задание spark (spark 2.1), которое обрабатывает потоковые данные, используя прямой поток Kafka.Я обогатил данные потока файлами данных, хранящимися в HDFS.Сначала я читаю файлы данных (*. Parquet) и сохраняю их во фрейме данных, затем обогащаю по одной записи каждый раз этим фреймом данных.
Код запустился без ошибок, но обогащение не произошло.Я запустил коды в режиме отладки и обнаружил, что кадр данных (например, df) отображается как недопустимое дерево.Почему фрейм данных является нулевым внутри rdd.foreachPartition?как исправить эту проблему?Спасибо!
val kafkaSinkVar = ssc.sparkContext.broadcast(KafkaSink(kafkaServers, outputTopic))
Service.aggregate(kafkaInputStream).foreachRDD(rdd => {
val df =ss.read.parquet( filePath + "/*.parquet" )
println("Record Count in DF: " + df.count()) ==> the console shows the files were loaded successfully with the record count = 1300
rdd.foreachPartition(partition => {
val futures = partition.map(event => {
sentMsgsNo.add(1L)
val eventEnriched = someEnrichmen1(event,df) ==> df is shown as invalid tree here
kafkaSinkVar.value.sendCef(eventEnriched)
})
})
})
})