Flink Streaming Python API - Reduce () дает инкрементные результаты вместо конечного значения - PullRequest
0 голосов
/ 13 мая 2019

Я пытаюсь реализовать алгоритм кластеризации Kmeans на Flink с использованием Python API для потоковой передачи.Я делаю key_by на основе 0-го индекса, а затем пытаюсь reduce() для каждой группы, чтобы получить своего рода агрегат подсчета.

class CentroidAccumulator(ReduceFunction):                                                                                                                                       
    def reduce(self, val1, val2):                                                                                                                                                
        id1, point1, count1 =  val1                                                                                                                                              
        id2, point2, count2 =  val2                                                                                                                                              
        return (id1, point1.add(point2), count1+count2)   

class Selector(KeySelector):                                                                                                                                                     
    def getKey(self, value):                                                                                                                                                     
        return value[0]   


nearest_points = points \                                                                                                                                                
                .map(SelectNearestPoint(centroids)) \                                                                                                                            
                .key_by(Selector()).reduce(CentroidAccumulator()) 
nearest_points.write_as_text("output.txt")

Ожидаемый результат:

(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)

Фактический результат:

Я получаю выходные данные всех итераций, записанных в файл (у меня есть 40 точек в образце, с которым я тестирую, и, следовательно, выходные данные имеют 40 строк, подобных этой)

(1, <kmeans_clustering.Point instance at 0x2>, 1)                                                                                                                                
(3, <kmeans_clustering.Point instance at 0x3>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x4>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x5>, 2)                                                                                                                                
.
.
.                                                                                                                
(2, <kmeans_clustering.Point instance at 0x20>, 13)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x21>, 14)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x22>, 10)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x23>, 4)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x24>, 15)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x25>, 16)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x26>, 11)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x27>, 5)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x28>, 17)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x29>, 18) 

Дело в том, что с редукцией все в порядке, но я хочу получить только последнее значение преобразования редукции для каждой группы (именно так, как редукция должна работать, насколько я понимаю).Что я делаю не так?

1 Ответ

1 голос
/ 13 мая 2019

Вы не делаете ничего плохого; это ожидаемое поведение для функции сокращения потоковой передачи. Концептуально поток данных - это бесконечный поток данных, и поэтому не имеет смысла «ждать до конца», чтобы получить результат. Стандартное поведение потоковых программ - получение результата для каждого события.

Конечно, это может быть немного неудобно. Если вы хотите увидеть только конечный результат, то должен быть какой-то способ указать, что конец настал. С пакетными программами это происходит естественно. В потоковых приложениях конечные источники данных отправляют водяной знак со значением MAX_WATERMARK, который можно использовать для определения того, что вход достиг своего конца - вы можете отловить это в ProcessFunction с таймером времени-события, но это довольно сложное решение. Вы также можете использовать Windows для реализации своего рода обходного пути.

...