Как отладить MemoryError в PySpark - PullRequest
0 голосов
/ 20 мая 2019

Я хочу загрузить несколько XML-файлов (по 50 МБ каждый - около 3000 = 150 ГБ), обработать их и загрузить в BigQuery с помощью pyspark.В целях разработки я использовал ноутбук Jupyter и небольшое количество файлов 10. Я написал довольно сложный кластер установки кода на dataproc.В моем кластере daproc есть 6 ТБ HDFS, 10 узлов (каждые 4 ядра) и 120 ГБ ОЗУ.

def context():
    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
    import pyspark
    conf = pyspark.SparkConf()

    conf = (conf.setMaster('local[*]')
            .set('spark.executor.memory', '4G')
            .set('spark.driver.memory', '45G')
            .set('spark.driver.maxResultSize', '10G')
            .set("spark.python.profile", "true"))
    sc = pyspark.SparkContext(conf=conf)
    return sc
def job(sc):
    print("Job started")
    RDDread = sc.wholeTextFiles("s3a://custom-bucket/*/*.gz")
    models = RDDread.flatMap(process_xmls).groupByKey()
    tracking_all = (models.filter(lambda x: x[0] == TrackInformation)
                    .flatMap(lambda x: x[1])
                    .map(lambda model: (model.flight_ref, model))
                    .groupByKey())
    tracking_merged = tracking_all.map(lambda x: x[1]).map(merge_ti)
    flight_plans = (models.filter(lambda x: x[0] == FlightPlan).flatMap(lambda x: x[1]).map(lambda fp: (fp.flight_ref, fp)))
    fps_tracking = tracking_merged.union(flight_plans).groupByKey().filter(lambda x: len(x[1]) == 2)
    in_bq_batch = 1000
    n = fps_tracking.count()
    parts = ceil(n / in_bq_batch)
    many_n = fps_tracking.repartition(parts).mapPartitions(upload_fpm2)
    print("Job ended")
    return fps_tracking, tracking_merged, flight_plans, models, many_n

После 200 сообщений org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz] Я получаю 2 ошибки: java.lang.OutOfMemoryErrorи MemoryError, в основном MemoryError.Я думал, что у меня есть только 2 раздела после RDDread, поэтому я изменил код для: sc.wholeTextFiles ("s3a: //custom-bucket//.gz", minPartitions = 40) -> И он получилсломался еще быстрее.Я добавлял постоянную (DISK) функцию в некоторых случайных местах.

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 684, in loads
    return s.decode("utf-8") if self.use_unicode else s
MemoryError
19/05/20 14:09:23 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/20 14:09:30 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/default/bin/python
java.lang.OutOfMemoryError: Java heap space

Что я делаю не так и как отлаживать свой код?

1 Ответ

0 голосов
/ 21 мая 2019

Вы, кажется, используете искру в локальном режиме (локально [*]).Это означает, что вы используете один jvm с 45 ГБ ОЗУ (spark.driver.memory) и что все ваши рабочие потоки работают в этом jvm.Параметр spark.executor.memory не действует Что означает setMaster `local [*]` в значении spark? .

Вам следует настроить master-устройство spark либо на планировщик пряжи, либоесли у вас нет пряжи, используйте автономный режим https://spark.apache.org/docs/latest/spark-standalone.html.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...