Как перенести набор данных Apache Spark в Java - PullRequest
0 голосов
/ 06 июня 2019

У меня есть Apache Spark Dataset<Row>, который я хочу транспонировать. Из некоторых тем здесь для меня ясно, что это можно сделать с помощью группировки-объединения-агрегации. Но я не понимаю, как мне нужно. У меня есть следующая таблица ввода:

+-------+------+------+------+------+
| rho_0 | rho_1| rho_2|rho_3 | names|
+-------+------+------+------+------+
|  1    | 0.89 | 0.66 | 0.074|  rho |
|  1    | 0.89 | 0.66 | 0.074|absRho|
|  0    |  1   | 2    |  3   | lag  |
+-------+------+------+------+------+

Что мне нужно, это

+-------+------+------+
| rho   |absRho| lag  |
+-------+------+------+
|  1    | 1    |  0   |
|  0.89 | 0.89 |  1   |
|  0.66 | 0.66 |  2   |
|  0.074| 0.074|  3   |
+-------+------+------+

Я пробовал что-то вроде

Dataset<Row> transposed = coll.groupBy().pivot("names").min("rho_0");

но это не работает. Вызов groupBy с последовательностью столбцов из ввода также не работает. Я нашел обходной путь, который мне не нравится:

Dataset<Row> transposed = coll.groupBy().pivot("names").min("rho_0")
for (int i = 1; i < nlags; i++) {
    transposed = transposed.union(coll.groupBy().pivot("names").min("rho_" + i));
}

но он действительно медленный и не предназначен для такой реализации. У вас есть какие-нибудь советы? Заранее спасибо!

1 Ответ

1 голос
/ 06 июня 2019

К сожалению, в искре нет встроенной функции для этого.Существует решение, использующее pivot, но вам нужно «взорвать» фрейм данных раньше.Это должно быть намного быстрее, чем ваше решение, основанное на союзах.

В scala это будет выглядеть следующим образом.Я добавил версию Java прямо ниже.

// scala
val cols = df.columns
  .filter(_ != "names")
  .map(n => struct(lit(n) as "c", col(n) as "v"))
val exploded_df = df.select(col("names"), explode(array(cols : _*)))
// java
Column[] cols = Arrays
    .stream(df.columns())
    .filter(x -> ! x.equals("names"))
    .map(n -> struct(lit(n).alias("c"), col(n).alias("v")))
    .toArray(Column[]::new);
Dataset<Row> exploded_df = df.select(col("names"), explode(array(cols)));
exploded_df.show();
+------+-------------+
| names|          col|
+------+-------------+
|   rho|    [rho_0,1]|
|   rho| [rho_1,0.89]|
|   rho| [rho_2,0.66]|
|   rho|[rho_3,0.074]|
|absRho|    [rho_0,1]|
|absRho| [rho_1,0.89]|
|absRho| [rho_2,0.66]|
|absRho|[rho_3,0.074]|
|   lag|    [rho_0,0]|
|   lag|    [rho_1,1]|
|   lag|    [rho_2,2]|
|   lag|    [rho_3,3]|
+------+-------------+

По сути, я построил столбец массива, который содержит структуры, состоящие из имени столбца и его значения.Затем я использовал функцию разнесения, чтобы сгладить этот массив.Оттуда мы можем использовать pivot как обычно; -)

// scala and java
exploded_df
  .groupBy(col("col.c"))
  .pivot("names")
  .agg(first(col("col.v")))
  .orderBy("c")
  .show();
+-----+------+---+-----+
|    c|absRho|lag|  rho|
+-----+------+---+-----+
|rho_0|     1|  0|    1|
|rho_1|  0.89|  1| 0.89|
|rho_2|  0.66|  2| 0.66|
|rho_3| 0.074|  3|0.074|
+-----+------+---+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...