Spark (Scala) фильтрует массив конструкций без взрыва - PullRequest
0 голосов
/ 02 марта 2019

У меня есть фрейм данных с ключом и столбец с массивом структур в столбце фрейма данных.Каждая строка содержит столбец a выглядит примерно так:

[
    {"id" : 1, "someProperty" : "xxx", "someOtherProperty" : "1", "propertyToFilterOn" : 1},
    {"id" : 2, "someProperty" : "yyy", "someOtherProperty" : "223", "propertyToFilterOn" : 0},
    {"id" : 3, "someProperty" : "zzz", "someOtherProperty" : "345", "propertyToFilterOn" : 1}
]

Теперь я хотел бы сделать две вещи:

  1. Фильтр для "propertyToFilterOn" = 1
  2. Применить некоторую логику кдругие свойства - например, сцепление

, так что результат будет:

[
{"id" : 1, "newProperty" : "xxx_1"},
{"id" : 3, "newProperty" : "zzz_345"}
]

Я знаю, как сделать это с помощью Explode, но для Explode также требуется groupBy на ключе при его соединении.Но так как это потоковый Dataframe, мне также пришлось бы поставить на него водяной знак, которого я стараюсь избегать.

Есть ли другой способ добиться этого без использования взрыва?Я уверен, что есть немного магии Скалы, которая может достичь этого!

Спасибо!

1 Ответ

0 голосов
/ 02 марта 2019

С искрой 2.4 появилось много функций более высокого порядка для массивов.(см. https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html)

val dataframe = Seq(
("a", 1, "xxx", "1", 1),
("a", 2, "yyy", "223", 0),
("a", 3, "zzz", "345", 1)
).toDF( "grouping_key", "id" , "someProperty" , "someOtherProperty", "propertyToFilterOn" )
.groupBy("grouping_key")
.agg(collect_list(struct("id" , "someProperty" , "someOtherProperty", "propertyToFilterOn")).as("your_array"))

dataframe.select("your_array").show(false)

+----------------------------------------------------+
|your_array                                          |
+----------------------------------------------------+
|[[1, xxx, 1, 1], [2, yyy, 223, 0], [3, zzz, 345, 1]]|
+----------------------------------------------------+

Вы можете фильтровать элементы в массиве, используя функцию фильтра массива высшего порядка, например:

val filteredDataframe = dataframe.select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))

filteredDataframe.show(false)

+----------------------------------+
|filtered_arrays                   |
+----------------------------------+
|[[1, xxx, 1, 1], [3, zzz, 345, 1]]|
+----------------------------------+

для «другой логики» вашегоговоря о том, что вы должны иметь возможность использовать функцию массива преобразования более высокого порядка следующим образом:

val tranformedDataframe = filteredDataframe
.select(expr("transform(filtered_arrays, your_struct -> struct(concat(your_struct.someProperty, '_', your_struct.someOtherProperty))"))

, но есть проблемы с возвратом структур из функции преобразования, как описано в этом посте:

http://mail-archives.apache.org/mod_mbox/spark-user/201811.mbox/%3CCALZs8eBgWqntiPGU8N=ENW2Qvu8XJMhnViKy-225ktW+_c0czA@mail.gmail.com%3E

*1017* * *лучше всего использовать API набора данных для преобразования следующим образом:
case class YourStruct(id:String, someProperty: String, someOtherProperty: String)
case class YourArray(filtered_arrays: Seq[YourStruct])

case class YourNewStruct(id:String, newProperty: String)

val transformedDataset = filteredDataframe.as[YourArray].map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))

val transformedDataset.show(false)

+--------------------------+
|value                     |
+--------------------------+
|[[1, xxx_1], [3, zzz_345]]|
+--------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...