Я пытаюсь объединить два потока в Apache Flink, чтобы получить некоторые результаты.
Текущее состояние моего проекта заключается в том, что я получаю данные из твиттера и отображаю их в 2-кортеж, гдеязык пользователя и сумма твитов в определенном временном окне сохраняются.Я делаю это как для количества твитов на язык, так и для ретвитов на язык.Агрегация твитов / ретвитов работает нормально в других процессах.
Теперь я хочу получить процент от числа ретвитов к количеству всех твитов в временном окне.
Поэтому я используюследующий код:
Time windowSize = Time.seconds(15);
// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
.flatMap(new TweetLangFlatMap())
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
.keyBy(new KeyByTweetPostId())
.flatMap(new RetweetLangFlatMap());
// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
tweetsLangSum.join(retweetsLangSum)
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.window(TumblingEventTimeWindows.of(windowSize))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
@Override
public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
String lang = in1.f0;
Double percentage = (double) in1.f1 / in2.f1;
return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
}
})
.print();
Когда я печатаю tweetsLangSum
или retweetsLangSum
, вывод выводится нормально.Моя проблема в том, что я никогда не получаю вывод из объединения.У кого-нибудь есть идеи почему?Или я неправильно использую оконную функцию на первом этапе агрегирования, когда дело доходит до объединения?