Разделение искры по колонке - взрыв памяти - PullRequest
0 голосов
/ 01 ноября 2019

Я пытаюсь разбить паркет на несколько столбцов. Я делаю это, сначала создавая значения разделов с помощью набора функций, т.е. -

dataset.withColumn('p_year', functions.year(dataset.col('event_date')))
dataset.withColumn('p_month', functions.month(dataset.col('event_date')))
... etc

Затем разделяя RDD, используя keyBy

RDD<Tuple2<String, Row>> rddParts = dataset.rdd().keyBy(new BigDumbFunction());

...

class BigDumbFunction implements Function1<Row, String>, Serializable {

    private MySchema schema;

    public BigDumbFunction(MySchema schema) {
        this.schema = schema;
    }

    @Override
    public String apply(Row row) {
        StringBuilder stringBuilder = new StringBuilder();

        for(AbstractPartitionRule abstractPartitionRule: schema.getPartitionRules()) {
            String fieldName = abstractPartitionRule.getField();
            int fieldIndex = row.fieldIndex(fieldName);
            String value = row.getString(fieldIndex);

            stringBuilder.append(fieldName);
            stringBuilder.append("=");
            stringBuilder.append(value);
            stringBuilder.append("/");
        }

        return stringBuilder.substring(0, stringBuilder.length()-1);
    }
}

В результате получается кортеж RDD, гдепервое значение - это значение раздела, а второе - строка. Это на самом деле не сгруппировано и я должен сделать это вручную - что заставляет меня думать, keyBy не подходит для использования.

Чтобы на самом деле сгруппировать их, я просматриваю строки и сортирую их по ключу-

JavaRDD<Tuple2<String, Row>> jrows = rddParts.toJavaRDD();
Map<String, List<Row>> siftedRows = new HashMap<>();

jrows.foreach((jrowTuple) -> {
    String partitionName = jrowTuple._1();
    Row rowData = jrowTuple._2();

    if (!siftedRows.containsKey(partitionName)) {
        siftedRows.put(partitionName, new ArrayList<>());
    }
    siftedRows.get(partitionName).add(rowData);
});

Выполнение foreach или collect приведет к увеличению объема памяти до 1 ГБ или более. Я предполагаю, что это потому, что я добавляю его на карту, которая не может иметь те же оптимизации, что и в СДР.

Как я могу разделить на несколько столбцов с наименьшим объемом памяти, насколько это возможно? Могу ли я построить один столбец раздела со всеми значениями раздела и сделать фильтр по отдельности?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...