Я запускаю Kafka на локальном компьютере и публикую на нем данные, используя
kafka-console-producer --broker-list localhost:9092 --topic som_test_event
, и пытаюсь прочитать и распечатать его с помощью приведенного ниже кода. Но это не работает. Чего мне не хватает?
public class Flink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.enableCheckpointing(5000L);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "myGroup");
properties.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer010<DeliveryEventSerializationV1> consumer =
new FlinkKafkaConsumer010<>("som_test_event", new InputMessageDeserializationSchema(), properties);
DataStream<DeliveryEventSerializationV1> dataStream = executionEnvironment
.addSource(consumer)
.rebalance()
dataStream.addSink(
StreamingFileSink
.forBulkFormat(new Path("/Users/xyz/FlinkOut"),
ParquetAvroWriters.forReflectRecord(DeliveryEventSerializationV1.class))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build());
dataStream.print();
executionEnvironment.execute();
}
}