При использовании Dataset.groupByKey(_.key).mapGroups
или Dataset.groupByKey(_.key).cogroup
в Spark я столкнулся с проблемой, когда одна из группировок приводит к более чем 2 ГБ данных.
Мне нужно нормализовать данные по группам, прежде чемЯ могу начать сокращать его, и я хотел бы разделить группы на более мелкие подгруппы, чтобы они лучше распределялись.Например, вот один способ, которым я пытался разделить группы:
val groupedInputs = inputData.groupByKey(_.key).mapGroups {
case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))
}
Но, к сожалению, как бы я ни пытался обойти это, мои работы всегда умирают с такой ошибкой: java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632
.При использовании сериализации Kryo я получаю другую ошибку Kryo serialization failed: Buffer overflow
, рекомендующую увеличить spark.kryoserializer.buffer.max, но я уже увеличил его до предела 2 ГБ.
Одно решение, которое мне приходит в голову, - это добавитьслучайное значение ключей перед группировкой.Это не идеально, потому что это разделит каждую группу (не только большие), но я готов пожертвовать «идеалом» ради «работы».Этот код будет выглядеть примерно так:
val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey{ case(record, split) => (record.key, split)).mapGroups {
case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))
}