Использование python и scala для переименования выходного файла из azure / databricks - PullRequest
0 голосов
/ 03 октября 2019

Я пытаюсь переименовать выходной файл из Python по умолчанию случайной коллекции символов в нечто более разумное, содержащее дату / время, чтобы иметь уникальность в имени файла

Вот код, который я использовал,Питон отправляет файл на общий диск, но с неиспользуемым именем. Я попытался найти способ переименования файла в коде Python, но не получилось. Затем я начал смотреть на скалу, и хотя она делает то, что я хочу, ну почти. Кажется, что он работает нормально, но не выдает выходной файл, возможно, что-то связанное с разработчиком, например, Me !!.

Любая помощь будет признательна

%python
try:
  dfsql = spark.sql("select * from dbsmets1mig02_technical_build.tbl_Temp_Output_CS_Notes_Final order by record1") #Replace with your SQL
except:
  print("Exception occurred")
if dfsql.count() == 0:
  print("No data rows")
else:
  dfsql.coalesce(1).write.format("com.databricks.spark.csv").option("quote", "").option("header","false").option("delimiter","|").mode("overwrite").save(
"/mnt/publisheddatasmets1mig/metering/smets1mig/cs/system_data_build/notes/outbound/")    

%scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new outputfile
}


// replace newdata, outputfile and filename values with preferred values
val dfsql = sqlContext.sql("select * from dbsmets1mig02_technical_build.tbl_Temp_Output_CS_Notes_Final order by record1") //SQL here

val outputfile = "/mnt/publisheddatasmets1mig/metering/smets1mig/cs/system_data_build/notes/outbound"  //PATH names here

var filename = "CS_Notes"  //Filename here
var fileext = ".csv"

//val dateFormat = "yyyyMMdd_HHmm"
val dateFormat = "dd-MM-yyyy_HH-mm-ss"
val dateValue = spark.range(1).select(date_format(current_timestamp,dateFormat)).as[(String)].first

filename = filename + "_" + dateValue
var outputFileName = outputfile + "/" + filename + fileext
var mergedFileName = outputfile + "/" + filename + fileext
var mergeFindGlob  = outputFileName

dfsql.write.format("com.databricks.spark.csv").option("header", "false").option("delimiter", "|").option("quote","\u0000").mode("overwrite").mode("overwrite").save(outputFileName)
merge(mergeFindGlob, mergedFileName )
dfsql.unpersist()

1 Ответ

0 голосов
/ 03 октября 2019

Создайте один массовый файл в путь и затем переименуйте, однако это не рекомендуемый способ, поскольку нарушается концепция раздела hadoop.

val outputFilePath = "/mnt/data/output"
dfsql.coalesce(1).write.format("com.databricks.spark.csv").option("header", "false").option("delimiter", "|").option("quote","\u0000").mode("overwrite").mode("overwrite").save(outputFileName)
// /mnt/data/output/1234565125435.csv
val outputFileName = "/mnt/data/output/filename.csv"
//Rename /mnt/data/output/1234565125435.csv to /mnt/data/output/filename.csv
rename(outputFilePath, outputFileName)
...