Apache Состояние функции процесса мигания не удерживает состояние - PullRequest
0 голосов
/ 21 июня 2020

Я пишу код для функции processElement в Apache Flink 1.4:

public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>{

    private ListState<String> listState;

    public void processElement(Tuple2<String, String> tuple2,  Context context, Collector<Tuple2<String, String>> collector) {

        // if the state is empty, start a timer
        if (listState.get().iterator().hasNext() == false)
            context.timerService().registerEventTimeTimer(10000);

        listState.add("someStringToBeStored");

        // ...
    }

}

У меня есть эта функция, когда истекает время таймера:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    Iterable<String> strings = listState.get();
    int cnt = 0;
    int totalLength = 0;
    Iterator<String> it = strings.iterator();
    while (it.hasNext()) {
        cnt++;
        totalLength += it.next().length();
    }
    LOGGER.info("cnt is:" + cnt);
    LOGGER.info("totalLength is:" + totalLength);

    // clearing the state
    listState.clear();
}

Однако каждый раз, когда я запускаю приложение, значение cnt всегда равно 1, а значение totalLength - это длина конкретной строки, которая была обработана в то время. Похоже, что состояние не сохраняется в системе. Из этого кода ясно, что я здесь делаю не так?

Ответы [ 2 ]

1 голос
/ 21 июня 2020

Функции процесса использовали состояние с разделением по ключам, то есть для каждого ключа существует отдельный список. Я предполагаю, что не существует ключа с несколькими событиями за 10 секунд.

0 голосов
/ 21 июня 2020

Ваш ProcessFunctionClass должен расширить Flink ProcessFunction.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...