Здравствуйте. Я новичок в Flink и в настоящее время пытаюсь настроить базовый конвейер, который получает данные от генератора случайных строк, выполняет wordCount и сохраняет вывод в файл.Я заинтересован в том, чтобы программы flink продолжали работать до тех пор, пока я не решу остановить их вручную или когда больше нет входных данных. Однако программа запускается и заканчивается, даже когда входные данные все еще генерируются.
Iпопытался использовать любую из этих двух строк кода окна, но безуспешно:
.window (ProcessingTimeSessionWindows.withGap (Time.minutes (10))) .timeWindow (Time.of (2500, MILLISECONDS), Time.из (500, МИЛЛИСЕКОНДОВ))
Ниже приведен пример входного файла, который работает непрерывно:
ACBAB CEECC BCCAD EADAE BBCDA DCADD
Это код, которым я сейчас являюсьвыполняется:
public static void main (String [] args) выдает исключение {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data
DataStream<String> text = null;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WindowWordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements("input");
}
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
DataStream<Tuple2<String, Integer>> stream =
text.flatMap(new WindowWordCount.Tokenizer())
.keyBy(0)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.sum(1);
// emit result
if (params.has("output")) {
stream.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
stream.print();
}
// execute program
env.execute("WindowWordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
Программы работают нормально и через некоторое время останавливаются со следующим сообщением:
Начало выполнения программы Выполнение программы завершено Работа с JobID 74a6f54d80af96f029a8e6c32d9bc8da завершена.Время выполнения задания: 176994 мс
Я хочу, чтобы программа продолжала работать, потому что мне интересно анализировать метрики производительности движка flink.