Как получить уникальные значения в каждом окне в фрейме данных pyspark - PullRequest
0 голосов
/ 03 апреля 2019

У меня есть следующий искровой фрейм данных:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('').getOrCreate()
df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])

, который возвращает меня:

+---+-------+------+
| nr|column2|quant |
+---+-------+------+
|  1|      a|     2|
|  2|      b|     2|
|  3|      c|     2|
|  4|      d|     2|
|  5|      b|     3|
|  6|      b|     3|
|  7|      c|     2|
+---+-------+------+

Я хотел бы получить строки, где для каждого 3 сгруппированных строки (из каждого окна, гдеразмер окна 3) столбец кванта имеет уникальные значения.как на следующем рисунке:

enter image description here

Здесь красный - это размер окна, и в каждом окне я сохраняю только зеленые строки, где квант уникален:

Результат, который я хотел бы получить, следующий:

+---+-------+------+
| nr|column2|values|
+---+-------+------+
|  1|      a|     2|
|  4|      d|     2|
|  5|      b|     3|
|  7|      c|     2|
+---+-------+------+

Я новичок в Spark, поэтому я был бы признателен за любую помощь.Спасибо

1 Ответ

2 голосов
/ 03 апреля 2019

Этот подход должен работать для вас, предполагая, что группировка 3 записей основана на столбце «nr».

Использование udf, который решает, должна ли быть выбрана запись, и lag, используется для получения данных предыдущих строк.

def tag_selected(index, current_quant, prev_quant1, prev_quant2):                                                                                                    
    if index % 3 == 1:  # first record in each group is always selected                                                                                              
        return True                                                                                                                                                  
    if index % 3 == 2 and current_quant != prev_quant1: # second record will be selected if prev quant is not same as current                                        
        return True                                                                                                                                                  
    if index % 3 == 0 and current_quant != prev_quant1 and current_quant != prev_quant2: # third record will be selected if prev quant are not same as current       
        return True                                                                                                                                                  
    return False                                                                                                                                                     

tag_selected_udf = udf(tag_selected, BooleanType())                                                                                                                  

df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])

window = Window.orderBy("nr")

df = df.withColumn("prev_quant1", lag(col("quant"),1, None).over(window))\
       .withColumn("prev_quant2", lag(col("quant"),2, None).over(window)) \
       .withColumn("selected", 
                   tag_selected_udf(col('nr'),col('quant'),col('prev_quant1'),col('prev_quant2')))\
       .filter(col('selected') == True).drop("prev_quant1","prev_quant2","selected")
df.show()

что дает

+---+-------+-----+
| nr|column2|quant|
+---+-------+-----+
|  1|      a|    2|
|  4|      d|    2|
|  5|      b|    3|
|  7|      c|    2|
+---+-------+-----+
...