как выбрать первые n элементов строки на основе нескольких условий в pyspark - PullRequest
2 голосов
/ 20 июня 2020

Теперь у меня есть такие данные:

+----+----+
|col1|   d|
+----+----+
|   A|   4|
|   A|  10|
|   A|   3|
|   B|   3|
|   B|   6|
|   B|   4|
|   B| 5.5|
|   B|  13|
+----+----+

col1 - StringType, d - TimestampType, здесь я использую вместо этого DoubleType. Я хочу сгенерировать данные на основе кортежей условий. Учитывая кортеж [(A, 3.5), (A, 8), (B, 3.5), (B, 10)], я хочу получить результат вроде

+----+---+
|col1|  d|
+----+---+
|   A|  4|
|   A| 10|
|   B|  4|
|   B| 13|
+----+---+

Это для каждого элемента в кортеж, мы выбираем из фрейма данных pyspark первую 1 строку, которая d больше номера кортежа, а col1 равна строке кортежа. Я уже написал:

df_res=spark_empty_dataframe    
for (x,y) in tuples:
         dft=df.filter(df.col1==x).filter(df.d>y).limit(1)
         df_res=df_res.union(dft)

Но я думаю, что это может иметь проблемы с эффективностью, я не знаю, был ли я прав.

1 Ответ

2 голосов
/ 21 июня 2020

Возможный подход, позволяющий избежать циклов, может заключаться в создании фрейма данных из кортежа, который вы используете в качестве входных:

t = [('A',3.5),('A',8),('B',3.5),('B',10)]
ref=spark.createDataFrame([(i[0],float(i[1])) for i in t],("col1_y","d_y"))

Затем мы можем присоединиться к входному фрейму данных (df) по условию, а затем сгруппировать его по ключи и значения кортежа, которые будут повторяться, чтобы получить первое значение в каждой группе, а затем удалить дополнительные столбцы:

(df.join(ref,(df.col1==ref.col1_y)&(df.d>ref.d_y),how='inner').orderBy("col1","d")

.groupBy("col1_y","d_y").agg(F.first("col1").alias("col1"),F.first("d").alias("d"))

.drop("col1_y","d_y")).show()
+----+----+
|col1|   d|
+----+----+
|   A|10.0|
|   A| 4.0|
|   B| 4.0|
|   B|13.0|
+----+----+

Обратите внимание: если порядок фрейма данных важен, вы можете назначить столбец индекса с помощью monotonically_increasing_id и включить их в агрегирование, а затем упорядочить по столбцу индекса.

ИЗМЕНИТЬ другим способом вместо упорядочивания и получите first напрямую с min:

(df.join(ref,(df.col1==ref.col1_y)&(df.d>ref.d_y),how='inner')

.groupBy("col1_y","d_y").agg(F.min("col1").alias("col1"),F.min("d").alias("d"))

.drop("col1_y","d_y")).show()

+----+----+
|col1|   d|
+----+----+
|   B| 4.0|
|   B|13.0|
|   A| 4.0|
|   A|10.0|
+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...