Я полагаю, что Флинк ведет себя странно, когда два разделения делаются последовательно. У меня могут быть некоторые ошибки в моей логике реализации, поэтому я публикую здесь, чтобы спросить ваше мнение.
Минимальный пример: у меня есть текстовый файл, содержащий слова Apple, Banana и Orange. Я передаю это в среде исполнения потока в качестве источника. Я делаю первый сплит, в котором условие выбора, если аргумент является словом «Apple». Если да, я ставлю это «тема» Яблоки , в противном случае в «теме» NotApples . Затем я выбираю в этом разделенном потоке «топик» NotApples и делю его снова, но на этот раз условие проверяет, является ли аргумент словом «Orange». Если да, он помещается в «топик» Апельсины , в ином случае в «топик» NotOranges .
Что я ожидаю в конце, когда я печатаю тему последнего сплит-потока NotOranges - печатать только слово "Банан". Однако то, что я на самом деле напечатал, это и слова «яблоко», и «банан». Я заметил, что когда выполняется второе разделение, обрабатываемый им поток - это не тот поток, который содержит только элементы темы, из которых я выбрал (т.е. NotApples), а все элементы. Я что-то упустил?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.readTextFile("input.txt");
SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Apple")) {
output.add("Apples");
} else {
output.add("NotApples");
}
return output;
}
});
DataStream<String> notApplesStream = splitStream1.select("NotApples");
SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Orange")) {
output.add("Oranges");
} else {
output.add("NotOranges");
}
return output;
}
});
DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
notApplesAndNotOrangesStream.print();
env.execute("SplitTest");
Выход:
1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple
NB .: Я знаю, что у меня может быть одно разбиение для реализации той же логики (в которой я проверяю, является ли аргумент «Apple» ИЛИ «Organge»). Однако это не главное в моем вопросе. Сначала я заметил это поведение в более сложной программе, которую я написал, где необходимы два последовательных разбиения, поэтому я решил попытаться воссоздать его в минимальном примере, чтобы проверить, могу ли я воспроизвести его.