Как установить kafka CommitOffset в Flink? - PullRequest
0 голосов
/ 05 марта 2019

Я уже установил зафиксированное смещение на:

properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("auto.commit.enable", "true");
properties.setProperty("enable.auto.commit", "true");

FlinkKafkaConsumer08<MobilePageEvent> kafkaConsumer =
            new FlinkKafkaConsumer08<>(
                    "mobile-event.page-resource", SCHEMA, properties);

Но в веб-интерфейсе я получаю недопустимые фиксированные смещения:

enter image description here

1 Ответ

0 голосов
/ 06 марта 2019

После включения проверки наведения теперь работает:

StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

environment.enableCheckpointing(5000);
...