TypeError: невозможно выбрать объекты генератора: сбой Spark collect () из-за несериализуемого возвращаемого типа генератора (dict_key) - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть библиотечная функция, которая возвращает составной объект, содержащий генераторы, которые не могут быть засечены (попытка засечь приводит к ошибке TypeError: can't pickle dict_keys objects).

Когда я пытаюсь распараллелить через Spark, это не удаетсяна этапе сбора, из-за сбоя рассола (примечание: работает через DataBricks со значением по умолчанию sc).

Вот минимальное повторение:

test_list = [{"a": 1, "b": 2, "c": 3}, 
             {"a": 7, "b": 3, "c": 5}, 
             {"a": 2, "b": 3, "c": 4}, 
             {"a": 9, "b": 8, "c": 7}]

parallel_test_list = sc.parallelize(test_list)

parallel_results = parallel_test_list.map(lambda x: x.keys())

local_results = parallel_results.collect()

Трассировка стека, которую я получаю, длиннаЯ думаю, что соответствующая часть:

Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/worker.py", line 403, in main
        process()
      File "/databricks/spark/python/pyspark/worker.py", line 398, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/databricks/spark/python/pyspark/serializers.py", line 418, in dump_stream
        bytes = self.serializer.dumps(vs)
      File "/databricks/spark/python/pyspark/serializers.py", line 597, in dumps
        return pickle.dumps(obj, protocol)
    TypeError: can't pickle dict_keys objects

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)

1 Ответ

0 голосов
/ 26 февраля 2019

Вы можете написать рекурсивную вспомогательную функцию, чтобы «потреблять» все вложенные объекты генератора, и map все ваши строки в вашем rdd с помощью этой функции.

Например, вот функция, которая будетпревратить вложенные генераторы в list s:

from inspect import isgenerator, isgeneratorfunction

def consume_all_generators(row):

    if isinstance(row, str):
        return row
    elif isinstance(row, dict):
        return {k: consume_all_generators(v) for k, v in row.items()}

    output = []
    try:
        for val in row:
            if isgenerator(val) or isgeneratorfunction(val):
                output.append(list(consume_all_generators(val)))
            else:
                output.append(consume_all_generators(val))
        return output
    except TypeError:
        return row

Теперь звоните map(consume_all_generators) до collect:

local_results = parallel_results.map(consume_all_generators).collect()
print(local_results)
#[['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b']]
...