Фильтрация pyspark DataFrame, где строки находятся в пределах диапазона другого DataFrame - PullRequest
0 голосов
/ 26 декабря 2018

Я хочу получить все строки из одного DataFrame (df1) так, чтобы его id находилось в пределах + - 10 от любого значения в столбце id другого DataFrame (df2).

Пример:

df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#|  cat| 30|
#+-----+---+ 

df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff|  3|
#| etc|100|
#+----+---+

Желаемый результат:

+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+

Это потому, что "apple" был в пределах 10 от "jeff".

Как видите, строка хороша, если id в df1 соответствует критериям для любого id в df2.Два DataFrames также не обязательно имеют одинаковую длину.

Мне уже ясно, как сделать что-то вроде isin или antijoin для точных совпадений, но я не совсем уверен в этом более мягком случае.

Редактировать: у меня возникла новая мысль, что если нет заранее созданного или чистого способа сделать это, возможно, существует поддержка сложной фильтрации на основе определенных функций, если они распараллеливаются.Я начну с этого гугл следа и буду обновлять, если найду путь в этом направлении.

Редактировать: До сих пор я наткнулся на udf функций, но мне пока не удалось заставить его работать,Я думаю, мне нужно сделать так, чтобы он принимал столбец, а не отдельные числа.Вот то, что у меня есть до сих пор ..

columns = ['word', 'id']
vals = [
     ("apple",10),
     ("cat",30)
]

df1 = sqlContext.createDataFrame(vals, columns)

vals = [
     ("some",50),
     ("jeff",3),
     ("etc",100)
]

df2 = sqlContext.createDataFrame(vals, columns)

def inRange(id1,id2,delta):
    id1 = int(id1)
    id2 = int(id2)
    return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())

df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()

В настоящее время выдает ошибку

TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

1 Ответ

0 голосов
/ 26 декабря 2018

Вы не можете передать DataFrame на udf.Естественный способ сделать это - использовать join:

import pyspark.sql.functions as f

df1.alias('l').join(
    df2.alias('r'), 
    on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+

Я использовал alias, чтобы избежать двусмысленности при указании имен столбцов DataFrame.Это присоединяет df1 к df2, где абсолютное значение разности между df1.id и df2.id меньше или равно 10, и выбирает только столбцы из df1.

...