Как можно обойти ограничение в 2 ГБ при использовании Dataset.groupByKey? - PullRequest
0 голосов
/ 08 октября 2018

При использовании Dataset.groupByKey(_.key).mapGroups или Dataset.groupByKey(_.key).cogroup в Spark я столкнулся с проблемой, когда одна из группировок приводит к более чем 2 ГБ данных.

Мне нужно нормализовать данные по группам, прежде чемЯ могу начать сокращать его, и я хотел бы разделить группы на более мелкие подгруппы, чтобы они лучше распределялись.Например, вот один способ, которым я пытался разделить группы:

val groupedInputs = inputData.groupByKey(_.key).mapGroups {
    case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))
}

Но, к сожалению, как бы я ни пытался обойти это, мои работы всегда умирают с такой ошибкой: java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632.При использовании сериализации Kryo я получаю другую ошибку Kryo serialization failed: Buffer overflow, рекомендующую увеличить spark.kryoserializer.buffer.max, но я уже увеличил его до предела 2 ГБ.

Одно решение, которое мне приходит в голову, - это добавитьслучайное значение ключей перед группировкой.Это не идеально, потому что это разделит каждую группу (не только большие), но я готов пожертвовать «идеалом» ради «работы».Этот код будет выглядеть примерно так:

val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey{ case(record, split) => (record.key, split)).mapGroups {
    case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))
}

Ответы [ 2 ]

0 голосов
/ 14 октября 2018

В этом случае, когда в наборе данных было много искажений, и было важно сгруппировать записи в группы регулярного размера, я решил обработать набор данных в два прохода.Сначала я использовал оконную функцию для нумерации строк по ключу и преобразовал ее в «групповой индекс» на основе настраиваемого «maxGroupSize»:

// The "orderBy" doesn't seem necessary here, 
// but the row_number function requires it.
val partitionByKey = Window.partitionBy(key).orderBy(key)

val indexedData = inputData.withColumn("groupIndex", 
  (row_number.over(partitionByKey) / maxGroupSize).cast(IntegerType))
  .as[(Record, Int)]

Затем я могу группировать по ключу и индексу, исоздавать группы одинакового размера - ключи с большим количеством записей разбиваются больше, а ключи с небольшим количеством записей могут вообще не разделяться.

indexedData.groupByKey{ case (record, groupIndex) => (record.key, groupIndex) }
  .mapGroups{ case((key, _), recordGroup) =>
      // Remove the index values before returning the groups
      (key, recordGroup.map(_._1))
  }
0 голосов
/ 09 октября 2018

Добавьте солт-ключ и выполните groupBy для вашего ключа и солт-ключа, а затем

import scala.util.Random
    val start = 1
      val end   = 5
      val randUdf = udf({() => start + Random.nextInt((end - start) + 1)})

      val saltGroupBy=skewDF.withColumn("salt_key", randUdf())
        .groupBy(col("name"), col("salt_key"))

Таким образом, все ваши асимметричные данные не попадают в одного исполнителя и вызывают ограничение в 2 ГБ.

Но вам нужно разработать логику для агрегирования вышеуказанного результата и, наконец, в конце удалить солевой ключ.

Когда вы используете groupBy, все записи с одним и тем же ключом дойдут до одного исполнителя и возникнет «горлышко бутылки»,Выше приведен один из способов его смягчения.

...