Как создать столбец с суммой значений списка в фрейме данных pyspark - PullRequest
1 голос
/ 16 марта 2020

Я довольно новичок в pyspark и имею дело со сложным фреймом данных. Я застрял, пытаясь получить N строк из списка в моем файле df.column после некоторой фильтрации.

У меня есть следующий файл df.struct:

root
 |-- struct1: struct (nullable = true)
 |    |-- array1: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- struct2 : struct (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- struct3 : struct (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- property: string (nullable = true)

Чего я хочу добиться, так это получить сумму всех значений struct2.values ​​, когда свойство имеет значение Good. Поскольку я могу иметь несколько (N) значений для массива 1.

Прямо сейчас, я получил небольшое предложение, чтобы получить первое свойство. Но я не могу успешно передать его в udf, чтобы перебрать все возможные строки: df.withColumn("Sum", (col('struct1.array1')[0])['property'])

Вот некоторые шаги, которые я имею в виду:

  • Фильтр каждый элемент в списке, когда свойство = Good

  • Возвращает лямбда-значение в формате udf с суммой struct3.value

Требуемый вывод должен быть что-то вроде:

None
+---------------------------------------------------------------------------------------------------------+
|Struct1                                                                                            |Sum|
+---------------------------------------------------------------------------------------------------------+
|[[[[2020-01-01, 10], [2020-02-02, 15], Good], [[2020-01-01, 20], [2020-02-02, 25], Good]]]         |20|
+---------------------------------------------------------------------------------------------------------+

Любая помощь будет признателен

1 Ответ

1 голос
/ 17 марта 2020

В этом случае вам необязательно нужен UDF. При использовании Spark> = 2.4.0 вы можете добиться того же самого, просто используя встроенные старшие функции, как показано ниже:

from pyspark.sql.functions import expr  

df.withColumn("good_elements", expr("""transform( \
                                         filter(struct1.array1, e -> e.property == 'Good'), 
                                         e -> cast(e.struct2.value as int)
                                    )""")) \
  .withColumn("sum", expr("aggregate(good_elements, 0, (sum, e) -> sum + e)"))
  • filter(struct1.array1, e -> e.property == 'Good'): Сначала мы фильтруем элементы, которые имеют property == 'Good'

  • transform(..., e -> cast(e.struct2.value as int): Затем мы конвертируем каждый элемент в целое число и сохраняем их в новый столбец с именем good_elements

  • aggregate(good_elements, 0, (sum, e) -> sum + e): наконец, мы создаем столбец sum путем вычисления суммы good_elements

...