Datastream не читает с Kafka используя flink - PullRequest
0 голосов
/ 26 февраля 2020

Я запускаю Kafka на локальном компьютере и публикую на нем данные, используя

kafka-console-producer --broker-list localhost:9092 --topic som_test_event

, и пытаюсь прочитать и распечатать его с помощью приведенного ниже кода. Но это не работает. Чего мне не хватает?

public class Flink {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(5000L);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "myGroup");
        properties.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer010<DeliveryEventSerializationV1> consumer =
                new FlinkKafkaConsumer010<>("som_test_event", new InputMessageDeserializationSchema(), properties);

        DataStream<DeliveryEventSerializationV1> dataStream = executionEnvironment
                .addSource(consumer)
                .rebalance()


        dataStream.addSink(
                StreamingFileSink
                        .forBulkFormat(new Path("/Users/xyz/FlinkOut"),
                                ParquetAvroWriters.forReflectRecord(DeliveryEventSerializationV1.class))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build());

        dataStream.print();

        executionEnvironment.execute();
    }

}
...