Моя цель - создать rdd с сообщениями об ошибках в файле журнала.Я читаю файл журнала и фильтрую строки, которые соответствуют слову «ОШИБКА», и мне нужно записать сообщение об ошибке в базу данных, обрамив его как СДР.
Я новичок в свече
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.text( "hdfs://10.90.3.78:9000/user/centuryuidt-3-1-1.out")
val patt: String = "ERROR"
val rdd=df.filter(line => line.contains(patt)).collect()
df.foreach(println)
и я получаю следующее исключение при выполнении этого кода.
<console>:40: error: value contains is not a member of org.apache.spark.sql.Row
val rdd=df.filter(line => line.contains(patt)).collect()
^
<console>:43: error: overloaded method value foreach with alternatives:
(func: org.apache.spark.api.java.function.ForeachFunction[org.apache.spark.sql.Row])Unit <and>
(f: org.apache.spark.sql.Row => Unit)Unit
cannot be applied to (Unit)
df.foreach(println)
^
снимок экрана:
![error screenshot](https://i.stack.imgur.com/mGVsR.png)
Добавив несколько изменений,
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val lines = sc.textFile( "hdfs://10.90.3.78:9000/user/centuryuidt-3-1-1.out")
val error = lines.filter(_.contains("ERROR"))
val df = error.toDF()
Это сработало для меня, но мне нужно создать DF со строками, это просто дало мне все строки ошибок в одной строке.Может ли кто-нибудь помочь мне разбить строки на строки .?