Не удается создать набор данных для паркета Petastorm через Spark с ошибкой переполнения (больше 4 ГБ) - PullRequest
0 голосов
/ 19 ноября 2018

Я пытаюсь реализовать создание набора данных Uber Petastorm, которое использует Spark для создания файла паркета, следуя инструкциям на их странице Github .

Код:

spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):

    logging.info('Building RDD...')
    rows_rdd = sc.parallelize(ids)\
        .map(row_generator)\  # Generator that yields lists of examples
        .flatMap(lambda x: dict_to_spark_row(MySchema, x))

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')

Теперь код СДР выполняется успешно, но происходит сбой только вызова .createDataFrame со следующей ошибкой:

_pickle.PicklingError: Не удалось сериализовать широковещательную рассылку: OverflowError: невозможно сериализовать строку, размер которой превышает 4 ГБ

Это мой первый опыт работы со Spark, поэтому я не могу точно сказать, происходит ли эта ошибка в Spark или Petastorm.

Просмотр других решений этой ошибки (в отношении Spark, а не Petastorm) Я видел, что это может быть связано с протоколом травления, но я не могу подтвердить это, и при этом я не нашел способ изменить протокол травления.

Как я мог избежать этой ошибки?

Ответы [ 2 ]

0 голосов
/ 07 февраля 2019

Чтобы построить bluesummers, ответьте

В основной ветке spark сейчас исправлена ​​ эта проблема, поэтому я использовал этот код для исправления функции дампа таким же образом, но это немногоболее безопасный.[тест с 2.3.2]

from pyspark import broadcast
from pyspark.cloudpickle import print_exec
import pickle

def broadcast_dump(self, value, f):
    try:
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) 
    except pickle.PickleError:
        raise
    except Exception as e:
        msg = "Could not serialize broadcast: %s: %s" \
                % (e.__class__.__name__, _exception_message(e))
        print_exec(sys.stderr)
        raise pickle.PicklingError(msg)
    f.close()

broadcast.Broadcast.dump = broadcast_dump
0 голосов
/ 21 ноября 2018

Проблема заключается в том, что выполняется сортировка для передачи данных между различными процессами. По умолчанию используется протокол выбора 2, и нам нужно использовать 4 для передачи объектов размером более 4 ГБ.

Дляизмените протокол протравливания, перед созданием сеанса Spark используйте следующий код

from pyspark import broadcast
import pickle


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2, 4 is first protocol supporting >4GB
    f.close()

    return f.name


broadcast.Broadcast.dump = broadcast_dump
...