Ошибка в фильтрации строк, которые соответствуют слову в файле журнала с использованием искры - PullRequest
0 голосов
/ 12 октября 2018

Моя цель - создать rdd с сообщениями об ошибках в файле журнала.Я читаю файл журнала и фильтрую строки, которые соответствуют слову «ОШИБКА», и мне нужно записать сообщение об ошибке в базу данных, обрамив его как СДР.

Я новичок в свече

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.text( "hdfs://10.90.3.78:9000/user/centuryuidt-3-1-1.out")
val patt: String = "ERROR"
val rdd=df.filter(line => line.contains(patt)).collect()
df.foreach(println)

и я получаю следующее исключение при выполнении этого кода.

<console>:40: error: value contains is not a member of org.apache.spark.sql.Row
       val rdd=df.filter(line => line.contains(patt)).collect()
                                      ^
<console>:43: error: overloaded method value foreach with alternatives:
  (func: org.apache.spark.api.java.function.ForeachFunction[org.apache.spark.sql.Row])Unit <and>
  (f: org.apache.spark.sql.Row => Unit)Unit
 cannot be applied to (Unit)
       df.foreach(println)
          ^

снимок экрана:

error screenshot

Добавив несколько изменений,

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val lines = sc.textFile( "hdfs://10.90.3.78:9000/user/centuryuidt-3-1-1.out")
val error = lines.filter(_.contains("ERROR"))
val df = error.toDF()

Это сработало для меня, но мне нужно создать DF со строками, это просто дало мне все строки ошибок в одной строке.Может ли кто-нибудь помочь мне разбить строки на строки .?

1 Ответ

0 голосов
/ 12 октября 2018

Вот мой полный пример:

scala> val df = spark.read.text("errors.txt")
df: org.apache.spark.sql.DataFrame = [value: string]

scala> df.show
+------+
| value|
+------+
|line 1|
| ERROR|
|line 2|
| ERROR|
+------+


scala> val errors = df.filter(row => row.getString(0).contains("ERROR"))
errors: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: string]

scala> errors.collect.foreach(println)
[ERROR]
[ERROR]

Если вам действительно нужны ошибки в качестве СДР - обратите внимание, что это СДР [Строка]:

scala> errors.rdd
res7: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[13] at rdd at <console>:34

Если вам действительно нужны ошибкикак СДР [Строка]:

scala> errors.map(_.getString(0)).rdd
res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at rdd at <console>:34
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...