Использование оконных функций PySpark с условиями для добавления строк - PullRequest
0 голосов
/ 04 февраля 2020

Мне нужно иметь возможность добавлять новые строки в PySpark. Значения df будут основаны на содержимом других строк с общим идентификатором. В конечном итоге появятся миллионы идентификаторов с множеством строк для каждого идентификатора. Я попробовал метод ниже, который работает, но кажется слишком сложным.

Я начинаю с df в формате ниже (но на самом деле есть больше столбцов):

+-------+----------+-------+
|   id  | variable | value |
+-------+----------+-------+
|     1 | varA     |    30 |
|     1 | varB     |     1 |
|     1 | varC     |    -9 |
+-------+----------+-------+

В настоящее время я поворачиваю этот df, чтобы получить его в следующем формате:

+-----+------+------+------+
|  id | varA | varB | varC |
+-----+------+------+------+
|   1 |   30 |    1 |   -9 |
+-----+------+------+------+

На этом df я могу затем использовать стандарт withColumn и при функциональности добавлять новые столбцы на основе значений в других столбцах. Например:

df = df.withColumn("varD", when((col("varA") > 16) & (col("varC") != -9)), 2).otherwise(1)

Что приводит к:

+-----+------+------+------+------+
|  id | varA | varB | varC | varD |
+-----+------+------+------+------+
|   1 |   30 |    1 |   -9 |    1 |
+-----+------+------+------+------+

Затем я могу повернуть этот df обратно в исходный формат, ведущий к этому:

+-------+----------+-------+
|   id  | variable | value |
+-------+----------+-------+
|     1 | varA     |    30 |
|     1 | varB     |     1 |
|     1 | varC     |    -9 |
|     1 | varD     |     1 |
+-------+----------+-------+

This работает, но кажется, что с миллионами строк это может привести к дорогостоящим и ненужным операциям. Такое ощущение, что это должно быть выполнимо без необходимости поворачивать и разворачивать данные. Нужно ли это делать?

Я читал о функциях Window, и звучит так, как будто они могут быть другим способом достижения того же результата, но, честно говоря, я изо всех сил пытаюсь начать с ними. Я вижу, как их можно использовать для генерации значения, скажем, суммы, для каждого идентификатора или для поиска максимального значения, но я не нашел способа даже начать применять сложные условия, которые приводят к новой строке.

Любая помощь с этой проблемой будет принята с благодарностью.

1 Ответ

1 голос
/ 04 февраля 2020

Вы можете использовать pandas_udf для добавления / удаления строк / столбцов в сгруппированных данных и реализовать ваши логики обработки c в pandas udf.

import pyspark.sql.functions as F

row_schema = StructType(
    [StructField("id", IntegerType(), True),
     StructField("variable", StringType(), True),
     StructField("value", IntegerType(), True)]
)

@F.pandas_udf(row_schema, F.PandasUDFType.GROUPED_MAP)
def addRow(pdf):
    val = 1
    if  (len(pdf.loc[(pdf['variable'] == 'varA') & (pdf['value'] > 16)]) > 0 ) & \
        (len(pdf.loc[(pdf['variable'] == 'varC') & (pdf['value'] != -9)]) > 0):
        val = 2
    return pdf.append(pd.Series([1, 'varD', val], index=['id', 'variable', 'value']), ignore_index=True)

df = spark.createDataFrame([[1, 'varA', 30],
                            [1, 'varB', 1],
                            [1, 'varC', -9]
                            ], schema=['id', 'variable', 'value'])

df.groupBy("id").apply(addRow).show()

, который переходит

+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|    varA|   30|
|  1|    varB|    1|
|  1|    varC|   -9|
|  1|    varD|    1|
+---+--------+-----+
...