У меня есть коллекция файлов журнала приложений Spark, я хочу, чтобы каждый файл Имя приложения, Время отправки, Время завершения и Накопительные метрики были добавлены в виде одной строки в одном файле CSV. используя SPARK / SCALA
Редактировать:
Извините, но один файл журнала приложения Spark настолько огромен, чтобы его можно было вставить сюда, а также настолько сложен, что некоторые показатели неоднократно обновляются для каждой работы, и мне нужно общее количество всех из них - последние, а не обновленные - вот что я пытался до Теперь
import org.apache.log4j._
import org.apache.spark.sql._
object LogToCSV {
val Logs= "SparkAppName, SubmissionTime, CompletionTime,ExecutorDeserializeCpuTime,ResultSize,ShuffleReadRemoteBytesRead, ShuffleReadFetchWaitTime,MemoryBytesSpilled,ShuffleReadLocalBytesRead,ExecutorDeserializeTime,PeakExecutionMemory,ExecutorCpuTime, ShuffleReadLocalBlocksFetched,JVMGCTime,ShuffleReadRemoteBytesReadToDisk,ShuffleReadRecordsRead,DiskBytesSpilled,ExecutorRunTime,ShuffleReadRemoteBlocksFetched,Result"
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val ss = SparkSession
.builder
.appName("SparkSQLDFjoin")
.master("local[*]")
.getOrCreate()
import ss.implicits._
ScalaWriter.Writer.Write(Logs, "Results.csv")
val Dir = ss.sparkContext.wholeTextFiles("/home/rudaini/Desktop/Thesis/Results/Results/Tesx/*")
println(Dir.count())
Dir.foreach(F =>{
var SparkAppName = ""
var SubmissionTime: Double = 0
var CompletionTime: Double = 0
var ExecutorDeserializeCpuTime: Double = 0
var ResultSize = ""
var ShuffleReadRemoteBytesRead = ""
var ShuffleReadFetchWaitTime = ""
var MemoryBytesSpilled = ""
var ShuffleReadLocalBytesRead = ""
var ExecutorDeserializeTime = ""
var PeakExecutionMemory = ""
var ExecutorCpuTime = ""
var ShuffleReadLocalBlocksFetched = ""
var JVMGCTime = ""
var ShuffleReadRemoteBytesReadToDisk = ""
var ShuffleReadRecordsRead = ""
var DiskBytesSpilled = ""
var ExecutorRunTime = ""
var ShuffleReadRemoteBlocksFetched = ""
var Result = ""
F.toString().split("\n").foreach(L =>{
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("ApplicationStart")){
SubmissionTime = L.substring(L.indexOf("Timestamp")+11,
L.indexOf(",\"User\":\"")).toDouble}
if(L.contains("ApplicationEnd")){
CompletionTime = L.substring(L.indexOf("Timestamp")+11,L.indexOf("Timestamp")+24).toDouble}
if(L.contains("SparkSubmit.scala")){
ExecutorDeserializeCpuTime = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")).toDouble}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
if(L.contains("spark.app.name")){
SparkAppName = L.substring(L.indexOf("app.name")+11,
L.indexOf("spark.scheduler")-3)}
})
val LineX = SparkAppName +","+ SubmissionTime +","+ CompletionTime +","+ ExecutorDeserializeCpuTime +","+ ResultSize +","+ ShuffleReadRemoteBytesRead +","+ ShuffleReadFetchWaitTime +","+ MemoryBytesSpilled +","+
ShuffleReadLocalBytesRead +","+ ExecutorDeserializeTime +","+ PeakExecutionMemory +","+ ExecutorCpuTime +","+
ShuffleReadLocalBlocksFetched +","+ JVMGCTime +","+ ShuffleReadRemoteBytesReadToDisk +","+
ShuffleReadRecordsRead +","+ DiskBytesSpilled +","+ ExecutorRunTime +","+ ShuffleReadRemoteBlocksFetched +","+
Result
ScalaWriter.Writer.Write(LineX, "Results.csv")
})
ss.stop()
}
}
Я еще не закончил, но получаю лучшие результаты с большим количеством модификаций