Spark SQL объединение дает исключение памяти - PullRequest
0 голосов
/ 25 октября 2019

У меня есть набор данных с примерно 500 тысячами строк и 20 столбцами.
Данные разбиты на уровни, всего их 10 (см. Изображение ниже, на котором показано 5 уровней), и чем ниже уровень, тем больше данныхон содержит (на самом деле это одни и те же данные для всех уровней, но для более низких уровней данные являются более подробными), моя цель состоит в том, чтобы зафиксировать некоторые значения на более низких уровнях (сверху вниз), чтобы сделать это, итеративно разделивОсновной набор данных и собрать один (родительский) уровень и его прямого потомка:

1&2
2&3
3&4
4&5....

Затем я объединяю набор родительских и дочерних данных на основе некоторых общих столбцов и выполняю исправление значений:

 for (Triple<String, String, Seq<String>> aggregationTriple : getAggregationLevels())
      {
        String parentLevel = aggregationTriple.getLeft();
        String childLevel = aggregationTriple.getMiddle();
        Seq<String> aggregationCols = aggregationTriple.getRight();
        Dataset<Row> parents = finalDataset.where(col(agg).equalTo(lit(parentLevel)));
        Dataset<Row> children = data.where(col(agg).equalTo(lit(childLevel)));
        Dataset<Row> joined = parents.join(children, aggregationCols, "inner");
        //Add new calculated metrics
        for(int i=0; i < METRICS.length; i++)
        {
          String metric = METRICS[i];
          String newMetricName = NEW_METRICS[i];
          //We work only on metrics here
          joined = joined
              .withColumn(PERFORM VALUE FIX HERE);
        }
        finalDataset = finalDataset.union(joined.select(ArrayUtils.addAll(COLUMNS_KEYS, NEW_COLUMNS_METRICS)));
      }
    return finalDataset;
  }

Проблема заключается в том, что при снижении до самого низкого уровня (с большим количеством данных) у меня возникает проблема нехватки памяти при производстве: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) Так что я выделил ему больше памяти (2 ГБ), нопо-видимому, никогда не бывает достаточно, чем больше памяти я устанавливаю (3 ГБ или 4 ГБ), тем больше она потребляет.
Поиск в Интернете обнаружил, что это означает, что один из рабочих отправляет баck для того, чтобы обработать слишком большой кусок данных, но если конечный файл (500 000 строк) имеет размер 40 МБ, то как один работник, который должен работать на небольшом разделе этих данных, может отправить такой большой кусок данных?
Мой цикл не создает файлы большого размера, как в заболоченном бесконечном цикле.
Дополнительные результаты тестов, локально на этот раз:
Что я замечаю, когда запускаю его локально, так это то, что искра внутри создает огромное количество задач: (26004) enter image description here
Дляотносительно небольшой файл паркета (6 МБ)
Кроме того, в визуализации DAG я вижу огромное дерево, в котором листья более или менее имеют одинаковое представление: enter image description here
Я не могу понятьпочему так много этапов создано для такого относительно небольшого файла. Просьба любое предложение очень приветствуется here a little example

1 Ответ

0 голосов
/ 25 октября 2019

Кажется, проблема может быть в Сериализации. Вы предоставили SparkConf какой-либо сериализатор?

Если не предоставите conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer в spark-submit.

Если это работает иначе, поиграйте с spark.kryoserializer.buffer https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization

...