Реализация алгоритма потоковых K-средних в Scala - PullRequest
0 голосов
/ 28 октября 2018
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint`enter code here`
import org.apache.spark.streaming.{Seconds, StreamingContext}

sc.setLogLevel("OFF")

sc.stop

val conf = new 
SparkConf().setMaster("local[2]").setAppName("HDFSkWordCount")
val ssc = new StreamingContext(conf, Seconds(10))

val trainData = ssc.textFileStream("hdfs://localhost:9000/user/streaming/train/").map(Vectors.parse).cache()

val test_data = ssc.textFileStream("hdfs://localhost:9000/user/streaming/test/").map(LabeledPoint.parse)

val initialCenters  = Array(Vectors.dense(50.0, 50.0), Vectors.dense(400.0,400.0), Vectors.dense(1000.0,1000.0))

val model = new StreamingKMeans().setK(3).setDecayFactor(1.0).setInitialCenters(initialCenters,Array((0.0),(0.0),(0.0)))

model.trainOn(trainData)

model.predictOnValues(test_data.map(lp => (lp.label.toInt, lp.features))).print()

ssc.start()
ssc.awaitTerminationOrTimeout(10)

Данные моего поезда и теста соответствуют приведенному ниже формату.

Данные поезда:

[1, 2]
[3, 4]
[5, 6]
[7, 8]
[9, 10]
[11, 12]
[13, 14]
[15, 16]
[17, 18]
[19, 20]
[21, 22]
[23, 24]
[25, 26]
[27, 28]
[29, 30]

Данные теста:

(1, [1, 2])
(1, [3, 4])
(1, [5, 6])
(1, [7, 8])
(2, [9, 10])
(2, [11, 12])
(2, [13, 14])
(2, [15, 16])
(3, [17, 18])
(3, [19, 20])
(3, [21, 22])
(3, [23, 24])
(1, [25, 26])
(1, [27, 28])
(1, [29, 30])

Мой поезд итестовые текстовые файлы находятся в соответствующих папках в Hadoop.

Есть ли проблема с форматом данных?или сам код неверный?Любая помощь будет оценена по этому вопросу.

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...