Spark DataFrame: игнорировать столбцы с пустыми идентификаторами в groupBy - PullRequest
0 голосов
/ 11 октября 2019

У меня есть фрейм данных, например, с такой структурой:

ID | Date | P1_ID | P2_ID | P3_ID | P1_A | P1_B | P2_A | ...
============================================================
1  | 123  | 1     |       |       | A1   | B1   |      | ... <- only P1_x columns filled
1  | 123  | 2     |       |       | A2   | B2   |      | ... <- only P1_x filled
1  | 123  | 3     |       |       | A3   | B3   |      | ... <- only P1_x filled
1  | 123  |       | 1     |       |      |      | A4   | ... <- only P2_x filled
1  | 123  |       | 2     |       |      |      | A5   | ... <- only P2_x filled
1  | 123  |       |       | 1     |      |      |      | ... <- only P3_x filled

Мне нужно объединить строки, которые имеют одинаковые значения ID, Date и Px_ID, но без учета пустых значений в Px_ID при сравненииключевые столбцы.

В конце концов мне нужен такой фрейм данных:

ID | Date | P1_ID | P2_ID | P3_ID | P1_A | P1_B | P2_A | ...
============================================================
1  | 123  | 1     | 1     | 1     | A1   | B1   | A4   | ...
1  | 123  | 2     | 2     |       | A2   | B2   | A5   | ...
1  | 123  | 3     |       |       | A3   | B3   |      | ...

Возможно ли это и как? Спасибо!

1 Ответ

1 голос
/ 11 октября 2019

Я нашел решение этой проблемы: поскольку нерелевантные столбцы x_ID пусты, одним из возможных способов является создание нового столбца combined_ID, который содержит объединение всех значений столбца x_ID (он будет содержать только одно значение,поскольку только один x_ID не является пустым в каждой строке):

var xIdArray = Seq[Column]("P1_ID", "P2_ID", "P3_ID")
myDF = myDF.withColumn("combined_ID", concat(xIdArray : _*))

Это меняет DF на следующую структуру:

ID | Date | P1_ID | P2_ID | P3_ID | P1_A | P1_B | P2_A | ... | combined_ID 
===========================================================================
1  | 123  | 1     |       |       | A1   | B1   |      | ... | 1
1  | 123  | 2     |       |       | A2   | B2   |      | ... | 2
1  | 123  | 3     |       |       | A3   | B3   |      | ... | 3
1  | 123  |       | 1     |       |      |      | A4   | ... | 1
1  | 123  |       | 2     |       |      |      | A5   | ... | 2
1  | 123  |       |       | 1     |      |      |      | ... | 1

Теперь я могу просто сгруппировать свой DF по ID, датеи комбинированный_идентификатор и агрегирование всех соответствующих столбцов, например, с помощью функции max для получения значений непустых ячеек:

var groupByColumns : Seq[String] = Seq("ID", "Date", "x_ID")
var aggColumns : Seq[String] = Seq("P1_ID", "P2_ID", "P3_ID", "P1_A", "P1_B", "P2_A", ...)

myDF = myDF.groupBy(groupByColumns.head, groupByColumns.tail : _*).agg(aggColumns.head, aggColumns.tail : _*)

Результат:

ID | Date | combined_ID | P1_ID | P2_ID | P3_ID | P1_A | P1_B | P2_A | ... 
===========================================================================
1  | 123  | 1           | 1     | 1     | 1     | A1   | B1   | A4   | ...
1  | 123  | 2           | 2     | 2     |       | A2   | B2   | A5   | ...
1  | 123  | 3           | 3     |       |       | A3   | B3   |      | ...
...