У меня есть потоковое приложение Spark, написанное на Scala, работающее на CDH.Приложение читает данные из Kafka и записывает данные в HDFS.Перед записью данных в HDFS я выполняю partitionBy, поэтому данные записываются разделенными.это код:
//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
if(!rdd.isEmpty()) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
val columns = Array("key", "value")
val addOp = (record1: String, record2:String) => record1 + "\n" + record2
val mergeOp = (record1: String, record2:String) => record1 + record2
val initialValue = ""
val out = data.aggregateByKey(initialValue)(addOp, mergeOp)
out.toDF(columns: _*).coalesce(sparkExecutorsCount)
.write.mode(SaveMode.Append)
.partitionBy("key").text(MY_PATH)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} else {
//handle empty RDD
}
}
Я ожидаю, что этот код генерирует следующий вывод (пример команды ls -l
):
> MY_PATH/key=1
> MY_PATH/key=1/file1.txt
> MY_PATH/key=1/file2.txt
> MY_PATH/key=1/file3.txt
> MY_PATH/key=2
> MY_PATH/key=2/file1.txt
> MY_PATH/key=2/file2.txt
> MY_PATH/key=2/file3.txt
и в каждом текстовом файле будетзаписи из DataFrame, строка за строкой.
На самом деле, это действительно происходит.Единственная проблема заключается в том, что initialValue
всегда отображается в качестве первой строки в каждом файле, хотя я initalValue=""
, таким образом, я всегда получаю дополнительную пустую строку в каждом файле.
Эта дополнительная пустая строка - огромная проблема для меняи я должен избегать этого.Один из вариантов - использовать groupByKey
вместо aggregateByKey
, но groupByKey
вызовет больше перестановок в кластере, и я бы хотел этого избежать.
Посоветуйте, пожалуйста, как предотвратить лишнюю пустую строкув каждом письменном файле.