Вопрос об операторе rdd.pipe () на Apache Spark - PullRequest
0 голосов
/ 17 января 2019

Я пытаюсь запустить внешний скрипт c ++ на Apache Spark с помощью rdd.pipe (). Не могу найти достаточно информации в документации, поэтому спрашиваю здесь.

Должен ли внешний сценарий быть доступен на всех узлах кластера при использовании rdd.pipe ()?

Что если у меня нет разрешения на установку чего-либо на узлы кластера? Есть ли другой способ сделать скрипт доступным для рабочих узлов?

Ответы [ 2 ]

0 голосов
/ 20 января 2019

Кажется, в конце концов, что внешний скрипт должен присутствовать на всех узлах исполнителя. Один из способов сделать это - передать ваш скрипт через spark-submit (например, --files script.sh), и тогда вы сможете ссылаться на него (например, "./script.sh") в rdd.pipe.

0 голосов
/ 17 января 2019

Apache Spark, есть специальный Rdd, pipedRdd, который обеспечивает вызовы внешних программ, таких как программы C ++ на основе CUDA, для ускорения вычислений.

Я добавляю небольшой пример, чтобы объяснить здесь.

Сценарий оболочки: test.sh

#!/bin/sh
echo "Running shell script"
while read LINE; do
   echo ${LINE}!
done

Передача данных rdd в сценарий оболочки

val scriptPath = "/home/hadoop/test.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()

Теперь создайте программу scala для вызова этой трубы RDD

val proc = Runtime.getRuntime.exec(Array(command))

 new Thread("stderr reader for " + command) {
      override def run() {
        for(line <- Source.fromInputStream(proc.getErrorStream).getLines)
          System.err.println(line)
      }
    }.start()

val lineList = List("hello","how","are","you")
  new Thread("stdin writer for " + command) {
      override def run() {
        val out = new PrintWriter(proc.getOutputStream)
        for(elem <- lineList)
          out.println(elem)
        out.close()
      }
    }.start()

Spark RDD

val data = sc.parallelize(List("hi","hello","how","are","you"))
val scriptPath = "/root/echo.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()

Результаты:

Array[String] = Array(Running shell script, hi!, Running shell script, hello!, 
 Running shell script, how!, Running shell script, are!, you!)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...