Не удается распечатать результат прогноза с помощью spark-mllib из потоковых данных из-за "NotSerializableException" - PullRequest
0 голосов
/ 19 апреля 2019

Я пытаюсь проверить предсказание модели машинного обучения с помощью потоковых данных на основе kafka. Нет сообщения об ошибке до того, как я выполню команду scc.start() на spark-shell. Сообщения об ошибках появляются, если есть строка predictionDS.print. Однако, если я не напишу, например, строку predictionDS.print, то вместо строки я заменю vectorDS.print, я могу получить правильный результат.

Вот мой код и сообщение об ошибке при использовании predictionDS.print

(Blar Blar Blar ~~~)
scalar> val lrModel = LogisticRegressionModel.load(sc, "lrModel")
scalar> val stream = KafkaUtils.createDirectStream[String, String](
  | ssc,
  | PreferConsistent,
  | Subscribe[String, String](topics, kafkaParams)
  | )
scalar> val arrayDS = stream.map(record => record.value()).flatMap(_.split("[\\s]+")).map(p => p.toDouble)
scalar> val vectorDS = arrayDS.map(p => Vectors.dense(p))
scalar> val predictionDS = vectorDS.transform(rdd => lrModel.predict(rdd))
scalar> predictionDS.print
scalar> ssc.start()
19/04/19 16:30:27 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at 

(repeat)

Я ожидаю 0,0 или 1,0 или 2,0 или 3,0 или 4,0 в качестве правильного результата прогноза. Но ошибка появляется сразу после команды ssc.start() и не запускает потоковую сессию.

На самом деле я изучаю учебник. Это один из примеров книги. Однако в книге показан только подход на основе приемника для интеграции спарк-кафка. Я просто делаю свой собственный код с методом проб и ошибок для прямого потокового подхода, используя мои легкие знания. Но я проверил, когда я отправляю "2.1134 4.2424 0.11 2.111" , используя kafka продуцента и использую vectorDS.print вместо predictionDS.print, я могу получить ожидаемый результат на spark-shell, как показано ниже,

-------------------------------------------
Time: 1555659325000 ms
-------------------------------------------
[2.1134]
[4.2424]
[0.11]
[2.111]

И, кроме того, я уже проверил мою модель с кодами ниже. Работает правильно.

scala> val X_test = sc.textFile("test/X_test.txt")
X_test: org.apache.spark.rdd.RDD[String] = test/X_test.txt MapPartitionsRDD[3] at textFile at <console>:27

scala> val testArrayRDD = X_test.map(_.trim).map(_.split("[\\s]+")).map{ p => p.map(q => q.toDouble)}
testArrayRDD: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[6] at map at <console>:28

scala> val testVectorRDD = testArrayRDD.map(p => Vectors.dense(p))
testVectorRDD: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[7] at map at <console>:28

scala> val lrModel = LogisticRegressionModel.load(sc, "lrModel")
lrModel: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 2805, numClasses = 6, threshold = 0.5

scala> val predictionRDD = testVectorRDD.map(p => lrModel.predict(p))
predictionRDD: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[16] at map at <console>:30

scala> predictionRDD.collect
res1: Array[Double] = Array(4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 4.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 3.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,..

...