Как я могу прочитать файлы HDFS от исполнителя spark? - PullRequest
0 голосов
/ 01 апреля 2019

У меня большой (> 500 м ряд) файл CSV.Каждая строка в этом CSV-файле содержит путь к двоичному файлу, расположенному в HDFS.Я хочу использовать Spark, чтобы прочитать каждый из этих файлов, обработать их и записать результаты в другой файл CSV или таблицу.

Это достаточно просто сделать в драйвере, и следующий код получает заданиеdone

val hdfsFilePathList = // read paths from CSV, collect into list

hdfsFilePathList.map( pathToHdfsFile => {
  sqlContext.sparkContext.binaryFiles(pathToHdfsFile).mapPartitions { 
    functionToProcessBinaryFiles(_)
  }
})

Основная проблема заключается в том, что драйвер выполняет слишком много работы.Я хотел бы передать работу, выполненную binaryFiles исполнителям.Я нашел несколько многообещающих примеров, которые, как мне показалось, позволили бы мне получить доступ к sparkContext от исполнителя:

Использовать конфигурацию Hadoop SparkContext в методах / замыканиях RDD, например, foreachPartition

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala

но, похоже, они не работают так, как я думал.Я ожидаю, что сработает следующее:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration

class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  private def writeObject (out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject (in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}

val serConf = new ConfigSerDeser(sc.hadoopConfiguration)

val mappedIn = inputDf.map( row => {
    serConf.get()
})

Но это не сработает с KryoException: java.util.ConcurrentModificationException

Возможно ли, чтобы исполнители имели прямой доступ к файлам HDFS или файловой системе HDFS?Или, в качестве альтернативы, существует ли эффективный способ чтения миллионов двоичных файлов в HDFS / S3 и обработки их с помощью Spark?

1 Ответ

0 голосов
/ 01 апреля 2019

Был похожий случай, когда я пытался сделать то же самое, но понял, SparkSession или SparkContext не сериализуемы, поэтому они не доступны для исполнителей.

...