Итеративный поток Apache Flink не будет зацикливаться - PullRequest
0 голосов
/ 10 мая 2018

Я реализую алгоритм подключенных компонентов, используя API-интерфейс DataStream от Flink, поскольку его реализация с использованием этого API еще не реализована.
Для этого алгоритма я разделяю данные, переворачивая окна. Поэтому для каждого окна я пытаюсь вычислить алгоритм самостоятельно.

Моя проблема связана с итеративным характером алгоритма. Я реализовал конвейер данных, который я хотел для взаимодействий (пошаговый конвейер данных), который состоит из FlatMaps, 1 Join, 1 ProcessWindow и 1 Filter. Однако, похоже, что поток, который я хотел отправить по обратной связи в цикл, на самом деле не возвращается обратно в начало цикла, потому что алгоритм не повторяется. Я подозреваю, что это невозможно сделать, если исходный поток данных итерации был объединен с другим потоком (даже если последний был создан с помощью flatMap для первого).

Код, который я использую, выглядит следующим образом:

    //neigborsList = Datastream of <Vertex, [List of neighbors], label>
IterativeStream< Tuple3<Integer, ArrayList<Integer>, Integer> > beginning_loop = neigborsList.iterate(maxTimeout);

//Emits tuples Vertices and Labels for every vertex and its neighbors

DataStream<Tuple2<Integer,Integer> > labels = beginning_loop
        //Datastream of <Vertex, label> for every neigborsList.f0 and element in neigborsList.f1
        .flatMap( new EmitVertexLabel() ) 
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .minBy(1)               
        ;


DataStream<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> updatedVertex = beginning_loop                
            //Update vertex label with the results from the labels reduction
            .join(labels)
            .where("vertex")
            .equalTo("vertex")
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(new JoinFunction<Tuple3<Integer,ArrayList<Integer>,Integer>, Tuple2<Integer,Integer>, Tuple4<Integer,ArrayList<Integer>,Integer,Integer>>() {

                @Override
                public Tuple4<Integer,ArrayList<Integer>,Integer,Integer> join(
                        Tuple3<Integer, ArrayList<Integer>, Integer> arg0, Tuple2<Integer, Integer> arg1)
                        throws Exception {
                    int hasConverged = 1;
                    if(arg1.f1.intValue() < arg0.f2.intValue() )
                    {
                        arg0.f2 = arg1.f1;
                        hasConverged=0;
                    }
                    return new Tuple4<>(arg0.f0,arg0.f1,arg0.f2,new Integer(hasConverged));
                }                       

            })

            //Disseminates the convergence flag if a change was made in the window
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .process(new ProcessAllWindowFunction<Tuple4<Integer,ArrayList<Integer>,Integer,Integer>,Tuple4<Integer, ArrayList<Integer>, Integer, Integer>,TimeWindow >() {

                @Override
                public void process(
                        ProcessAllWindowFunction<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>, Tuple4<Integer, ArrayList<Integer>, Integer, Integer>, TimeWindow>.Context ctx,
                        Iterable<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> values,
                        Collector<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> out) throws Exception {

                    Iterator<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> iterator = values.iterator();
                    Tuple4<Integer, ArrayList<Integer>, Integer, Integer> element;

                    int hasConverged= 1;
                    while(iterator.hasNext())
                    {
                        element = iterator.next();
                        if(element.f3.intValue()>0)
                        {
                            hasConverged=0;
                            break;
                        }

                    }

                    //Re iterate and emit the values on the correct output
                    iterator = values.iterator();                           
                    Integer converged = new Integer(hasConverged);
                    while(iterator.hasNext())
                    {
                        element = iterator.next();
                        element.f3 = converged;
                        out.collect(element);

                    }                                                   
                }
            })              

            ;


DataStream<Tuple3<Integer, ArrayList<Integer>, Integer>> feed_back = updatedVertex
        .filter(new NotConvergedFilter())                               
        //Remove the finished convergence flag
        //Transforms the Tuples4 to Tuples3 so that it becomes compatible with beginning_loop
        .map(new RemoveConvergeceFlag())
        ;


beginning_loop.closeWith(feed_back);

//Selects the windows that have already converged
DataStream<?> convergedWindows = updatedVertex
        .filter(new ConvergedFilter() );


convergedWindows.print()
.setParallelism(1)
.name("Sink to stdout");

В конце выполнения конвергентная Windows не получает никакого набора (поскольку алгоритм не может сходиться только с 1 итерацией). Если я распечатываю begin_loop, я вижу исходные наборы и наборы из feed_back, полученные в результате первой итерации. Но ничего кроме этого.

Итак, обобщая мой вопрос, может ли это быть ограничением Флинка? Если да, знаете ли вы другой способ обновления меток вершин после первоначального сокращения, который не основан на соединениях?

PS. Я использую Flink 1.3.3

...