набор данных groupByKey mapГруппы только с использованием 2 исполнителей из 50 назначенных - PullRequest
1 голос
/ 05 марта 2020

У меня есть задание, которое загружает некоторые данные из Hive, а затем выполняет некоторую обработку и заканчивает запись данных в Cassandra. В какой-то момент все работало нормально, но вдруг у одной из операций Spark возникло узкое место, когда используются только 2 ядра, даже если счетчик разделов установлен на 2000 по всему конвейеру. Я использую версию Spark: spark-core_2.11-2.0.0 Моя конфигурация Spark выглядит следующим образом:

  spark.executor.instances = "50"
  spark.executor.cores = "4"
  spark.executor.memory = "6g"
  spark.driver.memory = "8g"
  spark.memory.offHeap.enabled = "true"
  spark.memory.offHeap.size = "4g"
  spark.yarn.executor.memoryOverhead = "6096"
  hive.exec.dynamic.partition.mode = "nonstrict"
  spark.sql.shuffle.partitions = "3000"
  spark.unsafe.sorter.spill.reader.buffer.size  = "1m"
  spark.file.transferTo = "false"
  spark.shuffle.file.buffer = "1m"
  spark.shuffle.unsafe.file.ouput.buffer = "5m"

Когда я выполняю дамп потока выполняющегося исполнителя, я вижу:

com.*.MapToSalaryRow.buildSalaryRow(SalaryTransformer.java:110)
com.*.MapToSalaryRow.call(SalaryTransformer.java:126)
com.*.MapToSalaryRow.call(SalaryTransformer.java:88)
org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$mapGroups$1.apply(KeyValueGroupedDataset.scala:220)

Упрощенная версия кода, в котором возникла проблема:


 sourceDs.createOrReplaceTempView("salary_ds")
 sourceDs.repartition(2000);
 System.out.println("sourceDsdataset partition count = "+sourceDs.rdd().getNumPartitions());
 Dataset<Row> salaryDs = sourceDs.groupByKey(keyByUserIdFunction, Encoders.LONG()).mapGroups(
                new MapToSalaryRow( props), RowEncoder.apply(getSalarySchema())).
                filter((FilterFunction<Row>) (row -> row != null));
  salaryDs.persist(StorageLevel.MEMORY_ONLY_SER());
  salaryDs.repartition(2000);
  System.out.println("salaryDs dataset partition count = "+salaryDs.rdd().getNumPartitions());

Оба приведенных выше оператора печати показывают количество разделов, равное 2000

Соответствующий код Функция MapGroups:


class MapToSalaryInsightRow implements MapGroupsFunction<Long, Row, Row> {
    private final Properties props;

        @Override
    public Row call(Long userId, Iterator<Row> iterator) throws Exception {
        return buildSalaryRow(userId, iterator, props);
    }
}

Если кто-то может указать, где проблема может быть высоко ценится. Спасибо

1 Ответ

0 голосов
/ 11 марта 2020

Я полагаю, это произошло из-за большого сокращения серверов HDP, на которых я работал. Первоначально я начал с большого сервера, где я мог получить 300 ядер с 12 ГБ памяти на каждом. Однако внезапно мне пришлось использовать тот, который был 1/10. Поэтому я сократил его до 30 ядер, а также памяти до 4, затем процесс занял гораздо больше времени.

...