У меня есть потоковое приложение Spark, написанное на Scala, работающее на CDH. Приложение читает данные из Kafka и записывает данные в HDFS. Перед записью данных в HDFS я выполняю partitionBy
, поэтому данные записываются разделенными. Каждый раздел получает 3 файла при записи. Я также использую coalesce
, чтобы контролировать количество разделов моих данных. Я ожидаю, что количество разделов, установленное командой coalesce
, будет устанавливать количество файлов в выходном каталоге в HDFS, однако количество файлов всегда равно 3, несмотря на количество разделов, установленное командой coalesce
. Я попытался запустить с 3 исполнителями и с 6 исполнителями, но все равно количество файлов в каждом разделе равно 3.
Вот как я записываю данные в HDFS:
//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
if(!rdd.isEmpty()) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
val columns = Array("key", "value")
data.toDF(columns: _*).coalesce(sparkExecutorsCount)
.write.mode(SaveMode.Append)
.partitionBy("key").text(MY_PATH)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} else {
//handle empty RDD
}
}
Посоветуйте, пожалуйста, как заставить мои приложения spark записывать другое количество файлов в выходной каталог. Спасибо