Spark Scala - как создать DF из грязного .txt - PullRequest
0 голосов
/ 17 марта 2020

Я полностью новичок в области больших данных, пожалуйста, будьте проще.

Используя Scala Я загрузил файл .txt, содержимое которого выглядит следующим образом: [click] [1]

import scala.io.Source
val lines = Source.fromFile("C:/Users/me/Downloads/myText.txt ").getLines().toList

В любом случае, согласно заданию, я должен получить DataFrame в результате, поэтому я сделал следующее:

lines.toDF()

Результат:

+------------------------------+
|value                         |
+------------------------------+
|+---+------------------+-----+|
|| id|             Text1|Text2||
|+---+------------------+-----+|
||  1|     one,two,three|  one||
||  2|     four,one,five|  six||
||  3|seven,nine,one,two|eight||
||  4|    two,three,five| five||
||  5|      six,five,one|seven||
|+---+------------------+-----+|
+------------------------------+

Пока мой цель:

+---+------------------+-----+
|id |Text1             |Text2|
+---+------------------+-----+
|1  |one,two,three     |one  |
|2  |four,one,five     |six  |
|3  |seven,nine,one,two|eight|
|4  |two,three,five    |five |
|5  |six,five,one      |seven|
+---+------------------+-----+

Не могли бы вы предложить мне инструменты / методы для ее достижения?

Ответы [ 2 ]

0 голосов
/ 19 марта 2020

Файл можно прочитать, отфильтрованы неправильные строки и создан новый набор данных с помощью "csv":

val textFileDataSet = spark.read.text(fileName).as(Encoders.STRING)

val textWithoutUnderlines = textFileDataSet
  .filter(!_.contains("-"))
  .map(l => l.substring(1, l.length - 2))

val result = spark.read.option("header", "true").option("delimiter", "|").csv(textWithoutUnderlines)
0 голосов
/ 17 марта 2020

Ниже код должен помочь вам. По сути, я создаю RDD, применяю необходимые фильтры и тем самым преобразую его в DataFrame.

//Upto you as how many partitions you want depending on the data instead of 5
val myRDD = spark.sparkContext.textFile("mytext.txt", 5) 
                           .filter{x => !{ x.contains("-") || x.contains("+") || x.contains("id")}}
                           .map { y => y.substring(1, y.length()-2) }
                           .map { z => z.split("\\|") }
                           .map { a => Row(a(0), a(1), a(2))  } 

val schema = new StructType()
                    .add(StructField("id", StringType, true))
                    .add(StructField("Text1", StringType, true))
                    .add(StructField("Text2", StringType, true))

val myDF = spark.createDataFrame(myRDD, schema)

myDF.show()

Я просто фильтрую все ненужные файлы в данных, преобразуя их в RDD Row и, таким образом, преобразует его в фрейм данных, используя явное определение схемы.

Дайте мне знать, если это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...