Java Служба Executor не работает для запуска шага EMR - PullRequest
0 голосов
/ 01 мая 2020

Для службы emr я добавил шаг spark и обработал фрейм данных с помощью реализации Java Executor. Кусок кода не работает и не может записать фрейм данных в расположение S3. Без службы Executor он работает нормально. Здесь цель состоит в том, чтобы запустить длительный шаг emr в фоновом режиме и запустить другой шаг emr параллельно. Пожалуйста, дайте мне знать, что еще не хватает, спасибо!

    override def execute(): Unit = {
this.createAWSConnections()
this.formMasterConfig()
this.performPreChecks()


val shadowExecutorService = ExecutionContext.
  fromExecutorService(Executors.newFixedThreadPool(10, new ThreadFactoryBuilder().setDaemon(true).build()))

val spark = SparkSessions.createSparkSessionObject(
  config.sparkClusterMode,
  config.sourceFileCompression)

try {
  implicit val resultData: DataFrame = SparkSql
    .readAndExecuteSqlFiles(spark,
      config.sqlFileNames,
      config.sourceFolderPath,
      config.destinationFolderPath)
    .last
    .last


    shadowExecutorService.submit(new Runnable {
      override def run(): Unit = {
        try{
          log.info("Executor Service started")

          writeDataFrame

          log.info("Executor Service finished writ yu`ing data")

        }

      }
    })
  }


 override def writeDataFrame(implicit df: DataFrame,
                             config: ExecuteS3FileConfig): Unit = {
var data: DataFrame = df
val fileCountCsv = config.destinationFileCountCsv
val fileHeader = config.destinationFileHeader
val fileDelimiter = config.destinationFileDelimiter
val fileWriteMode = config.destinationFileWriteMode
val destinationPath = config.destinationFilePath

checkNull(data,
          fileCountCsv,
          fileHeader,
          fileDelimiter,
          fileWriteMode,
          destinationPath)

if (fileCountCsv != DEFAULT_COUNT_IDENTIFIER)
  data = data.coalesce(fileCountCsv.toInt)

data.write
  .option(FIELD_HEADER, fileHeader)
  .option(FIELD_COMPRESSION, COMPRESSION)
  .option(FIELD_DELIMITER, fileDelimiter)
  .mode(fileWriteMode)
  .option(FIELD_QUOTE_ALL, QUOTE_ALL)
  .csv(destinationPath)

}}

...