Получить имя файла, используя hadoopFile - PullRequest
0 голосов
/ 03 октября 2018

Я использую Spark 2.2 вместе с Scala 2.11 для анализа каталога и преобразования данных внутри него.

Для обработки кодировки ISO я использую hadoopFile, например:

val inputDirPath = "myDirectory"
sc.hadoopFile[LongWritable, Text, TextInputFormat](inputDirPath).map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, "iso-8859-1")).map(ProcessFunction(_)).toDF

Как получить имя файла каждой строки в ProcessFunction?ProcessFunction принимает String в параметре и возвращает объект.

Спасибо за потраченное время

Ответы [ 2 ]

0 голосов
/ 04 октября 2018

Ответы, включая вашу функцию ProcessFunction

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
val inputDirPath = "dataset.txt"
val textRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputDirPath)
// cast TO HadoopRDD
val linesWithFileNames = rddHadoop.mapPartitionsWithInputSplit((inputSplit, iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tuple => (file.getPath, new String(tuple._2.getBytes, 0, tuple._2.getLength, "iso-8859-1")))
}).map{case (path, line) =>  (path, ProcessFunction(line)}
0 голосов
/ 03 октября 2018

val textRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputDirPath) // cast TO HadoopRDD val linesWithFileNames = textRdd.asInstanceOf[HadoopRDD[LongWritable, Text]] .mapPartitionsWithInputSplit((inputSplit, iterator) => { val file = inputSplit.asInstanceOf[FileSplit] iterator.map(tuple => (file.getPath, tuple._2)) } ) linesWithFileNames.foreach(println)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...