Я создаю DF, читая CSV-файл в Pyspark, а затем преобразуя его в RDD для применения UDF. Выдает ошибку при применении UDF.
Вот мой фрагмент кода -
# My UDF definition
def my_udf(string_array):
// some code //
return float_var
spark.udf.register("my_udf", my_udf, FloatType())
#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")
rdd = read_data.rdd
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])
Пример данных в read_data DF -
[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]
Схема DF, созданного чтением из файла CSV -
print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))
При применении UDF в строке get_df возникает следующая ошибка -
Traceback (последний вызов был последним): Файл "", строка 1, в файле "/usr/lib/spark/python/pyspark/sql/session.py", строка 58, в toDF возвращает файл sparkSession.createDataFrame (self, schema, sampleRatio) "/ usr / lib /spark/python/pyspark/sql/session.py ", строка 746, в файле createDataFrame rdd, схема = self._createFromRDD (файл data.map (prepare), схема, samplingRatio)" / usr / lib / spark / python / pyspark /sql/session.py ", строка 390, в _createFromRDD struct = self._inferSchema (rdd, samplingRatio, names = schema) Файл" /usr/lib/spark/python/pyspark/sql/session.py ", строка 377, в _inferSchema повышение ValueError («Некоторые типы не могут быть определены с помощью« ValueError: Некоторые типы не могут быть определены с помощью Первые 100 строк, пожалуйста, попробуйте еще раз с выборкой
Может кто-нибудь помочь мне передать массив (тип данных - строка) в UDF?