spark aggregateByKey добавляет лишнюю пустую строку - PullRequest
0 голосов
/ 16 октября 2018

У меня есть потоковое приложение Spark, написанное на Scala, работающее на CDH.Приложение читает данные из Kafka и записывает данные в HDFS.Перед записью данных в HDFS я выполняю partitionBy, поэтому данные записываются разделенными.это код:

//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
             ssc,
             PreferConsistent,
             Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
    if(!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
        val columns = Array("key", "value")
        val addOp = (record1: String, record2:String) => record1 + "\n" + record2
        val mergeOp = (record1: String, record2:String) => record1 + record2
        val initialValue = ""
        val out = data.aggregateByKey(initialValue)(addOp, mergeOp)
        out.toDF(columns: _*).coalesce(sparkExecutorsCount)
            .write.mode(SaveMode.Append)
            .partitionBy("key").text(MY_PATH)

       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    } else {
        //handle empty RDD
    }
}

Я ожидаю, что этот код генерирует следующий вывод (пример команды ls -l):

> MY_PATH/key=1
> MY_PATH/key=1/file1.txt
> MY_PATH/key=1/file2.txt
> MY_PATH/key=1/file3.txt
> MY_PATH/key=2
> MY_PATH/key=2/file1.txt
> MY_PATH/key=2/file2.txt
> MY_PATH/key=2/file3.txt

и в каждом текстовом файле будетзаписи из DataFrame, строка за строкой.

На самом деле, это действительно происходит.Единственная проблема заключается в том, что initialValue всегда отображается в качестве первой строки в каждом файле, хотя я initalValue="", таким образом, я всегда получаю дополнительную пустую строку в каждом файле.

Эта дополнительная пустая строка - огромная проблема для меняи я должен избегать этого.Один из вариантов - использовать groupByKey вместо aggregateByKey, но groupByKey вызовет больше перестановок в кластере, и я бы хотел этого избежать.

Посоветуйте, пожалуйста, как предотвратить лишнюю пустую строкув каждом письменном файле.

1 Ответ

0 голосов
/ 16 октября 2018

TL;DR Просто используйте groupByKey, а затем mapValues(_.mkString("\n")).

Две вещи:

  • initialValue может быть добавлено произвольно (на практике #partitions) количество раз.Это означает, что каждый раздел будет начинаться с пустой строки, за которой следует знак новой строки.Вы проверяете, являются ли record1 или record2 пустыми для addOp и mergeOp и пропускаете \n, если это так.

  • Кроме того, ваше заявление:

    но groupByKey вызовет больше тасов в кластере, и я бы хотел этого избежать.

    не совсем точно.Код, который у вас есть, не значительно (если вообще) уменьшает объем данных.В зависимости от ключа, он может на самом деле увеличить его.

    См. Например:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...