Как записать вывод в виде файла с фиксированной шириной из искры в hdfs? - PullRequest
0 голосов
/ 25 января 2019

Мне нужно прочитать CSV-файл из hdfs, затем мне нужно применить логику, согласно которой каждый столбец дополняется до фиксированной ширины, затем мне нужно сохранить обратно в hdfs только как файл фиксированной ширины. Не в любой другой форме, например, CSV или паркет.

Если я читаю ввод из hdfs как csv, который выглядит как пример ниже:

Name, age, phonenumber
A, 25,9900999999
B, 26,7654890234
C, 27,5643217897

Затем мне нужно применить логику к каждому столбцу с фиксированной шириной, например, ширина первого столбца должна быть равна 15, 2-й столбец 3, 3-й - 10

Вывод должен выглядеть в формате hdfs.

Name      age   phonenumber           
A         25    9900999999
B         26    7654890234
C         27    5643217897

Затем эти данные с фиксированной шириной мне нужно записать в hdfs как файл с фиксированной шириной.

Ответы [ 2 ]

0 голосов
/ 25 января 2019

Вам нужно привести все столбцы как строку, если inferSchema уже используется. Сопоставьте длину с df.columns, чтобы вы могли обрабатывать это динамически. Проверьте это:

scala> val df = Seq(("A", 25,9900999999L),("B", 26,7654890234L),("C", 27,5643217897L)).toDF("Name","age","phonenumber")
df: org.apache.spark.sql.DataFrame = [Name: string, age: int ... 1 more field]

scala> df.show(false)
+----+---+-----------+
|Name|age|phonenumber|
+----+---+-----------+
|A   |25 |9900999999 |
|B   |26 |7654890234 |
|C   |27 |5643217897 |
+----+---+-----------+


scala> val widths = Array(5,3,10)
widths: Array[Int] = Array(5, 3, 10)

scala> df.columns.zip(widths)
res235: Array[(String, Int)] = Array((Name,5), (age,3), (phonenumber,10))

scala> df.columns.zip(widths).foldLeft(df){ (acc,x) => acc.withColumn(x._1,rpad( trim(col(x._1).cast("string")),x._2," ")) }.show(false)
+-----+---+-----------+
|Name |age|phonenumber|
+-----+---+-----------+
|A    |25 |9900999999 |
|B    |26 |7654890234 |
|C    |27 |5643217897 |
+-----+---+-----------+

Для проверки заполнения ..

scala> df.columns.zip(widths).foldLeft(df){ (acc,x) => acc.withColumn(x._1,rpad( trim(col(x._1).cast("string")),x._2,"-")) }.show(false)
+-----+---+-----------+
|Name |age|phonenumber|
+-----+---+-----------+
|A----|25-|9900999999 |
|B----|26-|7654890234 |
|C----|27-|5643217897 |
+-----+---+-----------+


scala>
0 голосов
/ 25 января 2019

ваш пример вывода имеет разделитель, поскольку между возрастом и номером телефона есть пробелы.Я предполагаю, что это разделитель вкладок, в этом случае вы можете прочитать CSV и сохранить его обратно с spark.save.option("sep","\t").csv("filename")

, но если то, что вы написали правильно, вы можете получить его с помощью:

import org.apache.spark.sql.functions.rpad
val df=spark.read.option("header","false").csv("...")
val out= df.select(rpad($"_c0",15," "),
                   rpad($"_c1",3," "),
                   rpad($"_c2",10," "))                   
out.map(_.mkString("")).write.text("filename")

обратите внимание, что я установил заголовок в false, чтобы они также были дополнены

...