У меня есть фрейм данных PySpark (скажем, df
), который имеет два столбца (Name
и Score
). Ниже приведен пример фрейма данных:
+------+-----+
| Name|Score|
+------+-----+
| name1|11.23|
| name2|14.57|
| name3| 2.21|
| name4| 8.76|
| name5|18.71|
+------+-----+
У меня есть массив Numpy (скажем, bin_array
), который имеет значения, близкие к числовым значениям в столбце с именем Score
фрейма данных PySpark,
Ниже приведен вышеупомянутый массив numpy:
bin_array = np.array([0, 5, 10, 15, 20])
Я хочу сравнить значение из каждой строки столбца Score
со значениями в bin_array
и сохранить ближайшее значение(получено из bin_array
) в отдельном столбце в фрейме данных PySpark.
Ниже приведен порядок отображения моего нового фрейма данных (скажем, df_new
).
+------+-----+------------+
| Name|Score| Closest_bin|
+------+-----+------------+
| name1|11.23| 10.0 |
| name2|14.57| 15.0 |
| name3| 2.21| 0.0 |
| name4| 8.76| 10.0 |
| name5|18.71| 20.0 |
+------+-----+------------+
У меня есть нижеупомянутая функция, которая дает мне самые близкие значения от bin_array
. Функция отлично работает, когда я тестирую ее с отдельными числами.
def find_nearest(array, value):
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return float(array[idx])
В моей работе у меня будут миллионы строк в datafrmae. Какой самый эффективный способ создания df_new
?
Ниже приведены шаги, которые я пытался использовать для создания пользовательской функции (udf) и нового фрейма данных (* 1034). *).
closest_bin_udf = F.udf( lambda x: find_nearest(array, x) )
df_new = df.withColumn( 'Closest_bin' , closest_bin_udf(df.Score) )
Но при попытке df_new.show()
я получил ошибки. Часть ошибки показана ниже.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-11-685c9b7e25d9> in <module>()
----> 1 df_new.show()
/usr/lib/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
376 """
377 if isinstance(truncate, bool) and truncate:
--> 378 print(self._jdf.showString(n, 20, vertical))
379 else:
380 print(self._jdf.showString(n, int(truncate), vertical))
Вы можете использовать указанные ниже шаги для создания вышеупомянутого кадра данных:
from pyspark.sql import *
import pyspark.sql.functions as F
import numpy as np
Stats = Row("Name", "Score")
stat1 = Stats('name1', 11.23)
stat2 = Stats('name2', 14.57)
stat3 = Stats('name3', 2.21)
stat4 = Stats('name4', 8.76)
stat5 = Stats('name5', 18.71)
stat_lst = [stat1 , stat2, stat3, stat4, stat5]
df = spark.createDataFrame(stat_lst)
df.show()