Я разработал программу Flink, которая читает твиты из Twitter и помещает их в Kafka. Затем он возвращает твиты от Кафки и обрабатывает их.
Преобразование «Обработка твитов» извлекает хештеги и пользователей из текста твита и выводит их в выводе по умолчанию, а каждую пару выводит в боковом выводе.
Прикрепленное изображение выбирается из веб-интерфейса Flink. Я не понимаю, почему Источник Кафки и оператор обработки твитов объединены в одну задачу, и в первую очередь я хочу, чтобы приемник твитов получал все необработанные твиты от источника Кафки, а не от вывода оператора обработки твитов.
Программа правильная?
Datalow
Это соответствующая часть кода:
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(Constants.KAFKA_TWEETS_TOPIC, new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();
DataStream<String> tweetsStream = env
.addSource(myConsumer)
.name("Kafka tweets consumer");
SingleOutputStreamOperator<List<String>> tweetsAggregator = tweetsStream
.timeWindowAll(Time.seconds(7))
.aggregate(new StringAggregatorFunction())
.name("Tweets aggregation");
DataStreamSink tweetsSink = tweetsAggregator.addSink(new TweetsSink())
.name("Tweets sink")
.setParallelism(1);
SingleOutputStreamOperator<String> termsStream = tweetsStream
// extracting terms from tweets
.process(new TweetParse())
.name("Tweets processing");
DataStream<Tuple2<String, Integer>> counts = termsStream
.map(new ToTuple())
// Counting terms
.keyBy(0)
.timeWindow(Time.seconds(13))
.sum(1)
.name("Terms processing");
DataStream<Tuple3<String, String, Integer>> edgesStream = termsStream.getSideOutput(TweetParse.outputTag)
.map(new ToTuple3())
// Counting terms pairs
.keyBy(0, 1)
.timeWindow(Time.seconds(19))
.sum(2)
.name("Edges processing");