Раздвиньте или разверните кадр данных scala по горизонтали, чтобы создать большой плоский кадр данных - PullRequest
0 голосов
/ 25 февраля 2020

У меня есть фрейм данных со следующей схемой:

UserID | StartDate | endDate | orderId | OrderCost| OrderItems| OrderLocation| Rank

Где Rank - от 1 до 10. Мне нужно переместить этот фрейм данных в ранг и создать фрейм данных в следующем формате:

UserID| StartDate_1 | endDate_1 | orderId_1 | OrderCost_1| OrderItems_1| OrderLocation_1|start_2 |endDate_2| orderId_2 | OrderCost_2| OrderItems_2| OrderLocation_2 |............| startDate_N|endDate_N | orderId_N | OrderCost_N| OrderItems_N| OrderLocation_N

Если у пользователя есть только две записи с рангом 3 и 10, необходимо заполнить столбцы суффиксами _3 и _10, остальные значения ячеек для пользователя будут равны нулю.

Я испробовал 2 подхода грубой силы

  1. Отфильтруйте DF по рангу, переименуйте столбцы с суффиксом и сами присоединитесь к DF.

  2. Группировка по идентификатору пользователя, сбор в виде списка и передача его функции отображения, где я заполняю массив на основе ранга, а затем возвращаю последовательность строки. Создайте DF, передав необходимую схему

Кажется, что оба работают (не уверен, что это правильный подход), но они не являются обобщенными c, которые я могу использовать повторно для другого сценария использования i иметь

1 Ответ

0 голосов
/ 26 февраля 2020

В этом примере я использовал https://github.com/bokeh/bokeh/blob/master/bokeh/sampledata/_data/auto-mpg.csv

Спарк по умолчанию ставит ранг впереди, поэтому имена столбцов «инвертированы» по сравнению с тем, что вы указали, но это делается в всего несколько шагов. Ключ в том, что exprs должен быть создан динамически, и что agg требует, чтобы это было разделено на голову и хвост (вот почему есть .agg(exprs(0), exprs.slice(1, exprs.length) ниже)

scala> df2.columns

res39: Array[String] = Array(mpg, cyl, displ, hp, weight, accel, yr, origin, name, Rank)

// note here, you would use columns.slice with the indices for
// the columns you need, i.e. (1, 7)
val exprs = for (col <- df2.columns.slice(0, 8)) yield expr(s"first(${col}) as ${col}")

exprs: Array[org.apache.spark.sql.Column] = Array(first(mpg, false) AS `mpg`, first(cyl, false) AS `cyl`, first(displ, false) AS `displ`, first(hp, false) AS `hp`, first(weight, false) AS `weight`, first(accel, false) AS `accel`, first(yr, false) AS `yr`, first(origin, false) AS `origin`)

scala> val resultDF = df2.groupBy("name").pivot("Rank").agg(exprs(0), exprs.slice(1, exprs.length):_*)

scala> resultDF.columns
res40: Array[String] = Array(name, 1_mpg, 1_cyl, 1_displ, 1_hp, 1_weight, 1_accel, 1_yr, 1_origin, 2_mpg, 2_cyl, 2_displ, 2_hp, 2_weight, 2_accel, 2_yr, 2_origin, 3_mpg, 3_cyl, 3_displ, 3_hp, 3_weight, 3_accel, 3_yr, 3_origin, 4_mpg, 4_cyl, 4_displ, 4_hp, 4_weight, 4_accel, 4_yr, 4_origin, 5_mpg, 5_cyl, 5_displ, 5_hp, 5_weight, 5_accel, 5_yr, 5_origin, 6_mpg, 6_cyl, 6_displ, 6_hp, 6_weight, 6_accel, 6_yr, 6_origin, 7_mpg, 7_cyl, 7_displ, 7_hp, 7_weight, 7_accel, 7_yr, 7_origin, 8_mpg, 8_cyl, 8_displ, 8_hp, 8_weight, 8_accel, 8_yr, 8_origin, 9_mpg, 9_cyl, 9_displ, 9_hp, 9_weight, 9_accel, 9_yr, 9_origin, 10_mpg, 10_cyl, 10_displ, 10_hp, 10_weight, 10_accel, 10_yr, 10_origin)
...