В том же коде ниже показаны две функции источника - одна, которая генерирует четные числа от 0 до 20, и другая, которая генерирует нечетные числа из формы 1-20, соединяются вместе, чтобы вывести объединение всех обоих потоков и распечатать их.
Пример кода:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i=0; i < 20; i=i+2) {
System.out.println(System.currentTimeMillis()+":SourceA: " + i);
ctx.collect(i);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
})
.connect(env.addSource(new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i=1; i < 20; i=i+2) {
System.out.println(System.currentTimeMillis()+":SourceB: " + i);
ctx.collect(i);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
})).process(new CoProcessFunction<Integer, Integer, Integer>(){
private static final long serialVersionUID = 1L;
@Override
public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
Thread.sleep(5000);
arg2.collect(arg0);
}
@Override
public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
Thread.sleep(5000);
arg2.collect(arg0);
}
})
.addSink(new SinkFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Integer value) throws Exception {
System.out.println(System.currentTimeMillis()+":Sink: " + value);
}
});
env.execute();
Выход
1578465207355:SourceB: 1
1578465207379:SourceA: 0
1578465207437:OperatorA: 0
1578465208360:SourceB: 3
1578465208380:SourceA: 2
1578465209364:SourceB: 5
1578465209383:SourceA: 4
1578465210366:SourceB: 7
1578465210386:SourceA: 6
1578465211369:SourceB: 9
1578465211390:SourceA: 8
1578465212370:SourceB: 11
1578465212394:SourceA: 10
1578465212440:Sink: 0
1578465212441:OperatorB: 1
1578465213375:SourceB: 13
1578465213399:SourceA: 12
1578465214379:SourceB: 15
1578465214401:SourceA: 14
1578465215383:SourceB: 17
1578465215406:SourceA: 16
1578465216388:SourceB: 19
1578465216409:SourceA: 18
1578465217441:Sink: 1
1578465217441:OperatorB: 3
1578465222446:Sink: 3
1578465222446:OperatorB: 5
1578465227448:Sink: 5
1578465227449:OperatorB: 7
1578465232452:Sink: 7
1578465232453:OperatorB: 9
1578465237453:Sink: 9
1578465237453:OperatorB: 11
1578465242456:Sink: 11
1578465242456:OperatorA: 2
1578465247462:Sink: 2
1578465247462:OperatorA: 4
1578465252467:Sink: 4
1578465252467:OperatorA: 6
Q1.
Предполагается, что Flink отправит любой элемент, который прибудет первым в любом из подключенных потоков к CoProcessFunction. Однако здесь мы видим, что число «2» создается функцией источника раньше, чем число «11», а число «11» отправляется функции CoProcessFunction до «2». Почему это так?
Q2.
В подключенном потоке обратного давления не происходит. Функции источника работают до конца, даже если они все еще обрабатываются оператором (смоделирован Thread.sleep в приведенном выше коде). Есть ли способ реализовать противодавление с подключенным потоком?
Редактирование кода V2
Configuration config = new Configuration();
config.setInteger("taskmanager.network.numberOfBuffers", 4);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
env.setParallelism(1);
env
.addSource(new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i=0; i < 50000; i=i+2) {
System.out.println(System.currentTimeMillis()+":SourceA: " + i);
ctx.collect(i);
}
}
@Override
public void cancel() {}
})
.connect(env.addSource(new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i=1; i < 50000; i=i+2) {
System.out.println(System.currentTimeMillis()+":SourceB: " + i);
ctx.collect(i);
}
}
@Override
public void cancel() {}
})).process(new CoProcessFunction<Integer, Integer, Integer>(){
private static final long serialVersionUID = 1L;
@Override
public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
Thread.sleep(5000);
arg2.collect(arg0);
}
@Override
public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
Thread.sleep(5000);
arg2.collect(arg0);
}
})
.addSink(new SinkFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Integer value) throws Exception {
System.out.println(System.currentTimeMillis()+":Sink: " + value);
}
});
env.execute();
Выход
1578605461497:SourceB: 7279
1578605461497:SourceB: 7281
1578605466406:Sink: 1
1578605466406:OperatorB: 3 <---- only odd numbers (input B) in the output
1578605471411:Sink: 3
1578605471411:OperatorB: 5
1578605476414:Sink: 5
1578605476415:OperatorB: 7
1578605481415:Sink: 7
1578605481415:OperatorB: 9
1578605486417:Sink: 9
1578605486417:OperatorB: 11
1578605491422:Sink: 11
1578605491422:OperatorB: 13
1578605496427:Sink: 13
1578605496427:OperatorB: 15
1578605501432:Sink: 15
1578605501432:OperatorB: 17
1578605506434:Sink: 17
1578605506434:OperatorB: 19
1578605511435:Sink: 19
1578605511435:OperatorB: 21
1578605516435:Sink: 21
1578605516436:OperatorB: 23
1578605521436:Sink: 23
1578605521436:OperatorB: 25
1578605526440:Sink: 25
1578605526440:OperatorB: 27
1578605531443:Sink: 27
1578605531443:OperatorB: 29
1578605536447:Sink: 29
1578605536447:OperatorB: 31
1578605541452:Sink: 31
1578605541452:OperatorB: 33
1578605546457:Sink: 33
1578605546457:OperatorB: 35
1578605551457:Sink: 35
1578605551457:OperatorB: 37
1578605556460:Sink: 37
1578605556460:OperatorB: 39
1578605561518:Sink: 39
1578605561519:OperatorB: 41
1578605566536:Sink: 41
1578605566536:OperatorB: 43
1578605571547:Sink: 43
1578605571547:OperatorB: 45
1578605576554:Sink: 45
1578605576554:OperatorB: 47
1578605581561:Sink: 47
1578605581562:OperatorB: 49
1578605586568:Sink: 49
1578605586568:OperatorB: 51
1578605591576:Sink: 51
1578605591576:OperatorB: 53
1578605596580:Sink: 53
1578605596580:OperatorB: 55
1578605601586:Sink: 55
1578605601587:OperatorB: 57
1578605606592:Sink: 57
1578605606592:OperatorB: 59
1578605611596:Sink: 59
1578605611596:OperatorB: 61
1578605616602:Sink: 61
1578605616602:OperatorB: 63
1578605621606:Sink: 63
1578605621606:OperatorB: 65
1578605626608:Sink: 65
1578605626608:OperatorB: 67
1578605631613:Sink: 67
1578605631613:OperatorB: 69
1578605636618:Sink: 69
1578605636618:OperatorB: 71