Как отфильтровать и преобразовать потоковые данные из Apache Flume в rdd / data freame, используя spark, чтобы записать их в таблицу - PullRequest
0 голосов
/ 19 октября 2018

Привет, я новичок в Flume / Spark / Spark Streaming.Я настроил flume и netcat и успешно передал данные в Spark.

Мое требование - проверить наличие ошибки в потоковых данных (поток данных) из файла журнала и получить строку ошибки (слово «ОШИБКА» в строке, поступившей в потоке) и сделать ее DF длязапишите его в oracle.

Я столкнулся с исключением из приведенного ниже фильтра и преобразовал его в код DF.Пожалуйста, помогите мне решить эту проблему

import org.apache.spark.streaming.flume.FlumeUtils
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext,Seconds}
import org.apache.spark.streaming.flume._
import org.apache.spark._
import org.apache.spark.streaming._
import spark.implicits._
val hostName = "10.90.3.78"
val port = 9999.toInt
val sparkStreamingContext = new StreamingContext(sc,Seconds(10))
val stream = FlumeUtils.createPollingStream(sparkStreamingContext,hostName,port)
val mappedlines = stream.map( e => new String(e.event.getBody.array()))
.filter(rec => rec.contains("ERROR"))
.map(line => line.split("ERROR"))
val arr = mappedlines.foreachRDD({status=>val DF = status.toDF()})
println(arr)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

1 Ответ

0 голосов
/ 23 октября 2018

Я решил это с помощью Foreach и преобразовал RDD в DF.Это сработало, и я успешно вставил строки ошибок в БД.

...