Apache Flink: окно присоединения к потоку не запущено - PullRequest
0 голосов
/ 04 октября 2018

Я пытаюсь объединить два потока в Apache Flink, чтобы получить некоторые результаты.

Текущее состояние моего проекта заключается в том, что я получаю данные из твиттера и отображаю их в 2-кортеж, гдеязык пользователя и сумма твитов в определенном временном окне сохраняются.Я делаю это как для количества твитов на язык, так и для ретвитов на язык.Агрегация твитов / ретвитов работает нормально в других процессах.

Теперь я хочу получить процент от числа ретвитов к количеству всех твитов в временном окне.

Поэтому я используюследующий код:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
        .flatMap(new TweetLangFlatMap())
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
        .keyBy(new KeyByTweetPostId())
        .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .window(TumblingEventTimeWindows.of(windowSize))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
                @Override
                public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                    String lang = in1.f0;
                    Double percentage = (double) in1.f1 / in2.f1;
                    return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
                }
            })
            .print();

Когда я печатаю tweetsLangSum или retweetsLangSum, вывод выводится нормально.Моя проблема в том, что я никогда не получаю вывод из объединения.У кого-нибудь есть идеи почему?Или я неправильно использую оконную функцию на первом этапе агрегирования, когда дело доходит до объединения?

1 Ответ

0 голосов
/ 05 октября 2018

Это может быть вызвано сочетанием различной семантики времени.Метод KeyedStream.timeWindow() представляет собой ярлык, который создает оператор окна на основе сконфигурированных временных характеристик, то есть окна времени-события, если включено время-события, или окна времени обработки в противном случае.Для объединения вы явно определяете окно времени события.

Включили ли вы обработку времени события?

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
...