У меня есть задание, которое загружает некоторые данные из Hive, а затем выполняет некоторую обработку и заканчивает запись данных в Cassandra. В какой-то момент все работало нормально, но вдруг у одной из операций Spark возникло узкое место, когда используются только 2 ядра, даже если счетчик разделов установлен на 2000 по всему конвейеру. Я использую версию Spark: spark-core_2.11-2.0.0 Моя конфигурация Spark выглядит следующим образом:
spark.executor.instances = "50"
spark.executor.cores = "4"
spark.executor.memory = "6g"
spark.driver.memory = "8g"
spark.memory.offHeap.enabled = "true"
spark.memory.offHeap.size = "4g"
spark.yarn.executor.memoryOverhead = "6096"
hive.exec.dynamic.partition.mode = "nonstrict"
spark.sql.shuffle.partitions = "3000"
spark.unsafe.sorter.spill.reader.buffer.size = "1m"
spark.file.transferTo = "false"
spark.shuffle.file.buffer = "1m"
spark.shuffle.unsafe.file.ouput.buffer = "5m"
Когда я выполняю дамп потока выполняющегося исполнителя, я вижу:
com.*.MapToSalaryRow.buildSalaryRow(SalaryTransformer.java:110)
com.*.MapToSalaryRow.call(SalaryTransformer.java:126)
com.*.MapToSalaryRow.call(SalaryTransformer.java:88)
org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$mapGroups$1.apply(KeyValueGroupedDataset.scala:220)
Упрощенная версия кода, в котором возникла проблема:
sourceDs.createOrReplaceTempView("salary_ds")
sourceDs.repartition(2000);
System.out.println("sourceDsdataset partition count = "+sourceDs.rdd().getNumPartitions());
Dataset<Row> salaryDs = sourceDs.groupByKey(keyByUserIdFunction, Encoders.LONG()).mapGroups(
new MapToSalaryRow( props), RowEncoder.apply(getSalarySchema())).
filter((FilterFunction<Row>) (row -> row != null));
salaryDs.persist(StorageLevel.MEMORY_ONLY_SER());
salaryDs.repartition(2000);
System.out.println("salaryDs dataset partition count = "+salaryDs.rdd().getNumPartitions());
Оба приведенных выше оператора печати показывают количество разделов, равное 2000
Соответствующий код Функция MapGroups:
class MapToSalaryInsightRow implements MapGroupsFunction<Long, Row, Row> {
private final Properties props;
@Override
public Row call(Long userId, Iterator<Row> iterator) throws Exception {
return buildSalaryRow(userId, iterator, props);
}
}
Если кто-то может указать, где проблема может быть высоко ценится. Спасибо