Используя PySpark, я пытаюсь добавить новый столбец к существующему фрейму данных, где записи в новом столбце представляют значения бинов, наиболее близкие к существующему столбцу. В примере, который я покажу ниже, числовой массив bucket_array
представляет ячейки (сегменты).
Соответствующий раздел кода PySpark, об ошибке которого я вскоре упомяну, приведен ниже:
#Function for finding nearest bucket
def find_nearest(value, bin_array):
bin_array = np.array(list(bin_array))
value = float(value)
idx = np.argmin(np.abs(bin_array - value))
return float(bin_array[idx])
def metric_analyze(entity_peer_labeled_df, metric, delta_weeks, normalize):
# delta_weeks = 1
# normalize = True
# metric : string which denotes column name
# entity_peer_labeled_df : some Pyspark dataframe which has a column titled "pct_difference"
bucket_array = np.arange(-1000, 1000, 5)
udf_nearest_bin = F.udf(find_nearest, T.FloatType())
bucket_df = ( entity_pct_metric_df.withColumn("bucket_array",
F.array(*[F.lit(i) for i in bucket_array])) ).withColumn( "pct_diff_{}_bucket".format(metric) ,
udf_nearest_bin("pct_difference", "bucket_array") )
bucket_df.show()
Когда я запускаю приведенный выше код в блокноте Jupyter, он работает нормально, и яспособен видеть фрейм данных bucket_df
.
Тем не менее, когда я сохраняю вышеупомянутый код как отдельную функцию python, импортирую его в свой блокнот Jupyter и затем, наконец, выполняю его, я получаю ошибку. Я замечаю, что ошибка происходит в строке bucket_df.show()
. Часть этой ошибки показана ниже:
/mnt1/jupyter/notebooks/username/custom_function.py in metric_analyze(entity_peer_labeled_df, metric, delta_weeks, normalize)
100 udf_nearest_bin("pct_difference", "bucket_array") )
101
--> 102 bucket_df.show()
/usr/lib/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
376 """
377 if isinstance(truncate, bool) and truncate:
--> 378 print(self._jdf.showString(n, 20, vertical))
379 else:
380 print(self._jdf.showString(n, int(truncate), vertical))
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
Полная ошибка может быть найдена здесь .
Когда я заменяю строку bucket.show()
на print( bucket.count() )
Я не вижу никаких ошибок, и он работает нормально (даже когда я использую вышеупомянутый код в качестве отдельной функции).
Ниже приведен пример entity_pct_metric_df
:
+--------------------+----------+-------------------+-------------------+------------------------------+--------------+
| entity_id|. category| sampled_ts| some_score| some_score_prev_value|pct_difference|
+--------------------+----------+-------------------+-------------------+------------------------------+--------------+
|abccccccccccccccc...| A|2017-12-03 00:00:00| 192| 824| -632.0|
|defffffffffffffff...| A|2017-12-10 00:00:00| 515| 192| 323.0|
|ghiiiiiiiiiiiiiii...| A|2017-12-17 00:00:00| 494| 515| -21.0|
+--------------------+----------+-------------------+-------------------+------------------------------+--------------+
Как решить вышеуказанную ошибку?