Вы можете получить искомый эффект, просто выполнив следующее:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();
Если вы используете setStartFromEarliest, тогда Flink проигнорирует смещения, сохраненные в Kafka, и вместо этого начнет чтение с самой ранней записи. Более того, даже если вы используете setStartFromEarliest, если Flink возобновляет работу с контрольной точки или точки сохранения, он вместо этого будет использовать смещения, сохраненные в этом снимке.
Обратите внимание, что Flink выполняет свое собственное управление смещениями Kafka и при восстановлении изконтрольная точка игнорирует смещения, сохраненные в Kafka. Flink делает это как часть предоставления точно однократных гарантий, для чего необходимо точно знать, сколько входных данных было использовано для получения результатов, присутствующих в остальной части состояния, захваченного в контрольной точке или точке сохранения. По этой причине Flink всегда сохраняет смещения как часть каждого снимка состояния (контрольная точка или точка сохранения).
Это задокументировано здесь и здесь .
Что касается вашего исходного вопроса о initializeState
, он доступен, если вы реализуете интерфейс CheckpointedFunction
, но на самом деле это редко требуется.