Избегайте "spark.driver.maxResultSize" при преобразовании RDD в DataFrame - PullRequest
0 голосов
/ 27 февраля 2019

Я новичок в мире искр :-).Попытка написать рабочий процесс слияния объединяет маленькие журналы в большой согласно некоторым правилам.Одним из шагов я хочу преобразовать RDD в объект DataFrame и использовать API, так как проще управлять столбцами.Код ниже:

def main(path):
  print("Read from S3:{}".format(path))
  with SparkContext() as sc:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)
    spark.conf.set("spark.driver.extrajavaoptions", "-XX:+UseParallelGC")
    spark.conf.set("spark.executor.extrajavaoptions", "-XX:+UseParallelGC")
    #spark.conf.set("spark.driver.maxResultSize", "3g")
    spark.conf.set("spark.rdd.compress", "true")
    rdd = sc.newAPIHadoopFile(
        path,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
        keyConverter=None
    )
    rdd1 = rdd.map(lambda key_val: process_line(key_val[1])).reduceByKey(lambda x, y: x + y)
    print("RDD=>{}".format(rdd1.getNumPartitions()))
    df = rdd1.toDF(["ns_pod", "log"])
    df.write.partitionBy("ns_pod").option("quote", "\u0000").text('hdfs:///merged')
    df.explain()

Это хорошо работает в небольшом наборе данных, но когда я тестирую в большом наборе данных, я получаю исключение:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 11 tasks (1268.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Я провел некоторые исследования в Интернете, кажется, что я могупросто увеличьте «spark.driver.maxResultSize», как я показываю в комментариях, но он не масштабируется.Причиной возникновения этой ошибки является мастер-узел, пытающийся собрать данные с рабочих узлов.Интересно, есть ли способ конвертировать RDD в DF без триггера сбора данных?Не уверен, что я делаю что-то не так? :-) А если нет способа, как выбрать значение maxResultSize?(Пример: 80% памяти драйверов?) Спасибо.

...