Pyspark User-Defined_functions внутри класса - PullRequest
0 голосов
/ 16 октября 2019

Я пытаюсь создать Spark-UDF внутри класса Python. Это означает, что одним из методов в классе является UDF. Я получаю сообщение об ошибке «PicklingError: Не удалось сериализовать объект: TypeError: невозможно выбрать объекты _MovedItems»

Среда: блоки данных Azure. (DBR версии 6.1 Beta) Выполнение кода: во встроенном блокноте. Версия Python: 3.5 Версия Spark: 2.4.4

Я попытался определить UDF вне класса в отдельной ячейке, и UDF работает. Я не хочу писать такой код, я должен следовать принципам ООП и хотел бы, чтобы он был структурированным. Я все перепробовал в гугле, не помогло. На самом деле я даже не получил информацию об ошибке, которую я получаю. «PicklingError: Не удалось сериализовать объект: TypeError: невозможно выбрать объекты _MovedItems»

class phases():
  def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
    print("Inside the constructor of Class phases ")

    #I need the below 2 variables to be used in my UDF, so i am trying to put 
    them in a class
    self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
    self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg

  #This is the UDF. 
  def phases_commence(self,each_row):
    print(a)
    return 1

  #This is the function that registers the UDF, 
  def initiate_the_phases_on_the_major_track_segment(self):
    print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")

    #registering the UDF
    self.phases_udf = udf(self.phases_commence,LongType())
    new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf((struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))))
    display(new_df)
#This is a method in a different notebook that creates an object for the above shown class and calls the methods that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):

  phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
  phase_obj.initiate_the_phases_on_the_major_track_segment()

Сообщение об ошибке: PicklingError: Не удалось сериализовать объект: TypeError: невозможно выбрать объекты _MovedItems

1 Ответ

0 голосов
/ 17 октября 2019

Ваша функция должна быть статической, чтобы определить ее как udf. Я искал какую-то документацию, чтобы дать хорошее объяснение, но не смог ее найти.

В основном (возможно, не на 100% точные; исправления приветствуются), когда вы определяете udf, он получает травление и копирует в каждыйвыполняется автоматически, но вы не можете выделить один метод класса , который не определен на верхнем уровне (класс является частью верхнего уровня, но не его методами). Посмотрите на этот пост для обходных путей, отличных от статических методов.

import pyspark.sql.functions as F
import pyspark.sql.types as T


class Phases():
  def __init__(self, df1):
    print("Inside the constructor of Class phases ")

    self.df1 = df1
    self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())

  #This is the UDF. 
  @staticmethod
  def phases_commence(age):
    age = age +3
    return age

  #This is the function that registers the UDF, 
  def doSomething(self):
    print("Inside the doSomething")
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))

l =[(1,   10   ,  'F')
,(2 ,   2   ,  'M')
,(2 ,  10  ,   'F')
,(2 ,  3  ,    'F')
,(3 ,  10,     'M')]

columns = ['id',  'Age',  'Gender']

df=spark.createDataFrame(l, columns)

bla = Phases(df)
bla.doSomething()
bla.df1.show()

Вывод:

Inside the constructor of Class phases 
Inside the 'initiate_the_phases_on_the_major_track_segment()' 
+---+---+------+-----+ 
| id|Age|Gender|AgeP2| 
+---+---+------+-----+ 
|  1| 10|     F|   13| 
|  2|  2|     M|    5| 
|  2| 10|     F|   13| 
|  2|  3|     F|    6| 
|  3| 10|     M|   13| 
+---+---+------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...