Следующая программа печатает «1» в течение нескольких секунд, а затем зависает.s1
и s2
имеют параллелизм 4. В конечном счете, мой вопрос состоит в том, как заставить следующий код продолжать печатать «1» бесконечно, не меняя код в операторах и источнике?
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStreamSource<Integer> numbers = env.addSource(new SourceFunction<Integer>() {
@Override
public synchronized void run(SourceContext<Integer> sourceContext) {
while (true) sourceContext.collect(0);
}
@Override
public void cancel() { }
});
DataStream<Integer> s1 = numbers.map(d -> d + 1);
DataStream<Integer> s2 = numbers.map(d -> {
while (true) Thread.yield();
});
s1.print();
s2.print();
env.execute();
}
Может бытьЯ неправильно понял всю историю Flink, но я не вижу ортодоксального способа мультиплексирования от изначально непараллельного источника к нескольким независимым параллельным конвейерам без блокировки на самом медленном.Согласно исходному коду, существует широковещательная передача в простом цикле for (org.apache.flink.streaming.runtime.tasks.OperatorChain:630
, v 1.5.0), которая в случае слишком высокого обратного давления приводит к зависанию потока, выполняющего SourceContext.collect
в ожидании монитора, по-видимому, из-заполный буфер нижестоящего оператора.