Spark 2.3 - получить имя файла с помощью sc.textFile? - PullRequest
0 голосов
/ 26 апреля 2018

Я хочу добавить имя файла к каждой строке при вставке в БД. Получение папки в качестве ввода.

Загрузка папки с использованием textFile(...) метод:

public JavaRDD<Row> readFolder(String filePath) {

   JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();

   RegexMatch reg = new RegexMatch();
   JavaRDD<Row> rowRDD = logRDD
           .map((Function<String, Row>) line -> {
               String[] sp = line.split(" ");
               // want to add file name to Row, how to get it ?
               return RowFactory.create(sp[1], sp[3], sp[2]); 
           });

   rowRDD.persist(StorageLevel.MEMORY_ONLY());

   return rowRDD;
}

Изменил этот код на wholeTextFiles(...), получив здесь имя файла, но не уверен, как получить строку из него, как в коде выше?

public void readFolder(String filePath) {
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
    JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(filePath, 1);

    JavaRDD<String> lineCounts = fileNameContentsRDD.map((Function<Tuple2<String, String>, String>) fileNameContent -> {
        String content = fileNameContent._2();
        int numLines = content.split("[\r\n]+").length;

        return fileNameContent._1() + ":  " + numLines;
    });

    List<String> output = lineCounts.collect();
    System.out.println(output);

}

Пожалуйста, предложите.

Ответы [ 2 ]

0 голосов
/ 26 апреля 2018

Поскольку вы используете Spark 2.3, используйте SparkSession API для чтения текстового файла,

Dataset<String> textDS = session.read().textFile(filePath);

, затем вы можете использовать это для получения имени файла ввода,

String fileName = textDS.inputFiles()[0];

use textDS.toJavaRDD() для преобразования Dataset в rdd и применения вашей логики.

0 голосов
/ 26 апреля 2018

Просто объедините оба как

public JavaRDD<Row> readFolder(String filePath) {
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
    JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(filePath, 1);

    JavaRDD<Row> rowRDD = fileNameContentsRDD.flatMap((FlatMapFunction<Tuple2<String, String>, Row>) fileNameContent -> {
        String fileName = fileNameContent._1();
        String content = fileNameContent._2();
        String[] lines = content.split("[\r\n]+");
        List<Row> array = new ArrayList<Row>(lines.length);
        for(String line : lines){
            String[] sp = line.split(" ");
            array.add(RowFactory.create(fileName, sp[1], sp[3], sp[2]));
        }
        return array.iterator();
    });

    return rowRDD;
}
...