Фрейм данных Spark равен NULL (Invalid Tree) - PullRequest
1 голос
/ 14 мая 2019

У меня есть задание spark (spark 2.1), которое обрабатывает потоковые данные, используя прямой поток Kafka.Я обогатил данные потока файлами данных, хранящимися в HDFS.Сначала я читаю файлы данных (*. Parquet) и сохраняю их во фрейме данных, затем обогащаю по одной записи каждый раз этим фреймом данных.

Код запустился без ошибок, но обогащение не произошло.Я запустил коды в режиме отладки и обнаружил, что кадр данных (например, df) отображается как недопустимое дерево.Почему фрейм данных является нулевым внутри rdd.foreachPartition?как исправить эту проблему?Спасибо!

 val kafkaSinkVar = ssc.sparkContext.broadcast(KafkaSink(kafkaServers, outputTopic))

 Service.aggregate(kafkaInputStream).foreachRDD(rdd => {  
      val df =ss.read.parquet( filePath + "/*.parquet" )
      println("Record Count in DF: " + df.count())  ==> the console shows the files were loaded successfully with the record count = 1300
      rdd.foreachPartition(partition => {
        val futures = partition.map(event => {
          sentMsgsNo.add(1L)
          val eventEnriched = someEnrichmen1(event,df)  ==> df is shown as invalid tree here
          kafkaSinkVar.value.sendCef(eventEnriched)
        })

        })
      })
    })
...