Я хочу получить данные из потока nifi, чтобы зажечь их и сделать кое-что. После этого я хочу снова отправить результат в поток nifi.
Это мой поток nifi для отправки данных на искру с использованием выходных портов.
Чтобы получить данные из потока Nifi, я написал следующую функцию:
def process() ={
val schema =
StructType(
Seq(
StructField(name = "ID", dataType = StringType, nullable = false),
StructField(name = "Client_Name", dataType = StringType, nullable = false),
StructField(name = "Due_Date", dataType = StringType, nullable = false),
StructField(name = "Score", dataType = StringType, nullable = false)
)
)
val config =
new SiteToSiteClient
.Builder()
.url("http://localhost:8090/nifi")
.portName("Data For Spark")
.buildConfig()
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("NiFi-Spark Streaming example")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY))
val file = packetStream.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
file.foreachRDD(rdd => {
val data = spark.createDataFrame(rdd
.filter(!_.contains("ID,Client_Name,Due_Date,Score"))
.map(line => Row.fromSeq(line.split(",").toSeq)), schema)
data.show(100)
val id = data.select("ID")
})
ssc.start()
ssc.awaitTermination()
}
Окончательный результат вышеприведенной функции - id
dataframe. Я хочу отправить этот результат в поток nifi. Я не хочу записывать этот результат в виде файла в какой-либо пункт назначения и получать поток nifi с помощью процессора getFile
.
Как отправить окончательный результат в поток nifi?