Как прочитать колонку из Pyspark RDD и применить к ней UDF? - PullRequest
0 голосов
/ 21 февраля 2020

Я создаю DF, читая CSV-файл в Pyspark, а затем преобразуя его в RDD для применения UDF. Выдает ошибку при применении UDF.

Вот мой фрагмент кода -

# My UDF definition
def my_udf(string_array):
    // some code //
    return float_var

spark.udf.register("my_udf", my_udf, FloatType())

#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")

rdd = read_data.rdd

get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])

Пример данных в read_data DF -

[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]

Схема DF, созданного чтением из файла CSV -

print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))

При применении UDF в строке get_df возникает следующая ошибка -

Traceback (последний вызов был последним): Файл "", строка 1, в файле "/usr/lib/spark/python/pyspark/sql/session.py", строка 58, в toDF возвращает файл sparkSession.createDataFrame (self, schema, sampleRatio) "/ usr / lib /spark/python/pyspark/sql/session.py ", строка 746, в файле createDataFrame rdd, схема = self._createFromRDD (файл data.map (prepare), схема, samplingRatio)" / usr / lib / spark / python / pyspark /sql/session.py ", строка 390, в _createFromRDD struct = self._inferSchema (rdd, samplingRatio, names = schema) Файл" /usr/lib/spark/python/pyspark/sql/session.py ", строка 377, в _inferSchema повышение ValueError («Некоторые типы не могут быть определены с помощью« ValueError: Некоторые типы не могут быть определены с помощью Первые 100 строк, пожалуйста, попробуйте еще раз с выборкой

Может кто-нибудь помочь мне передать массив (тип данных - строка) в UDF?

1 Ответ

0 голосов
/ 21 февраля 2020

Две вещи:

  1. если конвертировать DF в RDD, вам не нужно регистрировать my_udf как udf. Если вы регистрируете udf, вы напрямую обращаетесь к df, как read_data.withColumn("col3", my_udf(F.col("col3")))

  2. , проблема, с которой вы столкнулись, находится на шаге toDF, так как вы не указываете схему нового DF при преобразовании из RDD и spark пытается вывести тип из выборочных данных, но в вашем случае неявная подсказка типа не работает. Я создам схему вручную и перейду в toDF следующим образом

from pyspark.sql.types import StringType, FloatType, StructField, StructType
get_schema = StructType(
[StructField('col1', StringType(), True),
 StructField('col2', StringType(), True)
 StructField('col3', FloatType(), True)]
)
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(get_schema)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...