Apache Flink - заказ ConnectedStream и противодавление - PullRequest
0 голосов
/ 08 января 2020

В том же коде ниже показаны две функции источника - одна, которая генерирует четные числа от 0 до 20, и другая, которая генерирует нечетные числа из формы 1-20, соединяются вместе, чтобы вывести объединение всех обоих потоков и распечатать их.

Пример кода:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env 
    .addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=0; i < 20; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceA: " + i);
                ctx.collect(i);
                Thread.sleep(1000);
            }               
        }

        @Override
        public void cancel() {}

    })
    .connect(env.addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=1; i < 20; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceB: " + i);
                ctx.collect(i);
                Thread.sleep(1000);
            }               
        }

        @Override
        public void cancel() {}

    })).process(new CoProcessFunction<Integer, Integer, Integer>(){

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);
        }

        @Override
        public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);

        }

    })      
    .addSink(new SinkFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Integer value) throws Exception {
            System.out.println(System.currentTimeMillis()+":Sink: " + value);
        }

    });

    env.execute();

Выход

1578465207355:SourceB: 1
1578465207379:SourceA: 0
1578465207437:OperatorA: 0
1578465208360:SourceB: 3
1578465208380:SourceA: 2
1578465209364:SourceB: 5
1578465209383:SourceA: 4
1578465210366:SourceB: 7
1578465210386:SourceA: 6
1578465211369:SourceB: 9
1578465211390:SourceA: 8
1578465212370:SourceB: 11
1578465212394:SourceA: 10
1578465212440:Sink: 0
1578465212441:OperatorB: 1
1578465213375:SourceB: 13
1578465213399:SourceA: 12
1578465214379:SourceB: 15
1578465214401:SourceA: 14
1578465215383:SourceB: 17
1578465215406:SourceA: 16
1578465216388:SourceB: 19
1578465216409:SourceA: 18
1578465217441:Sink: 1
1578465217441:OperatorB: 3
1578465222446:Sink: 3
1578465222446:OperatorB: 5
1578465227448:Sink: 5
1578465227449:OperatorB: 7
1578465232452:Sink: 7
1578465232453:OperatorB: 9
1578465237453:Sink: 9
1578465237453:OperatorB: 11
1578465242456:Sink: 11
1578465242456:OperatorA: 2
1578465247462:Sink: 2
1578465247462:OperatorA: 4
1578465252467:Sink: 4
1578465252467:OperatorA: 6

Q1.

Предполагается, что Flink отправит любой элемент, который прибудет первым в любом из подключенных потоков к CoProcessFunction. Однако здесь мы видим, что число «2» создается функцией источника раньше, чем число «11», а число «11» отправляется функции CoProcessFunction до «2». Почему это так?

Q2.

В подключенном потоке обратного давления не происходит. Функции источника работают до конца, даже если они все еще обрабатываются оператором (смоделирован Thread.sleep в приведенном выше коде). Есть ли способ реализовать противодавление с подключенным потоком?

Редактирование кода V2

        Configuration config = new Configuration();
    config.setInteger("taskmanager.network.numberOfBuffers", 4);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

    env.setParallelism(1);

    env 
    .addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=0; i < 50000; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceA: " + i);
                ctx.collect(i);
            }               
        }

        @Override
        public void cancel() {}

    })
    .connect(env.addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=1; i < 50000; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceB: " + i);
                ctx.collect(i);
            }               
        }

        @Override
        public void cancel() {}

    })).process(new CoProcessFunction<Integer, Integer, Integer>(){

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);
        }

        @Override
        public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);

        }

    })      
    .addSink(new SinkFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Integer value) throws Exception {
            System.out.println(System.currentTimeMillis()+":Sink: " + value);
        }

    });

    env.execute();

Выход

1578605461497:SourceB: 7279
1578605461497:SourceB: 7281
1578605466406:Sink: 1
1578605466406:OperatorB: 3 <---- only odd numbers (input B) in the output
1578605471411:Sink: 3
1578605471411:OperatorB: 5
1578605476414:Sink: 5
1578605476415:OperatorB: 7
1578605481415:Sink: 7
1578605481415:OperatorB: 9
1578605486417:Sink: 9
1578605486417:OperatorB: 11
1578605491422:Sink: 11
1578605491422:OperatorB: 13
1578605496427:Sink: 13
1578605496427:OperatorB: 15
1578605501432:Sink: 15
1578605501432:OperatorB: 17
1578605506434:Sink: 17
1578605506434:OperatorB: 19
1578605511435:Sink: 19
1578605511435:OperatorB: 21
1578605516435:Sink: 21
1578605516436:OperatorB: 23
1578605521436:Sink: 23
1578605521436:OperatorB: 25
1578605526440:Sink: 25
1578605526440:OperatorB: 27
1578605531443:Sink: 27
1578605531443:OperatorB: 29
1578605536447:Sink: 29
1578605536447:OperatorB: 31
1578605541452:Sink: 31
1578605541452:OperatorB: 33
1578605546457:Sink: 33
1578605546457:OperatorB: 35
1578605551457:Sink: 35
1578605551457:OperatorB: 37
1578605556460:Sink: 37
1578605556460:OperatorB: 39
1578605561518:Sink: 39
1578605561519:OperatorB: 41
1578605566536:Sink: 41
1578605566536:OperatorB: 43
1578605571547:Sink: 43
1578605571547:OperatorB: 45
1578605576554:Sink: 45
1578605576554:OperatorB: 47
1578605581561:Sink: 47
1578605581562:OperatorB: 49
1578605586568:Sink: 49
1578605586568:OperatorB: 51
1578605591576:Sink: 51
1578605591576:OperatorB: 53
1578605596580:Sink: 53
1578605596580:OperatorB: 55
1578605601586:Sink: 55
1578605601587:OperatorB: 57
1578605606592:Sink: 57
1578605606592:OperatorB: 59
1578605611596:Sink: 59
1578605611596:OperatorB: 61
1578605616602:Sink: 61
1578605616602:OperatorB: 63
1578605621606:Sink: 63
1578605621606:OperatorB: 65
1578605626608:Sink: 65
1578605626608:OperatorB: 67
1578605631613:Sink: 67
1578605631613:OperatorB: 69
1578605636618:Sink: 69
1578605636618:OperatorB: 71

1 Ответ

3 голосов
/ 08 января 2020

Q1

Важно понимать, что гарантии упорядоченности применяются только к каналам. Эта свобода позволяет операторам с двумя входами активно выбирать, какой вход использовать. Подумайте о объединении ha sh, которое сначала полностью использует одну сторону для построения таблицы ha sh, а затем передает поток второй стороной для проверки таблицы.

В частности, для вас это означает что у вас нет никаких гарантий упорядоченности между двумя подключенными каналами, поскольку они все еще логически и физически разделены.

Есть ли у вас случай использования, когда вам требуется порядок на обоих входах?

Q2.

Вы не можете наблюдать обратное давление, потому что у вас слишком мало данных. На любом сетевом канале у вас есть буферы на стороне отправителя и получателя. Поэтому до тех пор, пока вы не насытите оба, вы не увидите никакого обратного давления.

edit: относительно вашего первого комментария

Q1 CoGroupProcessor будет чередовать входы на основе наилучшего усилия точно, чтобы избежать голодания входа , Однако, когда один из входов простаивает, он будет читать только с другого входа. После того, как вход снова становится занятым, может потребоваться некоторое время (<1 мс) для повторного захвата потока. </p>

Q2 Я изменил ваш код и уменьшил количество сетевых буферов до 10 и удалил спящие из ваших входов и получил следующий вывод, который показывает противодавление.

1578560715990:SourceA: 0
1578560715990:SourceB: 1
...
1578560716041:OperatorA: 0 <-- blocks coprocessfunction
...
1578560716280:SourceB: 29127 <-- at this point network buffers are full
1578560721030:Sink: 0 <-- slow processing in coprocess function, no more inputs are generated because of backpressure
1578560721030:OperatorB: 1
1578560726034:Sink: 1
1578560726034:OperatorA: 2 <-- clear alternation between inputs
1578560731038:Sink: 2
1578560731039:OperatorB: 3
1578560736043:Sink: 3
1578560736043:OperatorA: 4
1578560741047:Sink: 4
1578560741047:OperatorB: 5
1578560746051:Sink: 5
...
...