Различные результаты в Flink Joined Streams с Evictors - PullRequest
0 голосов
/ 08 января 2020

Я пытаюсь запустить базовое объединение (join) двух DataStream в Flink на локальном компьютере. Оба исходных потока имеют одинаковый тип данных — Tuple4<String, String, Long, Long>. При многократном запуске приведённой ниже функции я получаю один из двух разных результатов, причём какой именно — непредсказуемо. Результаты собираются в статический список CollectTuple2Sink.VALUES (см. код ниже); журналы DEBUG для обоих запусков приведены ниже. Я пробовал выставить параллелизм 1 и максимальный параллелизм 1, но проблема сохраняется.

//Basic Function
    /**
     * Joins two small hand-built Tuple4 streams in 5-minute tumbling event-time windows
     * (with a count evictor) and prints what {@code CollectTuple2Sink} collected.
     *
     * <p>Event timestamps and watermarks are produced by {@code Tuple4Soruce} itself, through
     * {@code collectWithTimestamp(tuple, tuple.f3)} and {@code emitWatermark(...)}. No separate
     * timestamp/watermark assigner is needed.
     *
     * <p>NOTE: the printed result is not deterministic. The window join runs as a co-group over
     * the union of both inputs (the job log shows {@code CoGroupedStreams}). Each window pane
     * therefore buffers elements of both streams in arrival order, and the interleaving of the
     * two sources is not controlled. {@code CountEvictor.of(2)} keeps only the last two buffered
     * elements, so which pairs reach the join function depends on that race. Only windowing that
     * is driven purely by event time gives reproducible output.
     *
     * @throws Exception if building or executing the job fails
     */
    public void runBasicJoin() throws Exception {
        // tried with getExecutionEnvironment as well
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        env.setMaxParallelism(1);
        // VALUES is static, so drop anything collected by an earlier run in this JVM.
        CollectTuple2Sink.VALUES.clear();

        Tuple4<String, String, Long, Long> input1 =
                new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
        Tuple4<String, String, Long, Long> input2 =
                new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
        Tuple4<String, String, Long, Long> input3 =
                new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
        Tuple4<String, String, Long, Long> input4 =
                new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
        Tuple4<String, String, Long, Long> input5 =
                new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
        Tuple4<String, String, Long, Long> input6 =
                new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
        Tuple4<String, String, Long, Long> input7 =
                new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
        Tuple4<String, String, Long, Long> input8 =
                new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));

        // Tuple4 entries are emitted as events (timestamp = f3); Long entries become watermarks.
        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
                t(0), input1, input2, input3, input4, t(5),
                input5, input6, input7, input8, t(10)
        ));

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
                t(0), input1, input3, input3, input4, input4, input4, t(5),
                input5, input6, t(10), t(11)
        ));

        // There is deliberately no assignTimestampsAndWatermarks(...) call here. It returns a NEW
        // stream, and calling it without using that result leaves the join reading the source
        // streams above, so it would have no effect on the output. The sources already attach
        // timestamps and watermarks.
        TumblingEventTimeWindows tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));

        dataStream1.join(dataStream2)
                .where(new Tuple4KeySelector())
                .equalTo(new Tuple4KeySelector())
                .window(tsAssigner)
                // EventTimeTrigger is already the default for event-time tumbling windows;
                // it is set explicitly only for readability.
                .trigger(EventTimeTrigger.create())
                // Keeps the last 2 buffered elements per pane; arrival order decides which
                // ones, so this is the source of nondeterminism.
                .evictor(CountEvictor.of(2))
                .apply(new Tuple4JoinFunction())
                .addSink(new CollectTuple2Sink());
        env.execute();
        System.out.println(CollectTuple2Sink.VALUES);
    }

    /**
     * Test sink that appends every received (label, timestamp) pair to a shared static list.
     * The list can then be inspected after {@code env.execute()} returns.
     */
    private static class CollectTuple2Sink
            implements SinkFunction<Tuple2<String, Long>> {

        /** Results collected by all instances of this sink; cleared by the caller before a run. */
        public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

        /**
         * Appends {@code value} to {@link #VALUES}.
         *
         * @param value the joined record to collect
         */
        @Override
        public void invoke(Tuple2<String, Long> value)
                throws Exception {
            // Lock on the shared static list itself rather than on this instance. With
            // parallelism > 1 there is one sink instance per subtask, and per-instance locks
            // would not serialize their access to the same ArrayList.
            synchronized (VALUES) {
                VALUES.add(value);
            }
        }

    }
//join function ---> Takes the 2nd and 4th field of a tuple and convert tuple4 to tuple2
    private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
        /**
         * Projects a matched pair onto the left record's second field (f1) and fourth field (f3).
         * The right-hand record is not used.
         *
         * @param left  record from the first joined stream
         * @param right record from the second joined stream (ignored)
         * @return a new (f1, f3) tuple built from {@code left}
         */
        @Override
        public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> left, Tuple4<String, String, Long, Long> right) throws Exception {
            String label = left.f1;
            Long timestamp = left.f3;
            return new Tuple2<String, Long>(label, timestamp);
        }
    }
//key selector --> select the 2nd value of tuple 4
    private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
        /**
         * Uses the record's second field ({@code f1}) as the join key.
         *
         * <p>NOTE(review): in runBasicJoin's inputs f0 holds "keyN" and f1 holds "valN", so records
         * only join with an identical "valN". Confirm that keying on f1 rather than f0 is intended.
         *
         * @param record the incoming tuple
         * @return its {@code f1} field
         */
        @Override
        public String getKey(Tuple4<String, String, Long, Long> record) throws Exception {
            final String secondField = record.f1;
            return secondField;
        }
    }

//source function --> generates a sequence input for tuple4
    /**
     * Bounded test source that replays a fixed script of events and watermarks.
     *
     * <p>Each {@code Tuple4} element is emitted as an event whose timestamp is its {@code f3}
     * field. Each {@code Long} element is emitted as a watermark with that value. Any other
     * element aborts the source with an {@link IllegalArgumentException}.
     */
    private static class Tuple4Soruce
            implements SourceFunction<Tuple4<String, String, Long, Long>>,
            ResultTypeQueryable<Tuple4<String, String, Long, Long>> {

        /** Cleared by {@link #cancel()} so that {@link #run} stops at the next element. */
        private volatile boolean running = true;
        /** The script to replay: Tuple4 events interleaved with Long watermark timestamps. */
        private final Object[] testStream;
        private final TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
                TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {
                });

        /**
         * @param eventsOrWatermarks {@code Tuple4<String, String, Long, Long>} events and
         *                           {@code Long} watermark timestamps, in emission order
         */
        Tuple4Soruce(Object... eventsOrWatermarks) {
            this.testStream = eventsOrWatermarks;
        }

        /**
         * Emits the script in order until it is exhausted or the source is cancelled.
         *
         * @throws IllegalArgumentException if an element is neither a Tuple4 nor a Long
         */
        @Override
        public void run(SourceContext<Tuple4<String, String, Long, Long>> ctx) throws Exception {
            for (int i = 0; (i < testStream.length) && running; i++) {
                Object element = testStream[i];
                if (element instanceof Tuple4) {
                    // Only Tuple4<String, String, Long, Long> instances are placed in the script.
                    @SuppressWarnings("unchecked")
                    Tuple4<String, String, Long, Long> tuple =
                            (Tuple4<String, String, Long, Long>) element;
                    ctx.collectWithTimestamp(tuple, tuple.f3);
                } else if (element instanceof Long) {
                    ctx.emitWatermark(new Watermark((Long) element));
                } else {
                    // String concatenation handles a null element instead of throwing an NPE.
                    throw new IllegalArgumentException(
                            "Unsupported element at index " + i + ": " + element);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
            return typeInformation;
        }

    }
//util function to generate time
    /**
     * Returns the epoch-millisecond instant of 2018-01-01 00:00, shifted forward by {@code n}
     * minutes. The base instant uses Joda-Time's default time zone, because the
     * {@code DateTime} constructor is called without a zone argument.
     *
     * @param n minutes after the base instant
     * @return epoch milliseconds
     */
    public long t(int n) {
        DateTime midnight = new DateTime(2018, 1, 1, 0, 0);
        DateTime shifted = midnight.plusMinutes(n);
        return shifted.getMillis();
    }

Журналы для прогона 1:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]

Журналы для прогона 2:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]

Функция источника и другие определения основаны на этом руководстве. Кроме того, я изучил в официальной документации Flink несколько способов выполнять базовые задания с эвикторами (evictors) и без них. Без эвикторов все проверенные варианты давали ожидаемый результат при каждом запуске. Как только добавляются эвикторы, результат становится недетерминированным.

Версия Flink: 1.4.2

1 Ответ

2 голосов
/ 09 января 2020

Вы привели не весь код. Но, судя по тому, что видно, результаты зависят от порядка поступления событий. Так обстоит дело, например, с окнами на основе подсчёта элементов, а в таких случаях нельзя ожидать детерминированных результатов.

Оконное соединение читает данные из двух входных потоков. Внутри каждого потока события обрабатываются по порядку, но сами потоки «соревнуются» друг с другом непредсказуемым и неуправляемым образом. В вашем случае CountEvictor.of(2) оставляет в окне только два последних поступивших элемента объединённого потока, поэтому то, какие пары попадут в результат, зависит от этой гонки. Результаты будут детерминированными тогда и только тогда, когда срабатывание и обработка окна зависят исключительно от времени события (event time). Если в этом участвуют подсчёт элементов или время обработки (processing time), детерминированных результатов ожидать нельзя.

...