Возможный подход, позволяющий избежать циклов, может заключаться в создании фрейма данных из кортежа, который вы используете в качестве входных:
t = [('A',3.5),('A',8),('B',3.5),('B',10)]
ref=spark.createDataFrame([(i[0],float(i[1])) for i in t],("col1_y","d_y"))
Затем мы можем присоединиться к входному фрейму данных (df
) по условию, а затем сгруппировать его по ключи и значения кортежа, которые будут повторяться, чтобы получить первое значение в каждой группе, а затем удалить дополнительные столбцы:
(df.join(ref,(df.col1==ref.col1_y)&(df.d>ref.d_y),how='inner').orderBy("col1","d")
.groupBy("col1_y","d_y").agg(F.first("col1").alias("col1"),F.first("d").alias("d"))
.drop("col1_y","d_y")).show()
+----+----+
|col1| d|
+----+----+
| A|10.0|
| A| 4.0|
| B| 4.0|
| B|13.0|
+----+----+
Обратите внимание: если порядок фрейма данных важен, вы можете назначить столбец индекса с помощью monotonically_increasing_id
и включить их в агрегирование, а затем упорядочить по столбцу индекса.
ИЗМЕНИТЬ другим способом вместо упорядочивания и получите first
напрямую с min
:
(df.join(ref,(df.col1==ref.col1_y)&(df.d>ref.d_y),how='inner')
.groupBy("col1_y","d_y").agg(F.min("col1").alias("col1"),F.min("d").alias("d"))
.drop("col1_y","d_y")).show()
+----+----+
|col1| d|
+----+----+
| B| 4.0|
| B|13.0|
| A| 4.0|
| A|10.0|
+----+----+