Я столкнулся с проблемой при использовании StreamingKMeans.Следующий минимальный фрагмент кода загружает пакеты векторов в контекст потоковой передачи и обучает их модели StreamingKMeans.
Затем модель используется для прогнозирования принадлежности к кластеру текущей и предыдущей микропакетов, что дает линейку назначений кластеров.который может использоваться для отслеживания эволюции центроидов с течением времени.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import DenseVector
from operator import add
import numpy as np
from pyspark.mllib.clustering import StreamingKMeans
n = 1000000 # num examples
m = 500 # batch size
d = 30 # dimensionality
k = 10 # as in k-means
sc = SparkContext()
ssc = StreamingContext(sc, 1)
def get_batches():
data = [DenseVector(v) for v in np.random.rand(n, d).tolist()]
for i in range(0, n, m):
yield data[i:i+m]
microbatches = ssc.queueStream(list(get_batches()))
window = microbatches.window(windowDuration=2, slideDuration=1)
model = StreamingKMeans(k=k, decayFactor=0.9).setRandomCenters(d, 1.0, 0)
model.trainOn(microbatches)
results = model.predictOn(window)
# Arbitrary action to force results to be computed
results.map(lambda x: 1).reduce(add).pprint()
ssc.start()
ssc.awaitTerminationOrTimeout(84000)
Модель работает нормально, с 500-метровым пространством Java, если за $ закомментирован $realtOn () $.Однако, когда он используется, объем памяти будет расти и расти.Он достиг 6 ГБ, прежде чем у меня кончились места в куче.
Кто-нибудь знает, что здесь происходит?