обновить несколько столбцов на основе двух столбцов во фреймах данных pyspark - PullRequest
0 голосов
/ 30 мая 2018

У меня есть кадр данных, как показано ниже в pyspark.

+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
|     serial_number  |     rest_id  |     value  |     body  |     legs  |     face  |     idle  |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
| sn11               | rs1          | N          | Y         | N         | N         | acde      |
| sn1                | rs1          | N          | Y         | N         | N         | den       |
| sn1                | null         | Y          | N         | Y         | N         | can       |
| sn2                | rs2          | Y          | Y         | N         | N         | aeg       |
| null               | rs2          | N          | Y         | N         | Y         | ueg       |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+

Теперь я хочу update некоторые столбцы при проверке некоторых значений столбцов.

Я хочуобновите value, когда любой данный serial_number или rest_id имеет значение Y, тогда все values этого конкретного serial_number или rest_id должны быть обновлены до Y. Если нет, то какие значения они имеют.

Я сделал, как показано ниже.

df.alias('a').join(df.filter(col('value')='Y').alias('b'),on=(col('a.serial_number') == col('b.serial_number')) | (col('a.rest_id') == col('b.rest_id')), how='left').withColumn('final_value',when(col('b.value').isNull(), col('a.value')).otherwise(col('b.value'))).select('a.serial_number','a.rest_id','a.body', 'a.legs', 'a.face', 'a.idle', 'final_val')

Я получил желаемый результат.

Теперь я хочу повторить то же самое для столбцов body, legs и face.

Я могу сделать то же, что и выше для всех столбцов individually, я имею в виду 3 операторы объединения.Но я хочу обновить все столбцы 4 в одном операторе.

Как я могу это сделать?

Expected result

+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
|     serial_number  |     rest_id  |     value  |     body  |     legs  |     face  |     idle  |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
| sn11               | rs1          | N          | Y         | N         | N         | acde      |
| sn1                | rs1          | Y          | Y         | Y         | N         | den       |
| sn1                | null         | Y          | Y         | Y         | N         | can       |
| sn2                | rs2          | Y          | Y         | N         | Y         | aeg       |
| null               | rs2          | Y          | Y         | N         | Y         | ueg       |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+

1 Ответ

0 голосов
/ 30 мая 2018

Вы должны использовать функции window для обоих столбцов serial_number и rest_id, чтобы проверить, присутствует ли Y в столбцах в этих группах.(комментарии в качестве пояснения приведены ниже)

#column names for looping for the updates
columns = ["value","body","legs","face"]
import sys
from pyspark.sql import window as w
#window for serial number grouping
windowSpec1 = w.Window.partitionBy('serial_number').rowsBetween(-sys.maxint, sys.maxint)
#window for rest id grouping
windowSpec2 = w.Window.partitionBy('rest_id').rowsBetween(-sys.maxint, sys.maxint)

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function for checking if Y is in the collected list of windows defined above for the columns in the list defined for looping
def containsUdf(x):
    return "Y" in x

containsUdfCall = f.udf(containsUdf, t.BooleanType())

#looping the columns for checking the condition defined in udf function above by collecting the N and Y in each columns within windows defined
for column in columns:
    df = df.withColumn(column, f.when(containsUdfCall(f.collect_list(column).over(windowSpec1)) | containsUdfCall(f.collect_list(column).over(windowSpec2)), "Y").otherwise(df[column]))

df.show(truncate=False)

, что должно дать вам

+-------------+-------+-----+----+----+----+----+
|serial_number|rest_id|value|body|legs|face|idle|
+-------------+-------+-----+----+----+----+----+
|sn2          |rs2    |Y    |Y   |N   |Y   |aeg |
|null         |rs2    |Y    |Y   |N   |Y   |ueg |
|sn11         |rs1    |N    |Y   |N   |N   |acde|
|sn1          |rs1    |Y    |Y   |Y   |N   |den |
|sn1          |null   |Y    |Y   |Y   |N   |can |
+-------------+-------+-----+----+----+----+----+

Я бы порекомендовал использовать оконную функцию отдельно в двух циклах, поскольку это может дать вамисключения памяти для больших данных, так как обе оконные функции используются одновременно для каждой строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...