Панды не работают внутри параллельного кода Spark - PullRequest
0 голосов
/ 25 октября 2019

Я пытаюсь использовать Pandas "apply" внутри распараллеленного кода, но "apply" вообще не работает. Можем ли мы использовать «применить» внутри кода, который распространяется среди исполнителей при использовании Spark (распараллеливать на RDD)?

Код:

def testApply(k):
    return pd.DataFrame({'col1':k,'col2':[k*2]*5})

def testExec(x):
    df=pd.DataFrame({'col1':range(0,10)})
    ddf=pd.DataFrame(columns=['col1', 'col2'])
    ##In my case the below line doesn't get executed at all
    res= df.apply(lambda row: testApply(row.pblkGroup) if row.pblkGroup%2==0 else pd.DataFrame(), axis=1)

list1=[1,2,3,4]
sc=SparkContext.getOrCreate()
testRdd= sc.parallelize(list1)
output=testRdd.map(lambda x: testExec(x)).collect()



Ответы [ 2 ]

0 голосов
/ 28 октября 2019

Похоже, что Pandas с версией ниже 0,21 не поддерживает эту функцию. Я обновил версию Pandas, и она работает нормально.

0 голосов
/ 25 октября 2019

Чтобы использовать Pandas в Spark, у вас есть 2 варианта: -

Использование замыканий

Одна из самых сложных вещей в Spark - это понимание области действия и жизненного цикла переменных и методов при выполнении кода черезкластер. Операции RDD, которые изменяют переменные вне их области действия, могут быть частым источником путаницы. В приведенном ниже примере мы рассмотрим код, который использует foreach () для увеличения счетчика, но аналогичные проблемы могут возникнуть и для других операций.

Более подробную информацию можно найти здесь [1]

Пример

import numpy as np
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()

B = [2,0,1,0] 
V = [5,1,2,4]

def V_sum(row,b,c):
    return float(np.sum(c[row==b]))

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())    
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))

Подробную информацию можно найти здесь [2]

Использование Pandas UDF

В Spark 2.4.4 имеется стандартная возможность использовать Pandasс искрой. Подробности можно найти здесь вместе с примерами [3]

1 - http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures- 2 - Пользовательская функция через фрейм данных pyspark 3 - https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

...