Для службы emr я добавил шаг spark и обработал фрейм данных с помощью реализации Java Executor. Кусок кода не работает и не может записать фрейм данных в расположение S3. Без службы Executor он работает нормально. Здесь цель состоит в том, чтобы запустить длительный шаг emr в фоновом режиме и запустить другой шаг emr параллельно. Пожалуйста, дайте мне знать, что еще не хватает, спасибо!
override def execute(): Unit = {
this.createAWSConnections()
this.formMasterConfig()
this.performPreChecks()
val shadowExecutorService = ExecutionContext.
fromExecutorService(Executors.newFixedThreadPool(10, new ThreadFactoryBuilder().setDaemon(true).build()))
val spark = SparkSessions.createSparkSessionObject(
config.sparkClusterMode,
config.sourceFileCompression)
try {
implicit val resultData: DataFrame = SparkSql
.readAndExecuteSqlFiles(spark,
config.sqlFileNames,
config.sourceFolderPath,
config.destinationFolderPath)
.last
.last
shadowExecutorService.submit(new Runnable {
override def run(): Unit = {
try{
log.info("Executor Service started")
writeDataFrame
log.info("Executor Service finished writ yu`ing data")
}
}
})
}
override def writeDataFrame(implicit df: DataFrame,
config: ExecuteS3FileConfig): Unit = {
var data: DataFrame = df
val fileCountCsv = config.destinationFileCountCsv
val fileHeader = config.destinationFileHeader
val fileDelimiter = config.destinationFileDelimiter
val fileWriteMode = config.destinationFileWriteMode
val destinationPath = config.destinationFilePath
checkNull(data,
fileCountCsv,
fileHeader,
fileDelimiter,
fileWriteMode,
destinationPath)
if (fileCountCsv != DEFAULT_COUNT_IDENTIFIER)
data = data.coalesce(fileCountCsv.toInt)
data.write
.option(FIELD_HEADER, fileHeader)
.option(FIELD_COMPRESSION, COMPRESSION)
.option(FIELD_DELIMITER, fileDelimiter)
.mode(fileWriteMode)
.option(FIELD_QUOTE_ALL, QUOTE_ALL)
.csv(destinationPath)
}}