Я пытаюсь преобразовать фрейм данных pandas на каждом рабочем узле (RDD, где каждый элемент является фреймом данных pandas) в искровой фрейм данных на всех рабочих узлах.
Пример:
def read_file_and_process_with_pandas(filename):
data = pd.read(filename)
"""
some additional operations using pandas functionality
here the data is a pandas dataframe, and I am using some datetime
indexing which isn't available for spark dataframes
"""
return data
filelist = ['file1.csv','file2.csv','file3.csv']
rdd = sc.parallelize(filelist)
rdd = rdd.map(read_file_and_process_with_pandas)
Предыдущие операции работают, поэтому у меня есть несколько кадров данных панд. Как я могу преобразовать это затем в искровой фрейм данных после того, как я закончу с обработкой панд?
Я пытался сделать rdd = rdd.map(spark.createDataFrame)
, но когда я делаю что-то вроде rdd.take(5)
, я получаю следующую ошибку:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o103.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Есть ли способ преобразования кадров данных pandas в каждом рабочем узле в распределенный кадр данных?