Как преобразовать список в строку с несколькими столбцами - PullRequest
1 голос
/ 05 ноября 2019

Создайте DataFrame из CSV-файла, обработайте каждую строку, хотите создать новую строку с тем же количеством столбцов.

val df = spark.read.format("csv").load("data.csv")
def process(line: Row) : Seq[String] = {
  val list = new ArrayList[String]
  for (i <- 0 to line.size-1) {
    list.add(line.getString(i).toUpperCase)
  }
  list.asScala.toSeq
}
val df2 = df.map(process(_))
df2.show

Ожидается / надежда на получение:

+---+---+---+                                                                   
| _1| _2| _3|
+---+---+---+
| X1| X2| X3|
| Y1| Y2| Y3|
+---+---+---+

Получение:

+------------+                                                                     
|       value|
+------------+
|[X1, X2, X3]|
|[Y1, Y2, Y3]|
+------------+

Входной файл data.csv:

x1,x2,x3
y1,y2,y3

Обратите внимание, что код должен работать и в этом входном файле:

x1,x2,x3,x4
y1,y2,y3,y4

И для этого входного файла я хотел бы видеть результат

+---+---+---+---+                                                               
| _1| _2| _3| _4|
+---+---+---+---+
| X1| X2| X3| X4|
| Y1| Y2| Y3| Y4|
+---+---+---+---+

Обратите внимание, что я использовал tpUpperCase () в process () просто для того, чтобы простой пример работал. Реальная логика в process () может быть намного более сложной.

1 Ответ

2 голосов
/ 06 ноября 2019

Второе обновление для изменения rdd на строку

@ USML , в основном изменено Seq [String] на Row так что rdd может быть paralellized . это распределенная параллельная коллекция, которую необходимо сериализовать

val df2 = csvDf.rdd.map(process(_)).map(a => Row.fromSeq(a)) 
//df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
// And we use dynamic Schema (e.g. same number of columns as csv
spark.createDataFrame(df2, schema = dynamicSchema).show(false)
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|X1 |X2 |X3 |
|Y1 |Y2 |Y3 |
+---+---+---+

Обновление измененного требования Пока вы читаете CSV,конечный вывод будет иметь те же числа столбцов в качестве вашего CSV, поскольку мы используем df.schema для создания фрейма данных после вызова процесс метод. Попробуйте это:

val df = spark.read.format("csv").load("data.csv")
val dynamicSchema = df.schema // This makes sure to prserve  same number of columns
def process(line: Row) : Seq[String] = {
  val list = new ArrayList[String]
  for (i <- 0 to line.size-1) {
    list.add(line.getString(i).toUpperCase)
  }
  list.asScala.toSeq
}
val df2 = df.rdd.map(process(_)).map(a => Row.fromSeq(a)) // df2 is actually an RDD // updated conversion to Row

val finalDf = spark.createDataFrame(df2, schema = dynamicSchema) // We use same schema

finalDf.show(false)

Содержимое файла =>

cat data.csv
a1,b1,c1,d1
a2,b2,c2,d2

Код =>

import org.apache.spark.sql.Row
val csvDf = spark.read.csv("data.csv")
csvDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a1 |b1 |c1 |d1 |
|a2 |b2 |c2 |d2 |
+---+---+---+---+

def process(cols: Row): Row  = { Row("a", "b", "c","d") } // Check the Data Type

val df2 = csvDf.rdd.map(process(_)) // df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

val finalDf = spark.createDataFrame(df2,schema = csvDf.schema)

finalDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a  |b  |c  |d  |
|a  |b  |c  |d  |
+---+---+---+---+

Очкик сведению Строка необходим тип данных для Карта Строка

Лучше иметь класс безопасности типа Отдых должен быть легким

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...