Как читать первые сообщения кафки в флинк? - PullRequest
0 голосов
/ 15 февраля 2019

Я использую flink для построения конвейера, источником которого является kafka.Для тестирования я просто хочу прочитать первые N сообщений от kafka, и после этого мне нужно остановить поток.

Как я могу это сделать?Я использую FlinkKafkaConsumer08.

1 Ответ

0 голосов
/ 16 февраля 2019

Чтобы сделать что-либо с Flink с сохранением состояния, вы должны использовать управляемое состояние Flink, чтобы ваше приложение было отказоустойчивым.Но если вы хотите игнорировать это требование, это может быть так просто:

public class Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(...)
           .filter(new Limiter())
           .print();

        env.execute();
    }

    public static class Limiter implements FilterFunction<Event> {
        private transient int count = 0;

        @Override
        public boolean filter(Event e) throws Exception {
            if (++count <= 10) {
                return true;
            } else {
                return false;
            }
        }
    }
}
...