Проверьте файл при потоковой передаче файла csv с помощью scala - PullRequest
0 голосов
/ 25 февраля 2019

Я работаю с потоковой передачей и не хочу обрабатывать старые файлы, когда новый потоковый файл появляется каждые 10 минут:

val val1= spark  
.read //  
.option("header", "true")    
.option("schema", "true")    
.option("sep", ",")    
.csv(path_to_file).toDF().cache()  
val1.registerTempTable("test")

после создания кадра данных я выполняю некоторые преобразования и обрабатываю контрольную точку.помогите мне и как я использовал в моем случае

1 Ответ

0 голосов
/ 07 марта 2019

***************** решение *******************

val spark =SparkSession .builder .appName ("test") .config ("spark.local", "local [*]") .getOrCreate () spark.sparkContext.setCheckpointDir (path_checkpoint) и после того, как я вызываю функцию контрольной точки на фрейме данных И яуказан триггер для выполнения задания

   .writeStream
    .format("csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .option("checkpointLocation",CheckPoint)   
 .trigger(Trigger.ProcessingTime("180 seconds")) 
    .option("Path",Path )  
    .option("header", true)  
    .outputMode("Append")
    .queryName("test")
    .start()
...