С PySpark, как мне заполнить значения в столбце на основе groupby / window / partition и выполнить UDF? - PullRequest
0 голосов
/ 07 июня 2019

Я пытаюсь заполнить пропущенные значения в столбце. Столбец профиля в 1-й строке или в любой из следующих строк (которые расположены в зависимости от даты на основе даты) в группе / разделе будет иметь значение, которое необходимо заполнить в следующих ячейках в столбце профиля.

Я пытался запустить его с помощью оконной функции, но не смог применить UDF к оконной функции.

valuesA = [('1',"", "20190108"),('1',"", "20190107"),('1',"abcd", "20190106"),('1',"", "20190105"),('1',"", "20190104"),('2',"wxyz", "20190103"),('2',"", "20190102"),('2',"", "20190101")]
TableA = spark.createDataFrame(valuesA,['vid','profile', 'date'])

valuesB = [('1',"null", "20190108"),('1',"null", "20190107"),('1',"abcd", "20190106"),('1',"abcd", "20190105"),('1',"abcd", "20190104"),('2',"wxyz", "20190103"),('2', "wxyz", "20190102"),('2', "wxyz", "20190101")]
TableB = spark.createDataFrame(valuesB,['vid','profile', 'date'])

TableA.show()
TableB.show()
Table A: This is what I have. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|       |20190108|
|  1|       |20190107|
|  1|   abcd|20190106|
|  1|       |20190105|
|  1|       |20190104|
|  2|   wxyz|20190103|
|  2|       |20190102|
|  2|       |20190101|
+---+-------+--------+

Table B: What I am expecting. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|   null|20190108|
|  1|   null|20190107|
|  1|   abcd|20190106|
|  1|   abcd|20190105|
|  1|   abcd|20190104|
|  2|   wxyz|20190103|
|  2|   wxyz|20190102|
|  2|   wxyz|20190101|
+---+-------+--------+

1 Ответ

1 голос
/ 07 июня 2019

Вы можете использовать last оконную функцию.Примечание: сначала withColumn - заменить все пустые строки на нули - last функция по умолчанию пропускает нули, что в данном случае и является тем, что нам нужно.

from pyspark.sql.window import Window
from pyspark.sql.functions import *
TableB = TableA.withColumn('profile', when(length('profile') == 0, lit(None)).otherwise(col('profile')))\
    .withColumn("profile", last(col('profile'), True).over(Window.partitionBy('vid').orderBy(col('date').desc())))

TableB.show()

Вывод:

+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|   null|20190108|
|  1|   null|20190107|
|  1|   abcd|20190106|
|  1|   abcd|20190105|
|  1|   abcd|20190104|
|  2|   wxyz|20190103|
|  2|   wxyz|20190102|
|  2|   wxyz|20190101|
+---+-------+--------+
...