Я пытаюсь реализовать pandas_udf в pyspark, где pandas_udf вернет тип структуры с 4 полями, и он будет расширен до 4 разных столбцов. Однако, когда я попытался вызвать эту функцию, я получил исключение, из-за которого не удалось выполнить требование: можно извлечь только скалярный векторизованный udf или sql batch udf '
def add_vars(self, df, index_dict_var):
"""
:return:
"""
def fill_var_udf(var_dict, index):
try:
val = var_dict.value[index]
return val
except Exception as e:
print(index)
print(var_dict)
raise e
def fill_var(var_dict):
return pandas_udf(lambda index: fill_var_udf(var_dict, index),
returnType=StructType([StructField("t0", StringType()),
StructField("t1", StringType()),
StructField("t2", StringType()),
StructField("t3", StringType())]),
functionType=PandasUDFType.GROUPED_MAP)
df = df.withColumn("var", fill_var(index_dict_var)(col('var_index')))
for i in range(len(self.var_columns)):
df = df.withColumn(self.var_columns[i], col('var')["t" + str(i)])
df = df.drop('var_index')
df.show()
return df
Здесь var_dict - это словарь, содержащий тип int как ключ и список строки в качестве значения, всякий раз, когда я вызываю эту функцию, я получаю: Исключение: не удалось выполнить требование: можно извлечь только скалярный векторизованный udf или sql пакетный udf 'Pyspark Примечание. Я использую pandas_udf вместо udf для производительности усиление, как указано здесь: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Traceback:
Traceback (most recent call last):
File "<path to file>", line 463, in steps
self.run_process_for_all_segments()
File "<path to file>", line 400, in run_process_for_all_segments
data = self.get_final_data(segment_id)
File "<path to file>", line 351, in get_final_data
df1, df2, df3, df4, df5 = self.sample_segments(segment_id)
File "<path to file>", line 300, in sample_segments
data = self.add_variants(df4, self.index_dict_var)
File "<path to file>", line 248, in add_variants
df.show()
File "<virtual_env path>/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
print(self._jdf.showString(n, 20, vertical))
File "<path to virtual env>/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "<path to virtual env>/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
IllegalArgumentException: u'requirement failed: Can only extract scalar vectorized udf or sql batch udf'
Traceback (most recent call last):
File "<path to file>", line 520, in <module>
obj1.steps()
File "<path to file>", line 471, in steps
raise Exception(e)
Exception: u'requirement failed: Can only extract scalar vectorized udf or sql batch udf'