Интеграция Apache Spark и Nifi - PullRequest
       21

Интеграция Apache Spark и Nifi

0 голосов
/ 31 октября 2018

Я хочу отправить потоковый файл Nifi в Spark, выполнить некоторые преобразования в Spark и снова отправить результат обратно в Nifi, чтобы я мог продолжить работу в Nifi. Я не хочу записывать потоковый файл, записанный в базу данных или HDFS, а затем запускать задание Spark. Я хочу отправить потоковый файл непосредственно в Spark и получить результат напрямую из Spark в Nifi. Я пытался использовать ExecuteSparkInteractive процессор в Nifi, но я застрял. Любые примеры будут полезны

Ответы [ 2 ]

0 голосов
/ 22 июня 2019

Это может помочь:

Вы можете сделать все в Nifi, выполнив следующие действия: -

  1. Используйте ListSFTP для вывода списка файлов из места приземления.
  2. Используйте процессор UpdateAttribute и назначьте абсолютный путь к файлу для переменной. Используйте эту переменную в вашем коде искры в качестве процессора на следующем шаге поддержки языка выражений.
  3. Используйте процессор ExecuteSparkInteractive, здесь вы можете написать искровой код (используя python или scala или Java), и вы можете прочитать ваш входной файл из места приземления (используйте переменную абсолютного пути из шага 2), не передавая его как файл потока Nifi и выполнить операцию / преобразование этого файла (используйте spark.read ... для чтения файла в rdd). Вы можете откорректировать свой вывод либо для внешней таблицы hive, либо для временного местоположения hdfs.
  4. используйте процессор FetchSFTP для чтения файла из временного местоположения hdfs и продолжения дальнейших операций с Nifi.

Здесь, вам нужна установка Livy для запуска спарк-кода из Nifi (через ExecuteSparkINteractive). Вы можете посмотреть, как настроить службы контроллера Livy и nifi, необходимые для использования livy в Nifi.

Удачи !!

0 голосов
/ 31 октября 2018

Вы не можете отправлять данные напрямую в Spark, если это не потоковое воспроизведение. Если это традиционный Spark с пакетным выполнением, то Spark необходимо прочитать данные из некоторого типа хранилища, такого как HDFS. Цель ExecuteSparkInteractive - запустить задание Spark для запуска данных, доставленных в HDFS.

Если вы хотите перейти к потоковому маршруту, есть два варианта ...

1) Непосредственная интеграция NiFi с потоковой передачей Spark

https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark

2) Используйте Kafka для интеграции NiFi и Spark

NiFi пишет в тему Кафки, Spark читает из темы Кафки, Spark пишет обратно в тему Кафки, NiFi читает из темы Кафки. Этот подход, вероятно, будет лучшим вариантом.

...