Я написал udf для pyspark, который использует локальный словарь для сопоставления двух столбцов данных в int.Словарь должен быть сериализован и отправлен работникам.Я получаю эту ошибку, когда показываю результат udf:
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 170, in manager
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 73, in worker
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 402, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 717, in read_int
raise EOFError
EOFError
, и я не могу понять это.Есть ли какой-то тихий сбой, если ваши данные Python слишком велики или слишком сложны для сериализации, или если вам нужно 2 или 3 объекта Python, упакованных и отправленных рабочим?
Я пытался обойти это неловко - вот небольшой пример того, что я пытаюсь сделать, но, конечно, это работает:
def test_add_expAB(df):
expname2abRng = {'default': {'ctrl': ['0-50'], 'exposed': ['51-100']},
'modelA': {'ctrl':['0-10'], 'exposed': ['11-100']}}
int_exp_names = {'unknown': 0, 'ctrl': 1, 'exposed': 2}
def addExpName(game, ab):
if None is ab:
ab = 0
gameMapping = expname2abRng.get(game, None)
if None is gameMapping:
gameMapping = expname2abRng['default']
for expname, rngs in gameMapping.items():
for abRng in rngs:
aa, bb = list(map(int, abRng.split('-')))
if (ab >= aa) and (ab <= bb):
return int_exp_names[expname]
return 0
udfAddExpName = F.udf(addExpName, T.IntegerType())
df_expAB = df.withColumn('abExp', udfAddExpName(df['model'], df['ab']))
return df_expAB
df = spark.createDataFrame(data=[{'model': 'modelA', 'ab': 23}, {'model': 'modelA', 'ab': 88}, {'model': 'modelB', 'ab': 23}, {'model': 'modelA', 'ab': 23},],
schema=T.StructType([T.StructField('model', T.StringType()),
T.StructField('ab', T.IntegerType())]))
test_add_expAB(df).show()