Сохранение порядка данных в HDFS - PullRequest
0 голосов
/ 12 мая 2018

Входные данные:

key,date,value    
10,20180701,a10    
11,20180702,a11    
12,20180702,a12    
13,20180702,a13    
14,20180702,a14    
15,20180702,a15    
16,20180702,a16
17,20180702,a17    
18,20180702,a18    
19,20180702,a19    
1 ,20180701,a1     
2 ,20180701,a2     
3 ,20180701,a3     
4 ,20180701,a4     
5 ,20180701,a5     
6 ,20180701,a6 
7 ,20180701,a7 
8 ,20180701,a8 
9 ,20180701,a9 

Код

val rawData=sc.textFile(.....).
val datadf:DataFrame=rawData.toDF

После считывания данных в DF с колонками key,data,value

datadf.coalesce(1).orderBy(desc("key")).drop(col("key")).write.mode("overwrite").partitionBy("date").text("hdfs://path/")

Я пытаюсь упорядочить столбец по ключу столбца и удалить тот же столбец перед сохранением в формате hdf (в один файл на каждый день). Я не могу сохранить порядок в выходных файлах. если я не использую coalesce, порядок сохраняется, но генерируется несколько файлов.

Выход:

/20180701/part-xxxxxxx.txt

        a1
        a9
        a6
        a4
        a5
        a3
        a7
        a8
        a2
        a10
/20180702/part-xxxxxxx.txt  

        a18
        a12
        a13
        a19
        a15
        a16
        a17
        a11
        a14

Ожидаемый ОП:

/20180701/part-xxxxxxx.txt

        a1
        a2
        a3
        a4
        a5
        a6
        a7
        a8
        a9
        a10
/20180702/part-xxxxxxx.txt      

        a11
        a12
        a13
        a14
        a15
        a16
        a17
        a18
        a19

1 Ответ

0 голосов
/ 16 мая 2018

Следующий код должен помочь вам начать работу (это использует Spark 2.1): -

import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"key".int).add($"date".string).add($"value".string)
val df = spark.read.schema(schema).option("header","true").csv("source.txt")
df.coalesce(1).orderBy("key").drop("key").write.mode("overwrite").partitionBy("date").csv("hdfs://path/")
...