Я пытаюсь реализовать алгоритм кластеризации Kmeans на Flink с использованием Python API для потоковой передачи.Я делаю key_by
на основе 0-го индекса, а затем пытаюсь reduce()
для каждой группы, чтобы получить своего рода агрегат подсчета.
class CentroidAccumulator(ReduceFunction):
def reduce(self, val1, val2):
id1, point1, count1 = val1
id2, point2, count2 = val2
return (id1, point1.add(point2), count1+count2)
class Selector(KeySelector):
def getKey(self, value):
return value[0]
nearest_points = points \
.map(SelectNearestPoint(centroids)) \
.key_by(Selector()).reduce(CentroidAccumulator())
nearest_points.write_as_text("output.txt")
Ожидаемый результат:
(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)
Фактический результат:
Я получаю выходные данные всех итераций, записанных в файл (у меня есть 40 точек в образце, с которым я тестирую, и, следовательно, выходные данные имеют 40 строк, подобных этой)
(1, <kmeans_clustering.Point instance at 0x2>, 1)
(3, <kmeans_clustering.Point instance at 0x3>, 1)
(2, <kmeans_clustering.Point instance at 0x4>, 1)
(2, <kmeans_clustering.Point instance at 0x5>, 2)
.
.
.
(2, <kmeans_clustering.Point instance at 0x20>, 13)
(2, <kmeans_clustering.Point instance at 0x21>, 14)
(1, <kmeans_clustering.Point instance at 0x22>, 10)
(4, <kmeans_clustering.Point instance at 0x23>, 4)
(2, <kmeans_clustering.Point instance at 0x24>, 15)
(2, <kmeans_clustering.Point instance at 0x25>, 16)
(1, <kmeans_clustering.Point instance at 0x26>, 11)
(4, <kmeans_clustering.Point instance at 0x27>, 5)
(2, <kmeans_clustering.Point instance at 0x28>, 17)
(2, <kmeans_clustering.Point instance at 0x29>, 18)
Дело в том, что с редукцией все в порядке, но я хочу получить только последнее значение преобразования редукции для каждой группы (именно так, как редукция должна работать, насколько я понимаю).Что я делаю не так?