как читать файлы из GetFilesProcessor в NiFi - PullRequest
0 голосов
/ 10 апреля 2019

Ниже приведен мой поток:

GetFile > ExecuteSparkInteractive > PutFile

Я хочу прочитать файлы с GetFile процессора на ExecuteSparkInteractive процессоре, применить некоторые преобразования и поместить его в определенное место.Ниже мой поток enter image description here

Я написал spark scala code в секции code искрового процессора:

val sc1=sc.textFile("local_path")
sc1.foreach(println)

В потоке ничего не происходит.Итак, как я могу читать файлы в процессоре спарк, используя процессор GetFile.

2-я часть:
Я попробовал нижеследующий поток только для практики:

ExecuteScript > PutFile > LogMessage

, и я упомянул ниже код в процессоре исполняемого сценария:

readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
    lines = line.strip()
    finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)  

Код работает нормально, но он не записывает отформатированные данные в папку назначения.Так, где я иду здесь не так?Кроме того, я установил панд на локальный компьютер и запустил код панд из процессора исполняемого сценария, но nifi не читает модуль панд.Почему это так ?Я старался изо всех сил.Кроме того, я не мог найти соответствующие ссылки для этого, где я могу получить основной поток

1 Ответ

1 голос
/ 10 апреля 2019

Это не совсем так ... GetFile собирает файлы, локальные для узла NiFi, и передает их в поток NiFi для обработки.ExecuteSparkInteractive запускает искровое задание на удаленном кластере Spark, но не передает данные в Spark.Таким образом, вы, вероятно, захотите поместить данные в то место, к которому Spark может получить доступ, возможно, GetFile -> PutHDFS -> ExecuteSparkInteractive.

...