У меня большой (> 500 м ряд) файл CSV.Каждая строка в этом CSV-файле содержит путь к двоичному файлу, расположенному в HDFS.Я хочу использовать Spark, чтобы прочитать каждый из этих файлов, обработать их и записать результаты в другой файл CSV или таблицу.
Это достаточно просто сделать в драйвере, и следующий код получает заданиеdone
val hdfsFilePathList = // read paths from CSV, collect into list
hdfsFilePathList.map( pathToHdfsFile => {
sqlContext.sparkContext.binaryFiles(pathToHdfsFile).mapPartitions {
functionToProcessBinaryFiles(_)
}
})
Основная проблема заключается в том, что драйвер выполняет слишком много работы.Я хотел бы передать работу, выполненную binaryFiles
исполнителям.Я нашел несколько многообещающих примеров, которые, как мне показалось, позволили бы мне получить доступ к sparkContext от исполнителя:
Использовать конфигурацию Hadoop SparkContext в методах / замыканиях RDD, например, foreachPartition
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
но, похоже, они не работают так, как я думал.Я ожидаю, что сработает следующее:
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
class ConfigSerDeser(var conf: Configuration) extends Serializable {
def this() {
this(new Configuration())
}
def get(): Configuration = conf
private def writeObject (out: java.io.ObjectOutputStream): Unit = {
conf.write(out)
}
private def readObject (in: java.io.ObjectInputStream): Unit = {
conf = new Configuration()
conf.readFields(in)
}
private def readObjectNoData(): Unit = {
conf = new Configuration()
}
}
val serConf = new ConfigSerDeser(sc.hadoopConfiguration)
val mappedIn = inputDf.map( row => {
serConf.get()
})
Но это не сработает с KryoException: java.util.ConcurrentModificationException
Возможно ли, чтобы исполнители имели прямой доступ к файлам HDFS или файловой системе HDFS?Или, в качестве альтернативы, существует ли эффективный способ чтения миллионов двоичных файлов в HDFS / S3 и обработки их с помощью Spark?