Возможно, вы захотите использовать пользовательский оператор, который реализует интерфейс InputSelectable
, чтобы уменьшить объем необходимой буферизации. Ниже приведен пример, который реализует чередование без какой-либо буферизации, но обязательно прочитайте предостережение в документах , которое объясняет, что
... оператор может получить некоторые данные, которые он в данный момент не хочет обрабатывать ...
Другими словами, нельзя полагаться на этот простой пример, чтобы он действительно работал как есть.
public class Alternate {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Long> positive = env.generateSequence(1L, 100L);
DataStream<Long> negative = env.generateSequence(-100L, -1L);
AlternatingTwoInputStreamOperator op = new AlternatingTwoInputStreamOperator();
positive
.connect(negative)
.transform("Hack that needs buffering", Types.LONG, op)
.print();
env.execute();
}
}
class AlternatingTwoInputStreamOperator extends AbstractStreamOperator<Long>
implements TwoInputStreamOperator<Long, Long, Long>, InputSelectable {
private InputSelection nextSelection = InputSelection.FIRST;
@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
output.collect(element);
nextSelection = InputSelection.SECOND;
}
@Override
public void processElement2(StreamRecord<Long> element) throws Exception {
output.collect(element);
nextSelection = InputSelection.FIRST;
}
@Override
public InputSelection nextSelection() {
return this.nextSelection;
}
}
Примечание также, что InputSelectable
был добавлен во Flink 1.9.0.