Flink: понимание потока данных моей программы - PullRequest
0 голосов
/ 30 июня 2018

Я разработал программу Flink, которая читает твиты из Twitter и помещает их в Kafka. Затем он возвращает твиты от Кафки и обрабатывает их.

Преобразование «Обработка твитов» извлекает хештеги и пользователей из текста твита и выводит их в выводе по умолчанию, а каждую пару выводит в боковом выводе.

Прикрепленное изображение выбирается из веб-интерфейса Flink. Я не понимаю, почему Источник Кафки и оператор обработки твитов объединены в одну задачу, и в первую очередь я хочу, чтобы приемник твитов получал все необработанные твиты от источника Кафки, а не от вывода оператора обработки твитов.

Программа правильная?

Datalow

Это соответствующая часть кода:

    FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(Constants.KAFKA_TWEETS_TOPIC, new SimpleStringSchema(), properties);
    myConsumer.setStartFromLatest();

    DataStream<String> tweetsStream = env
            .addSource(myConsumer)
            .name("Kafka tweets consumer");

    SingleOutputStreamOperator<List<String>> tweetsAggregator = tweetsStream
            .timeWindowAll(Time.seconds(7))
            .aggregate(new StringAggregatorFunction())
            .name("Tweets aggregation");

    DataStreamSink tweetsSink = tweetsAggregator.addSink(new TweetsSink())
            .name("Tweets sink")
            .setParallelism(1);

    SingleOutputStreamOperator<String> termsStream = tweetsStream
            // extracting terms from tweets
            .process(new TweetParse())
            .name("Tweets processing");

    DataStream<Tuple2<String, Integer>> counts = termsStream
            .map(new ToTuple())
            // Counting terms
            .keyBy(0)
            .timeWindow(Time.seconds(13))
            .sum(1)
            .name("Terms processing");

    DataStream<Tuple3<String, String, Integer>> edgesStream = termsStream.getSideOutput(TweetParse.outputTag)
            .map(new ToTuple3())
            // Counting terms pairs
            .keyBy(0, 1)
            .timeWindow(Time.seconds(19))
            .sum(2)
            .name("Edges processing");

1 Ответ

0 голосов
/ 20 декабря 2018

Вы создаете два разных потока данных с tweetsStream. первый - tweetsAggregator, а второй - termsStream. Затем вы снова создаете два разных потока данных из termStream: counts и edgesStream. Операторы Sink не имеют выходных данных. Таким образом, он не может генерировать данные для другого оператора, и он должен быть последним оператором, который будет использовать. Вы должны начать с оператора источника данных addSource(myConsumer), объединить столько преобразований, сколько хотите timeWindowAll, aggregate, map, keyBy, а затем вызвать оператор приемника. Вы можете вызывать более одного приемника, если хотите, но помните, что приемники не генерируют поток данных для других операторов, они являются потребителями.

...