Pu sh Данные в Nifi Flow с использованием apache spark и scala - PullRequest
1 голос
/ 09 апреля 2020

Я хочу получить данные из потока nifi, чтобы зажечь их и сделать кое-что. После этого я хочу снова отправить результат в поток nifi.

Это мой поток nifi для отправки данных на искру с использованием выходных портов.

enter image description here

Чтобы получить данные из потока Nifi, я написал следующую функцию:

def process() ={

    val schema =
      StructType(
        Seq(
          StructField(name = "ID", dataType = StringType, nullable = false),
          StructField(name = "Client_Name", dataType = StringType, nullable = false),
          StructField(name = "Due_Date", dataType = StringType, nullable = false),
          StructField(name = "Score", dataType = StringType, nullable = false)
        )
      )

    val config =
      new SiteToSiteClient
      .Builder()
      .url("http://localhost:8090/nifi")
      .portName("Data For Spark")
        .buildConfig()


    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("NiFi-Spark Streaming example")

    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    val packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY))

    val file = packetStream.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))


    file.foreachRDD(rdd => {

      val data = spark.createDataFrame(rdd
                                              .filter(!_.contains("ID,Client_Name,Due_Date,Score"))
                                              .map(line => Row.fromSeq(line.split(",").toSeq)), schema)

      data.show(100)
      val id = data.select("ID")

    })

    ssc.start()
    ssc.awaitTermination()



  }

Окончательный результат вышеприведенной функции - id dataframe. Я хочу отправить этот результат в поток nifi. Я не хочу записывать этот результат в виде файла в какой-либо пункт назначения и получать поток nifi с помощью процессора getFile.

Как отправить окончательный результат в поток nifi?

Ответы [ 2 ]

0 голосов
/ 09 апреля 2020

Посмотрите на ListenHTTP . Таким образом, вы будете относиться к NiFi как к простой REST-услуге. Лично я предпочел бы некоторую шину сообщений, связанную между Spark и NiFi, но если это невозможно для вашего варианта использования, тогда вы можете попробовать, если это работает для вас.

0 голосов
/ 09 апреля 2020

Это интересный подход.

Рассматривали ли вы введение брокерских услуг, таких как Apache Kafka? Это может быть использовано как в качестве источника, так и в качестве приемника в вашем Apache Spark-приложении, и интеграция производится из коробки. Вы также можете следовать официальному руководству здесь: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. В руководстве описывается поток с использованием сравнительно нового Apache Spark Structured Streaming.

Затем на Apache NiFi вы можете использовать процессор ConsumeKafkaRecord для потребления из того же топика c, который используется в качестве приемника в вашем Apache Искровое приложение. Вы также можете использовать процессор PublishKafkaRecord, если вы используете sh для рефакторинга вашего приложения, чтобы использовать Apache Kafka в качестве источника, а не полагаться на Apache сокеты NiFi напрямую.

Обновление : Если вам абсолютно необходимо писать напрямую в Apache NiFi, используя Apache Spark Structured Streaming, вы можете расширить класс ForeachWriter (https://spark.apache.org/docs/latest/api/scala/index.html#org. apache .spark. sql .ForeachWriter ) реализовать свой собственный приемник.

...