Как сохранить вещание из непараллельного источника, когда один из нисходящих каналов зависает? - PullRequest
0 голосов
/ 03 июня 2018

Следующая программа печатает «1» в течение нескольких секунд, а затем зависает.s1 и s2 имеют параллелизм 4. В конечном счете, мой вопрос состоит в том, как заставить следующий код продолжать печатать «1» бесконечно, не меняя код в операторах и источнике?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    DataStreamSource<Integer> numbers = env.addSource(new SourceFunction<Integer>() {

        @Override
        public synchronized void run(SourceContext<Integer> sourceContext) {
            while (true) sourceContext.collect(0);
        }

        @Override
        public void cancel() { }

    });
    DataStream<Integer> s1 = numbers.map(d -> d + 1);
    DataStream<Integer> s2 = numbers.map(d -> {
        while (true) Thread.yield();
    });
    s1.print();
    s2.print();
    env.execute();
}

Может бытьЯ неправильно понял всю историю Flink, но я не вижу ортодоксального способа мультиплексирования от изначально непараллельного источника к нескольким независимым параллельным конвейерам без блокировки на самом медленном.Согласно исходному коду, существует широковещательная передача в простом цикле for (org.apache.flink.streaming.runtime.tasks.OperatorChain:630, v 1.5.0), которая в случае слишком высокого обратного давления приводит к зависанию потока, выполняющего SourceContext.collect в ожидании монитора, по-видимому, из-заполный буфер нижестоящего оператора.

1 Ответ

0 голосов
/ 11 июня 2018

Если два нижестоящих оператора действительно независимы, то я бы предложил создать для обоих операторов собственный источник.

Проблема с вашим приложением состоит в том, что два нижестоящих оператора неявно связаны через гарантии обработки Флинка.С гарантией обработки ровно один раз и хотя бы один раз, источник не может просто продолжать отправлять записи вниз по течению, если один из операторов зависает.Он должен убедиться, что все потребители видят все элементы, по крайней мере, один раз, и, следовательно, необходимо также оказать обратное давление, если один из потребителей зависнет.

...