Мое выполнение задания Flink Streaming заканчивается, пока источник данных все еще производит данные? - PullRequest
0 голосов
/ 11 июля 2019

Здравствуйте. Я новичок в Flink и в настоящее время пытаюсь настроить базовый конвейер, который получает данные от генератора случайных строк, выполняет wordCount и сохраняет вывод в файл.Я заинтересован в том, чтобы программы flink продолжали работать до тех пор, пока я не решу остановить их вручную или когда больше нет входных данных. Однако программа запускается и заканчивается, даже когда входные данные все еще генерируются.

Iпопытался использовать любую из этих двух строк кода окна, но безуспешно:

.window (ProcessingTimeSessionWindows.withGap (Time.minutes (10))) .timeWindow (Time.of (2500, MILLISECONDS), Time.из (500, МИЛЛИСЕКОНДОВ))

Ниже приведен пример входного файла, который работает непрерывно:

ACBAB CEECC BCCAD EADAE BBCDA DCADD

Это код, которым я сейчас являюсьвыполняется:

public static void main (String [] args) выдает исключение {

    final ParameterTool params = ParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataStream<String> text = null;
    if (params.has("input")) {
        // read the text file from given input path
        text = env.readTextFile(params.get("input"));
    } else {
        System.out.println("Executing WindowWordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements("input");
    }

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);


    DataStream<Tuple2<String, Integer>> stream = 
        text.flatMap(new WindowWordCount.Tokenizer())
        .keyBy(0)
        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
        .sum(1);

    // emit result
    if (params.has("output")) {
        stream.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        stream.print();
    }

    // execute program
    env.execute("WindowWordCount");
}


public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

}

Программы работают нормально и через некоторое время останавливаются со следующим сообщением:

Начало выполнения программы Выполнение программы завершено Работа с JobID 74a6f54d80af96f029a8e6c32d9bc8da завершена.Время выполнения задания: 176994 мс

Я хочу, чтобы программа продолжала работать, потому что мне интересно анализировать метрики производительности движка flink.

...