Почему Spark всегда записывает одинаковое количество файлов в HDFS? - PullRequest
0 голосов
/ 18 сентября 2018

У меня есть потоковое приложение Spark, написанное на Scala, работающее на CDH. Приложение читает данные из Kafka и записывает данные в HDFS. Перед записью данных в HDFS я выполняю partitionBy, поэтому данные записываются разделенными. Каждый раздел получает 3 файла при записи. Я также использую coalesce, чтобы контролировать количество разделов моих данных. Я ожидаю, что количество разделов, установленное командой coalesce, будет устанавливать количество файлов в выходном каталоге в HDFS, однако количество файлов всегда равно 3, несмотря на количество разделов, установленное командой coalesce. Я попытался запустить с 3 исполнителями и с 6 исполнителями, но все равно количество файлов в каждом разделе равно 3.

Вот как я записываю данные в HDFS:

//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
             ssc,
             PreferConsistent,
             Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
    if(!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
        val columns = Array("key", "value")
        data.toDF(columns: _*).coalesce(sparkExecutorsCount)
            .write.mode(SaveMode.Append)
            .partitionBy("key").text(MY_PATH)

       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    } else {
        //handle empty RDD
    }
}

Посоветуйте, пожалуйста, как заставить мои приложения spark записывать другое количество файлов в выходной каталог. Спасибо

1 Ответ

0 голосов
/ 23 сентября 2018

coalesce не переставляет данные на ключе, он объединяет разделы без перераспределения записей между разделами.В вашем примере partitionBy вызывается не на Dataframe, а на DataFrameWriter, который возвращается функцией .write .В этом случае выглядит так, что столбец ключ имеет 3 значения, поэтому с помощью Dataframe можно объяснить 3 папки (ключ = 1, ключ = 2, ключ = 3) и 3 файла в каждой папке с одинаковой отметкой времени.имея как минимум 3 раздела, так как на каждом разделе будет работать модуль записи, который должен выводить данные в 3 папки (ключ = 1, ключ = 2, ключ = 3).Я подозреваю, что «sparkExecutorsCount == 6» не оказал никакого влияния, возможно, потому, что Kafka предоставил вам только 3 раздела, и в этом случае объединение не имеет никакого влияния.попробуйте coalesce (1) или вместо него используйте перераспределение ($ "ключ") и сохраните существующий paritionBy

data.toDF(columns: _*).repartition($"key")
        .write.mode(SaveMode.Append)
        .partitionBy("key").text(MY_PATH)

или

data.toDF(columns: _*).repartition(sparkExecutorsCount, $"key")
        .write.mode(SaveMode.Append)
        .partitionBy("key").text(MY_PATH)
...