Количество слов всегда меняется, при использовании Flink - PullRequest
0 голосов
/ 15 марта 2020

Я пытаюсь создать пример подсчета слов с помощью flink. Вот ссылка для слов данных (это пример из учетной записи flink на github)

Когда я считаю слова с помощью простой java программы:

public static void main(String[] args) throws Exception {
    int count = 0;
    for (String eachSentence : WordCountData.WORDS){
        String[] splittedSentence = eachSentence.toLowerCase().split("\\W+");
        for (String eachWord: splittedSentence){
            count++;
        }
    }
    System.out.println(count);
// result is 287
}

Теперь, когда я сделаю это с помощью flink, сначала я разделю предложение на слова.

DataStream<Tuple2<String, Integer>> readWordByWordStream = splitSentenceWordByWord(wordCountDataSource);

//...
public DataStream<Tuple2<String, Integer>> splitSentenceWordByWord(DataStream<String> wordDataSourceStream)
{
    DataStream<Tuple2<String, Integer>> wordByWordStream = wordDataSourceStream.flatMap(new TempTransformation());
    return wordByWordStream;
 }

  • Вот мой класс TempTransformation:
public class TempTransformation extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception
    {
        String[] splittedSentence = input.toLowerCase().split("\\W+");
        for (String eachWord : splittedSentence)
        {
            collector.collect(new Tuple2<String, Integer>(eachWord, 1));
        }
    }
}
  • Теперь я собираюсь посчитать слова, преобразовав их в KeyedStream (набираемый словом)
    public SingleOutputStreamOperator<String> keyedStreamExample(DataStream<Tuple2<String, Integer>> wordByWordStream)
    {
        return wordByWordStream.keyBy(0).timeWindow(Time.milliseconds(1)).apply(new TempWindowFunction());
    }
  • TempWindowFunction ():
public class TempWindowFunction extends RichWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
    private Logger logger = LoggerFactory.getLogger(TempWindowFunction.class);
    private int count = 0;
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception
    {
        logger.info("Key is:' {} ' and collected element for that key and count: {}", (Object) tuple.getField(0), count);
        StringBuilder builder = new StringBuilder();
        for (Tuple2 each : input)
        {
            String key = (String) each.getField(0);
            Integer value = (Integer) each.getField(1);
            String tupleStr = "[ " + key + " , " + value + "]";
            builder.append(tupleStr);
            count ++;
        }
        logger.info("All tuples {}", builder.toString());
        logger.info("Exit method");
        logger.info("----");
    }
}
  • После выполнения этого задания в локальных средах Flink выходы всегда меняются, вот несколько примеров:
18:09:40,086 INFO  com.sampleFlinkProject.transformations.TempWindowFunction     - Key is:' rub ' and collected element for that key and count: 86
18:09:40,086 INFO  TempWindowFunction     - All tuples [ rub , 1]
18:09:40,086 INFO  TempWindowFunction     - Exit method
18:09:40,086 INFO  TempWindowFunction     - ----
18:09:40,086 INFO  TempWindowFunction     - Key is:' for ' and collected element for that key and count: 87
18:09:40,086 INFO  TempWindowFunction     - All tuples [ for , 1]
18:09:40,086 INFO  TempWindowFunction     - Exit method
18:09:40,086 INFO  TempWindowFunction     - ----

// another running outputs:

18:36:21,660 INFO  TempWindowFunction     - Key is:' for ' and collected element for that key and count: 103
18:36:21,660 INFO  TempWindowFunction     - All tuples [ for , 1]
18:36:21,660 INFO  TempWindowFunction     - Exit method
18:36:21,660 INFO  TempWindowFunction     - ----
18:36:21,662 INFO  TempWindowFunction     - Key is:' coil ' and collected element for that key and count: 104
18:36:21,662 INFO  TempWindowFunction     - All tuples [ coil , 1]
18:36:21,662 INFO  TempWindowFunction     - Exit method
18:36:21,662 INFO  TempWindowFunction     - ----
  • Наконец, вот настройка выполнения
//...
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
//...
  • Почему Flink дает разные выходы для каждого исполнения?

1 Ответ

1 голос
/ 16 марта 2020

Одним из источников недетерминизма в вашем приложении является время обработки windows (которое составляет 1 мс). Всякий раз, когда вы используете время обработки для управления окнами, то windows в конечном итоге содержит все события, которые появляются и обрабатываются в течение интервала времени. (Время события windows ведет себя детерминистически, так как они основаны на временных отметках в событиях.) Если windows быть настолько коротким, это усилит этот эффект.

...