Я получаю Can't pickle local object '<lambda>.<locals>.<lambda>'
ошибку при выполнении кода ниже.Во избежание ошибки сериализации NAStatCounter
класс объявлен в отдельном файле.К сожалению, я не смог найти ни одного поста, относящегося к этой ошибке рассола в отношении Pyspark RDD.Из многих постов я понял, что вышеупомянутая ошибка может быть решена с помощью dill as pickle
для Python 3.5.Я не уверен, как пакет укропа может использоваться в этом контексте.
Код:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.statcounter import StatCounter
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.mllib.linalg import Vector
from pyspark.mllib.linalg import Vectors
from pyspark import SparkContext,SparkConf
from collections import namedtuple
from helperClass import NAStatCounter
def isHeader(line):
return "id_1" not in line
def toDouble(s):
if "?" in s :
return None
else:
return float(s)
MatchData = namedtuple('MatchData',['id1','id2','scores','matched'], verbose=False)
def parse(line):
pieces=line[1:-1].split(",")
id1 = int(line[0])
id2 = int(line[1])
scores = map(lambda x:toDouble(x),pieces[2:11])
matched = pieces[11] == "TRUE"
return MatchData(id1,id2,scores,matched)
if __name__ == "__main__":
if len(sys.argv) != 1:
print("Usage CC")
exit(-1)
conf = SparkConf().setAppName("AdvancedAnalytics1").setMaster("local")
sc = SparkContext(conf=conf,pyFiles=['C:/Users/Vishnu59/Sparksample/helperClass.py'])
rawblocks = sc.textFile("file:///C:/Users/Vishnu59/Desktop/hadoop/Spark/ch01.csv")
noheader = rawblocks.filter(lambda x:isHeader(x))
parsed = noheader.map(lambda line:parse(line))
nasRDD = parsed.map(lambda md: map((lambda d:NAStatCounter().add(d)),md.scores))
reduced = nasRDD.reduce(lambda x,y:map(lambda a: a[0].mergeStats(a[1]),zip(x,y)))