Я пытаюсь проверить предсказание модели машинного обучения с помощью потоковых данных на основе kafka.
Нет сообщения об ошибке до того, как я выполню команду scc.start()
на spark-shell.
Сообщения об ошибках появляются, если есть строка predictionDS.print
.
Однако, если я не напишу, например, строку predictionDS.print
, то вместо строки я заменю vectorDS.print
, я могу получить правильный результат.
Вот мой код и сообщение об ошибке при использовании predictionDS.print
(Blar Blar Blar ~~~)
scalar> val lrModel = LogisticRegressionModel.load(sc, "lrModel")
scalar> val stream = KafkaUtils.createDirectStream[String, String](
| ssc,
| PreferConsistent,
| Subscribe[String, String](topics, kafkaParams)
| )
scalar> val arrayDS = stream.map(record => record.value()).flatMap(_.split("[\\s]+")).map(p => p.toDouble)
scalar> val vectorDS = arrayDS.map(p => Vectors.dense(p))
scalar> val predictionDS = vectorDS.transform(rdd => lrModel.predict(rdd))
scalar> predictionDS.print
scalar> ssc.start()
19/04/19 16:30:27 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
(repeat)
Я ожидаю 0,0 или 1,0 или 2,0 или 3,0 или 4,0 в качестве правильного результата прогноза.
Но ошибка появляется сразу после команды ssc.start()
и не запускает потоковую сессию.
На самом деле я изучаю учебник. Это один из примеров книги. Однако в книге показан только подход на основе приемника для интеграции спарк-кафка. Я просто делаю свой собственный код с методом проб и ошибок для прямого потокового подхода, используя мои легкие знания. Но я проверил, когда я отправляю "2.1134 4.2424 0.11 2.111" , используя kafka продуцента и использую vectorDS.print
вместо predictionDS.print
, я могу получить ожидаемый результат на spark-shell, как показано ниже,
-------------------------------------------
Time: 1555659325000 ms
-------------------------------------------
[2.1134]
[4.2424]
[0.11]
[2.111]
И, кроме того, я уже проверил мою модель с кодами ниже. Работает правильно.
scala> val X_test = sc.textFile("test/X_test.txt")
X_test: org.apache.spark.rdd.RDD[String] = test/X_test.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> val testArrayRDD = X_test.map(_.trim).map(_.split("[\\s]+")).map{ p => p.map(q => q.toDouble)}
testArrayRDD: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[6] at map at <console>:28
scala> val testVectorRDD = testArrayRDD.map(p => Vectors.dense(p))
testVectorRDD: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[7] at map at <console>:28
scala> val lrModel = LogisticRegressionModel.load(sc, "lrModel")
lrModel: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 2805, numClasses = 6, threshold = 0.5
scala> val predictionRDD = testVectorRDD.map(p => lrModel.predict(p))
predictionRDD: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[16] at map at <console>:30
scala> predictionRDD.collect
res1: Array[Double] = Array(4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 4.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 3.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,..