Я использую Apache Spark (pyspark - python) потоковую передачу для кластеризации моих данных. Для этого я использую StreamingKMeans
из pyspark.mllib.clustering
библиотеки.
Мне удалось правильно кластеризовать мои данные, но у меня проблема с доступом к центру кластера в режиме реального времени (мне нужно получить доступ к позиции кластера для каждого вновь полученного назначения данных). Моя функция всегда возвращает одинаковые позиции для кластеров (кластеры должны постоянно меняться).
#amazon_stream_data: key-value pairs, date as key and features vectors as values
#=> ('2014-01-01',[1.5,1.05,0.3])
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=4, decayFactor=1.0).setRandomCenters(3,1.0,0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(amazon_stream_data.map(lambda x:x[1]) #train on features
result_stream = model.predictOnValues(amazon_stream_data) # predict clusters foreach feature
#result_stream : key-value pairs, date as key and cluster's index as value
#=> ('2014-01-01', 1)
joined_stream = amazon_stream.join(result_stream)
# key-value pairs, date as key and tuple of the features and the cluster's index as value
#=> ('2014-01-01', ([1.5,1.05,0.3],1))
Я хочу создать поток расстояний, который представляет расстояние между текущей точкой и центром ее текущего кластера ( кластеры постоянно меняются).
Для этого я должен получить позицию кластера для каждой точки (вместо ее индекса). Я пытался:
desired_stream = joined_stream\
.map(lambda x: (x[0], (x[1][0],model.latestModel().clusterCenters[x[1][1]])))
#x[0]: index (date) =>'2014-01-01'
#x[1][0]: features vector => [1.5,1.05,0.3]
#x[1][1]: cluster's index => 1
#desired_stream supposed to be key-value pairs,
#date as key and tuple of the features and the cluster's position vector as value
#=> ('2014-01-01', ([1.5,1.05,0.3],[1.6,0.092,0.8]))
, но model.latestModel().clusterCenters
всегда возвращает одинаковые позиции для кластеров (кластеры должны постоянно меняться).