Pyspark - фильтрация фрейма данных на основе значений строк другого фрейма данных - PullRequest
1 голос
/ 28 мая 2020

У меня есть основной фрейм данных и дополнительный фрейм данных, которые я хочу go по строкам, фильтровать основной фрейм данных на основе значений в каждой строке, запускать функцию на отфильтрованном главном фрейме данных и сохранять вывод.

Вывод может быть сохранен в отдельном фрейме данных или в новом столбце вторичного фрейма данных.

# Master DF
df = pd.DataFrame({"Name": ["Mike", "Bob", "Steve", "Jim", "Dan"], "Age": [22, 44, 66, 22, 66], "Job": ["Doc", "Cashier", "Fireman", "Doc", "Fireman"]})

#Secondary DF
df1 = pd.DataFrame({"Age": [22, 66], "Job": ["Doc", "Fireman"]})

df = spark.createDataFrame(df)
+-----+---+-------+
| Name|Age|    Job|
+-----+---+-------+
| Mike| 22|    Doc|
|  Bob| 44|Cashier|
|Steve| 66|Fireman|
|  Jim| 22|    Doc|
|  Dan| 66|Fireman|
+-----+---+-------+

df1 = spark.createDataFrame(df1)
+---+-------+
|Age|    Job|
+---+-------+
| 22|    Doc|
| 66|Fireman|
+---+-------+
​
# Filter by values in first row of secondary DF
df_filt = df.filter(
    (F.col("Age") == 22) &                                
    (F.col('Job') == 'Doc')                          
)

# Run the filtered DF through my function
def my_func(df_filt):
    my_list = df_filt.select('Name').rdd.flatMap(lambda x: x).collect()
    return '-'.join(my_list)

# Output of function
my_func(df_filt)
'Mike-Jim'

# Filter by values in second row of secondary DF
df_filt = df.filter(
    (F.col("Age") == 66) &                                
    (F.col('Job') == 'Fireman')                          
)

# Output of function
my_func(df_filt)
'Steve-Dan'

# Desired output at the end of the iterations
new_df1 = pd.DataFrame({"Age": [22, 66], "Job": ["Doc", "Fireman"], "Returned_value": ['Mike-Jim', 'Steve-Dan']})

В принципе, я хочу взять свой Master DF и отфильтровать его в определенных способов, и запустите алгоритм на отфильтрованном наборе данных и получите результат для этой фильтрации, затем go перейдите к следующему набору фильтрации и сделайте то же самое.

Каков наилучший способ go о этот?

1 Ответ

1 голос
/ 28 мая 2020

Попробуйте это с join, groupBy, concat_ws/array_join и collect_list.

from pyspark.sql import functions as F

df.join(df1,['Age','Job'])\
  .groupBy("Age","Job").agg(F.concat_ws('-',F.collect_list("Name")).alias("Returned_value")).show()

#+---+-------+--------------+
#|Age|    Job|Returned_value|
#+---+-------+--------------+
#| 22|    Doc|      Mike-Jim|
#| 66|Fireman|     Steve-Dan|
#+---+-------+--------------+
...