обновить столбец фрейма данных pyspark на основе другого столбца - PullRequest
0 голосов
/ 17 мая 2018

Ниже приведен фрейм данных в pyspark. Я хочу обновить столбец val в data frame на основе значений в столбце tests.

df.show()
+---------+----+---+
|    tests| val|asd|
+---------+----+---+
|    test1|   Y|  1|
|    test2|   N|  2|
|    test2|   Y|  1|
|    test1|   N|  2|
|    test1|   N|  3|
|    test3|   N|  4|
|    test4|   Y|  5|
+---------+----+---+

Я хочу обновить значение, когда для любого данного test есть val Y, тогда все val's этого конкретного теста должны быть обновлены до Y. если нет, то какие ценности они имеют.

В основном я хочу, чтобы data frame был таким, как показано ниже.

result_df.show()

+---------+----+---+
|    tests| val|asd|
+---------+----+---+
|    test1|   Y|  1|
|    test2|   Y|  2|
|    test2|   Y|  1|
|    test1|   Y|  2|
|    test1|   Y|  3|
|    test3|   N|  4|
|    test4|   Y|  5|
+---------+----+---+

Что я должен сделать, чтобы достичь этого.

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Используйте max оконную функцию и selectExpr:

df.selectExpr(
    'tests', 'max(val) over (partition by tests) as val', 'asd'
).show()

+-----+---+---+
|tests|val|asd|
+-----+---+---+
|test4|  Y|  5|
|test3|  N|  4|
|test1|  Y|  1|
|test1|  Y|  2|
|test1|  Y|  3|
|test2|  Y|  2|
|test2|  Y|  1|
+-----+---+---+
0 голосов
/ 17 мая 2018

Вот решение.Сначала мы выясняем для каждого теста, имеет ли он значение Y.

import pyspark.sql.functions as sf
by_test = df.groupBy('tests').agg(sf.sum((sf.col('val') == 'Y').cast('int')).alias('HasY'))
by_test.show()
+-----+----+
|tests|HasY|
+-----+----+
|test4|   1|
|test3|   0|
|test1|   1|
|test2|   1|
+-----+----+

Присоединиться к исходному фрейму данных

df = df.join(by_test, on='tests')
df.show()
+-----+---+---+----+
|tests|val|asd|HasY|
+-----+---+---+----+
|test4|  Y|  5|   1|
|test3|  N|  4|   0|
|test1|  Y|  1|   1|
|test1|  N|  2|   1|
|test1|  N|  3|   1|
|test2|  N|  2|   1|
|test2|  Y|  1|   1|
+-----+---+---+----+

Создать новый столбец с тем же именем, используя когда / в противном случае

df = df.withColumn('val', sf.when(sf.col('HasY') > 0, 'Y').otherwise(sf.col('val')))
df = df.drop('HasY')
df.show()
+-----+---+---+
|tests|val|asd|
+-----+---+---+
|test4|  Y|  5|
|test3|  N|  4|
|test1|  Y|  1|
|test1|  Y|  2|
|test1|  Y|  3|
|test2|  Y|  2|
|test2|  Y|  1|
+-----+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...