Apache Flink подряд разбивает странное поведение - PullRequest
0 голосов
/ 24 июня 2019

Я полагаю, что Флинк ведет себя странно, когда два разделения делаются последовательно. У меня могут быть некоторые ошибки в моей логике реализации, поэтому я публикую здесь, чтобы спросить ваше мнение.

Минимальный пример: у меня есть текстовый файл, содержащий слова Apple, Banana и Orange. Я передаю это в среде исполнения потока в качестве источника. Я делаю первый сплит, в котором условие выбора, если аргумент является словом «Apple». Если да, я ставлю это «тема» Яблоки , в противном случае в «теме» NotApples . Затем я выбираю в этом разделенном потоке «топик» NotApples и делю его снова, но на этот раз условие проверяет, является ли аргумент словом «Orange». Если да, он помещается в «топик» Апельсины , в ином случае в «топик» NotOranges .

Что я ожидаю в конце, когда я печатаю тему последнего сплит-потока NotOranges - печатать только слово "Банан". Однако то, что я на самом деле напечатал, это и слова «яблоко», и «банан». Я заметил, что когда выполняется второе разделение, обрабатываемый им поток - это не тот поток, который содержит только элементы темы, из которых я выбрал (т.е. NotApples), а все элементы. Я что-то упустил?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.readTextFile("input.txt");
SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {

    @Override
    public Iterable<String> select(String arg0) {
        List<String> output = new ArrayList<String>();
        if (arg0.equals("Apple")) {
            output.add("Apples");
        } else {
            output.add("NotApples");
        }
        return output;
    }
});


DataStream<String> notApplesStream = splitStream1.select("NotApples");
SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {

    @Override
    public Iterable<String> select(String arg0) {
        List<String> output = new ArrayList<String>();
        if (arg0.equals("Orange")) {
            output.add("Oranges");
        } else {
            output.add("NotOranges");
        }
        return output;
    }
});

DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
notApplesAndNotOrangesStream.print();
env.execute("SplitTest");

Выход:

1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple

NB .: Я знаю, что у меня может быть одно разбиение для реализации той же логики (в которой я проверяю, является ли аргумент «Apple» ИЛИ «Organge»). Однако это не главное в моем вопросе. Сначала я заметил это поведение в более сложной программе, которую я написал, где необходимы два последовательных разбиения, поэтому я решил попытаться воссоздать его в минимальном примере, чтобы проверить, могу ли я воспроизвести его.

Ответы [ 2 ]

2 голосов
/ 26 июня 2019

Недавно в списке рассылки обсуждалось это некорректное поведение на тему «Об устаревании split / select для DataStream API». Я думаю, что ключевым комментарием было:

Во-первых, мы должны признать, что текущая реализация для split / select имеет недостатки Я примерно прошел исходники, проблема может быть в том, что для последовательного выбора / разбиения (ей) прежний будет переопределен позже, на этапе генерации StreamGraph. Вот почему мы запрещаем это последовательная логика в FLINK-11084.

После просмотра FLINK-11084 и полученного в результате патча я полагаю, что недавние выпуски Flink вызовут исключение, если вы сделаете два последовательных разделения / выбора.

0 голосов
/ 24 июня 2019

Учитывая то, что я знаю о том, как реализован Split / Select, меня не удивит, если это не сработает (хотя я не знаю достаточно, чтобы быть уверенным).Более того, split / select недавно был объявлен устаревшим (хотя неясно, действительно ли он исчезнет).

Лучший способ сделать split / select - через боковые выходы .Это более мощный механизм с более чистой реализацией.

...