Получение центров кластера в режиме реального времени с использованием искры StreamingKMeans - PullRequest
3 голосов
/ 15 апреля 2020

Я использую Apache Spark (pyspark - python) потоковую передачу для кластеризации моих данных. Для этого я использую StreamingKMeans из pyspark.mllib.clustering библиотеки.

Мне удалось правильно кластеризовать мои данные, но у меня проблема с доступом к центру кластера в режиме реального времени (мне нужно получить доступ к позиции кластера для каждого вновь полученного назначения данных). Моя функция всегда возвращает одинаковые позиции для кластеров (кластеры должны постоянно меняться).

#amazon_stream_data: key-value pairs, date as key and features vectors as values
#=> ('2014-01-01',[1.5,1.05,0.3])


# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=4, decayFactor=1.0).setRandomCenters(3,1.0,0)

# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(amazon_stream_data.map(lambda x:x[1]) #train on features 

result_stream = model.predictOnValues(amazon_stream_data) # predict clusters foreach feature
#result_stream : key-value pairs, date as key and cluster's index as value
#=> ('2014-01-01', 1)

joined_stream = amazon_stream.join(result_stream) 
# key-value pairs, date as key and tuple of the features and the cluster's index as value
#=> ('2014-01-01', ([1.5,1.05,0.3],1))

Я хочу создать поток расстояний, который представляет расстояние между текущей точкой и центром ее текущего кластера ( кластеры постоянно меняются).

Для этого я должен получить позицию кластера для каждой точки (вместо ее индекса). Я пытался:

desired_stream = joined_stream\
   .map(lambda x: (x[0], (x[1][0],model.latestModel().clusterCenters[x[1][1]])))
#x[0]: index (date) =>'2014-01-01'
#x[1][0]: features vector => [1.5,1.05,0.3]
#x[1][1]: cluster's index => 1

#desired_stream  supposed to be key-value pairs, 
#date as key and tuple of the features and the cluster's position vector as value
#=> ('2014-01-01', ([1.5,1.05,0.3],[1.6,0.092,0.8]))

, но model.latestModel().clusterCenters всегда возвращает одинаковые позиции для кластеров (кластеры должны постоянно меняться).

...