Выполнение внешней команды s3-dist-cp в spark-scala через API scala.sys.process - PullRequest
3 голосов
/ 31 мая 2019

когда я запускаю все эти 3 команды в оболочке / терминале Unix, все они работают нормально, возвращая состояние выхода как 0

unix_shell> ls -la
unix_shell> hadoop fs -ls /user/hadoop/temp
unix_shell> s3-dist-cp --src ./abc.txt --dest s3://bucket/folder/

сейчас я пытаюсь запустить эти же команды через scala-процесс api как внешний процесс, пример кода, как показано ниже:

import scala.sys.process._

val cmd_1 = "ls -la"
val cmd_2 = "hadoop fs -ls /user/hadoop/temp/"
val cmd_3 = "/usr/bin/s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"
val cmd_4 = "s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"

val exitCode_1 = (stringToProcess(cmd_1)).! // works fine and produces result
val exitCode_2 = (stringToProcess(cmd_2)).! // works fine and produces result
val exitCode_3 = (stringToProcess(cmd_3)).! // **it just hangs, yielding nothing**
val exitCode_4 = (stringToProcess(cmd_4)).! // **it just hangs, yielding nothing**

Разница между вышеупомянутыми cmd_3 и cmd_4 - просто абсолютный путь. И я передаю соответствующую зависимость явно в сценарии spark-submit, как показано ниже

--jars hdfs:///user/hadoop/s3-dist-cp.jar

Ваш вклад / предложение будет полезно. Спасибо!

Ответы [ 2 ]

1 голос
/ 02 июня 2019

На самом деле процесс scala работает вне контекста spark, поэтому для успешного выполнения этой команды s3-dist-cp я остановил контекст spark перед запуском процесса scala, который включает команду s3-dist-cp,полный рабочий код выглядит следующим образом:

    logger.info("Moving ORC files from HDFS to S3 !!")

    import scala.sys.process._

    logger.info("stopping spark context..##")
    val spark = IngestionContext.sparkSession
    spark.stop()
    logger.info("spark context stopped..##")
    logger.info("sleeping for 10 secs")
    Thread.sleep(10000) // this sleep is not required, this was just for debugging purpose, you can remove this in your final code.
    logger.info("woke up after sleeping for 10 secs")

    try {
      /**
       * following is the java version, off course you need take care of few imports
       */
      //val pb = new java.lang.ProcessBuilder("s3-dist-cp", "--src", INGESTED_ORC_DIR, "--dest", "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp", "--srcPattern", ".*\\.orc")
      //val pb = new java.lang.ProcessBuilder("hadoop", "jar", "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "--src", INGESTED_ORC_DIR, "--dest", "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp", "--srcPattern", ".*\\.orc")
      //pb.directory(new File("/tmp"))
      //pb.inheritIO()
      //pb.redirectErrorStream(true)
      //val process = pb.start()
      //val is = process.getInputStream()
      //val isr = new InputStreamReader(is)
      //val br = new BufferedReader(isr)
      //var line = ""
      //logger.info("printling lines:")
      //while (line != null) {
      //  line = br.readLine()
      //  logger.info("line=[{}]", line)
      //}

      //logger.info("process goes into waiting state")
      //logger.info("Waited for: " + process.waitFor())
      //logger.info("Program terminated!")

      /**
       * following is the scala version
       */
      val S3_DIST_CP = "s3-dist-cp"
      val INGESTED_ORC_DIR = S3Util.getSaveOrcPath()

      // listing out all the files
      //val s3DistCpCmd = S3_DIST_CP + " --src " + INGESTED_ORC_DIR + " --dest " + paramMap(Storage_Output_Path).substring(4) + "_temp --srcPattern .*\\.orc"
      //-Dmapred.child.java.opts=-Xmx1024m -Dmapreduce.job.reduces=2
      val cmd = S3_DIST_CP + " --src " + INGESTED_ORC_DIR + " --dest " + "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp --srcPattern .*\\.orc"

      //val cmd = "hdfs dfs -cp -f " + INGESTED_ORC_DIR + "/* " + "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp/"
      //val cmd = "hadoop distcp " + INGESTED_ORC_DIR + "/ s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp_2/"

      logger.info("full hdfs to s3 command : [{}]", cmd)

      // command execution
      val exitCode = (stringToProcess(cmd)).!

      logger.info("s3_dist_cp command exit code: {} and s3 copy got " + (if (exitCode == 0) "SUCCEEDED" else "FAILED"), exitCode)
    } catch {
      case ex: Exception =>
        logger.error(
          "there was an exception while copying orc file to s3 bucket. {} {}",
          "", ex.getMessage, ex)
        throw new IngestionException("s3 dist cp command failure", null, Some(StatusEnum.S3_DIST_CP_CMD_FAILED))
    }

Хотя приведенный выше код работает точно так же, как ожидалось, но есть и другое наблюдение, как показано ниже:

Вместо использования этого

val exitCode = (stringToProcess(cmd)).!

если вы используете это

val exitCode = (stringToProcess(cmd)).!!

обратите внимание на разницу в одиночном!и двойной !!, как одинокий!возвращает только код выхода, тогда как двойной!возвращает результат выполнения процесса

, поэтому в случае одиночного!Приведенный выше код просто отлично работает, а в случае double !! он тоже работает, но генерирует слишком много файлов и копий в S3-контейнере, в отличие от количества оригинальных файлов.

А что касается spark-отправьте команду, не нужно беспокоиться о параметре --driver-class-path или даже --jars, поскольку я не передавал никаких зависимостей.

1 голос
/ 01 июня 2019

Похоже, что вы сделали правильно. См. Здесь https://github.com/gorros/spark-scala-tips/blob/master/README.md

import scala.sys.process._

def s3distCp(src: String, dest: String): Unit = {
    s"s3-dist-cp --src $src --dest $dest".!
}

Пожалуйста, проверьте это примечание ... Интересно, так ли это с вами.

относительно вашего --jars /usr/lib/hadoop/client/*.jar

Вы можете добавить jar-файлы, относящиеся к s3-dist-cp, используя команду tr, например this. смотри мой ответ

--jars $(echo /dir_of_jars/*.jar | tr ' ' ',')

Примечание. Чтобы использовать этот метод, необходимо добавить приложение Hadoop и запустить Spark в клиентском или локальном режиме, поскольку s3-dist-cp недоступен на подчиненных узлах. Если вы хотите работать в режиме кластера, скопируйте команду s3-dist-cp на ведомые во время начальной загрузки.

...