отладка проблем сериализации pyspark для работников - PullRequest
0 голосов
/ 24 сентября 2019

Я написал udf для pyspark, который использует локальный словарь для сопоставления двух столбцов данных в int.Словарь должен быть сериализован и отправлен работникам.Я получаю эту ошибку, когда показываю результат udf:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 170, in manager
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 73, in worker
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 402, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 717, in read_int
    raise EOFError
EOFError

, и я не могу понять это.Есть ли какой-то тихий сбой, если ваши данные Python слишком велики или слишком сложны для сериализации, или если вам нужно 2 или 3 объекта Python, упакованных и отправленных рабочим?

Я пытался обойти это неловко - вот небольшой пример того, что я пытаюсь сделать, но, конечно, это работает:

def test_add_expAB(df):
    expname2abRng = {'default': {'ctrl': ['0-50'], 'exposed': ['51-100']},
                     'modelA': {'ctrl':['0-10'], 'exposed': ['11-100']}}
    int_exp_names = {'unknown': 0, 'ctrl': 1, 'exposed': 2}

    def addExpName(game, ab):
        if None is ab:
            ab = 0
        gameMapping = expname2abRng.get(game, None)
        if None is gameMapping:
              gameMapping = expname2abRng['default']
        for expname, rngs in gameMapping.items():
              for abRng in rngs:
                  aa, bb = list(map(int, abRng.split('-')))
                  if (ab >= aa) and (ab <= bb):
                      return int_exp_names[expname]
        return 0

    udfAddExpName = F.udf(addExpName, T.IntegerType())
    df_expAB = df.withColumn('abExp', udfAddExpName(df['model'], df['ab']))
    return df_expAB

df = spark.createDataFrame(data=[{'model': 'modelA', 'ab': 23}, {'model': 'modelA', 'ab': 88}, {'model': 'modelB', 'ab': 23}, {'model': 'modelA', 'ab': 23},], 
                           schema=T.StructType([T.StructField('model', T.StringType()), 
                                                T.StructField('ab', T.IntegerType())]))
test_add_expAB(df).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...