Я пишу код для функции processElement
в Apache Flink 1.4:
public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>{
private ListState<String> listState;
public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> collector) {
// if the state is empty, start a timer
if (listState.get().iterator().hasNext() == false)
context.timerService().registerEventTimeTimer(10000);
listState.add("someStringToBeStored");
// ...
}
}
У меня есть эта функция, когда истекает время таймера:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
Iterable<String> strings = listState.get();
int cnt = 0;
int totalLength = 0;
Iterator<String> it = strings.iterator();
while (it.hasNext()) {
cnt++;
totalLength += it.next().length();
}
LOGGER.info("cnt is:" + cnt);
LOGGER.info("totalLength is:" + totalLength);
// clearing the state
listState.clear();
}
Однако каждый раз, когда я запускаю приложение, значение cnt
всегда равно 1, а значение totalLength
- это длина конкретной строки, которая была обработана в то время. Похоже, что состояние не сохраняется в системе. Из этого кода ясно, что я здесь делаю не так?