Группа SparkBy против перераспределения плюс mapPartitions - PullRequest
0 голосов
/ 16 января 2019

Мой набор данных ~ 20 миллионов строк, занимает ~ 8 ГБ ОЗУ. Я выполняю свою работу с 2 исполнителями, 10 ГБ ОЗУ на исполнителя, 2 ядра на исполнителя. Из-за дальнейших преобразований данные должны кэшироваться одновременно.

Мне нужно уменьшить дубликаты на основе 4 полей (выберите любой из дубликатов). Два варианта: использование groupBy и использование repartition и mapPartitions. Второй подход позволяет вам указать количество разделов и может работать быстрее из-за этого в некоторых случаях, верно?

Не могли бы вы объяснить, какой вариант имеет лучшую производительность? У обоих вариантов одинаковое потребление оперативной памяти?

Использование groupBy

dataSet
    .groupBy(col1, col2, col3, col4)
    .agg(
        last(col5),
        ...
        last(col17)
    );

Использование repartition и mapPartitions

dataSet.sqlContext().createDataFrame(
    dataSet
        .repartition(parallelism, seq(asList(col1, col2, col3, col4)))
        .toJavaRDD()
        .mapPartitions(DatasetOps::reduce),
    SCHEMA
);

private static Iterator<Row> reduce(Iterator<Row> itr) {
    Comparator<Row> comparator = (row1, row2) -> Comparator
        .comparing((Row r) -> r.getAs(name(col1)))
        .thenComparing((Row r) -> r.getAs(name(col2)))
        .thenComparingInt((Row r) -> r.getAs(name(col3)))
        .thenComparingInt((Row r) -> r.getAs(name(col4)))
        .compare(row1, row2);

    List<Row> list = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
        .collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));

    return list.iterator();
}

1 Ответ

0 голосов
/ 16 января 2019

Второй подход позволяет вам указать количество разделов и может работать быстрее из-за этого в некоторых случаях, верно?

Не совсем. Оба подхода позволяют указать количество разделов - в первом случае через spark.sql.shuffle.partitions

spark.conf.set("spark.sql.shuffle.partitions", parallelism)

Однако второй подход по своей сути менее эффективен, если дубликаты являются общими, поскольку он перетасовывает сначала, а сокращает позже, пропуская сокращение на стороне карты (другими словами, это еще одна группировка по ключу). Если дубликаты редки, это не будет иметь большого значения.

В примечании Dataset уже содержится dropDuplicates вариантов , которые принимают набор столбцов, и first / last не имеет особого смысла здесь (см. Обсуждение в Как выбрать первый ряд каждой группы? ).

...