Pyspark StreamingKMeans.predictOn () безумный рост объема памяти - PullRequest
0 голосов
/ 26 октября 2018

Я столкнулся с проблемой при использовании StreamingKMeans.Следующий минимальный фрагмент кода загружает пакеты векторов в контекст потоковой передачи и обучает их модели StreamingKMeans.

Затем модель используется для прогнозирования принадлежности к кластеру текущей и предыдущей микропакетов, что дает линейку назначений кластеров.который может использоваться для отслеживания эволюции центроидов с течением времени.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import DenseVector
from operator import add
import numpy as np
from pyspark.mllib.clustering import StreamingKMeans

n = 1000000 # num examples
m = 500     # batch size
d = 30      # dimensionality
k = 10      # as in k-means

sc = SparkContext()
ssc = StreamingContext(sc, 1)

def get_batches():
    data = [DenseVector(v) for v in np.random.rand(n, d).tolist()]
    for i in range(0, n, m):
        yield data[i:i+m]

microbatches = ssc.queueStream(list(get_batches()))
window = microbatches.window(windowDuration=2, slideDuration=1)
model = StreamingKMeans(k=k, decayFactor=0.9).setRandomCenters(d, 1.0, 0)
model.trainOn(microbatches)
results = model.predictOn(window)
# Arbitrary action to force results to be computed
results.map(lambda x: 1).reduce(add).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(84000)

Модель работает нормально, с 500-метровым пространством Java, если за $ закомментирован $realtOn () $.Однако, когда он используется, объем памяти будет расти и расти.Он достиг 6 ГБ, прежде чем у меня кончились места в куче.

Кто-нибудь знает, что здесь происходит?

...