обновление столбца путем сравнения нескольких столбцов в фрейме данных pyspark - PullRequest
0 голосов
/ 19 мая 2018

У меня есть data frame in pyspark, как показано ниже.

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test1|  Y|  1|null|
|test2|  N|  2|  UK|
| null|  Y|  1|  UK|
|test1|  N|  2|null|
|test1|  N|  3|null|
|test3|  N|  4| AUS|
|test4|  Y|  5|null|
+-----+---+---+----+

Я хочу обновить значение, когда любой заданный tests или cnty имеет значение val Y, тогда все значения valэтого конкретного tests или cnty следует обновить до Y.если нет, то какие значения они имеют.

Я сделал, как показано ниже

from pyspark.sql import Window
import pyspark.sql.functions as f


df1 = df.select('tests', f.max('val').over(Window.partitionBy('tests')).alias('val'), 'asd', 'cnty')

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test1|  Y|  1|null|
|test1|  Y|  2|null|
|test1|  Y|  3|null|
|test2|  N|  2|  UK|
|test3|  N|  4| AUS|
|test4|  Y|  5|null|
| null|  Y|  1|  UK|
+-----+---+---+----+

Вышеприведенное не дает мне желаемого результата.Как вы можете видеть для test2 у меня есть cnty как UK, а val равно N, и у меня есть другая запись, где cnty равно UK и val для этой записи YТогда согласно моему требованию val для обеих этих записей должно быть Y.Но это не так в result.

Ответы [ 2 ]

0 голосов
/ 19 мая 2018

Вы проверяли только столбец test, но забыли проверить столбец cnty.Для этого вам понадобится * другой WindowSpec для столбца cnty и объедините оба windowSpecs, используя встроенную функцию when, чтобы получить желаемый результат

from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')

from pyspark.sql import functions as f
df = df.select(f.col('tests'), f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val'))).alias('val'), f.col('asd'), f.col('cnty'))
df.show(truncate=False)

, который должен дать вам

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test4|Y  |5  |null|
|test3|N  |4  |AUS |
|test1|Y  |1  |null|
|test1|Y  |2  |null|
|test1|Y  |3  |null|
|test2|Y  |2  |UK  |
|null |Y  |1  |UK  |
+-----+---+---+----+

Я надеюсьэто объясняет, почему вы не получаете желаемый результат.

Обновление

Приведенное выше решение требует одновременного запуска обеих функций window, что может привести к некоторымпроблемы с памятью.Вы можете запустить одну функцию window для проверки столбцов tests и cnty по отдельности, так как

from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')

from pyspark.sql import functions as f
df = df.withColumn('val', f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.col('val')))\
    .withColumn('val', f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val')))

. Это приведет к тому же результату.

0 голосов
/ 19 мая 2018

Вы можете попробовать следующий подход.Соединение данных слева с тем же самым кадром данных, отфильтрованным со значениями 'Y' справа: если найдено, примените Y, в противном случае выберите существующее значение.

df.alias('a').join(
    df.filter(col('val')='Y').alias('b'),
    on=(col('a.tests') == col('b.tests')) | (col('a.cnty') == col('b.cnty')),
    how='left'
  )
  .withColumn('final_val',when(col('b.val').isNull(), col('a.val')).otherwise(col('b.val')))
  .select('a.tests','a.asd','a.cnty','final_val')

Единственная проблема, могут быть дубликаты,но лучше проверить данные и в конце при необходимости выполнить дедупликацию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...