Apache Flink 1.6.0 - StateTtlConfig и ListState - PullRequest
0 голосов
/ 30 августа 2018

Я нахожусь в процессе внедрения системы потоковой обработки с проверкой концепции, использующей Apache Flink 1.6.0, и храню список полученных событий, разделенных по ключам, в ListState. (Не беспокойтесь о том, почему я это делаю, просто поработайте со мной здесь.) У меня установлен StateTtlConfig на соответствующий ListStateDescriptor. Согласно документации :

  1. "Все типы коллекций состояний поддерживают TTL для каждой записи. Это означает, что срок действия элементов списка и записей карты истекает независимо."
  2. "В настоящее время истекшие значения удаляются только тогда, когда они считываются явно, например, путем вызова ValueState.value()."

Вопрос 1

Что из следующего представляет собой чтение ListState:

  1. Запрос итератора, но не его использование - myListState.get();.
  2. На самом деле с использованием итератора - for (MyItem i : myListState.get()) { ... }

Вопрос 2

Что означает "TTL для каждой записи" на самом деле ? В частности, я спрашиваю о следующем:

Предположим, у меня есть конкретный экземпляр ListState<Character>. Дескриптор имеет TTL 10 секунд. Я вставляю 'a'. Через две секунды я вставляю 'b'. Девять секунд спустя я вставляю 'c'. Если я повторю это ListState, какие элементы будут возвращены?

Другими словами:

ListState<Character> ls = getRuntimeContext().getListState(myDescriptor);

ls.add('a');

// ...two seconds later...
ls.add('b');

// ...nine seconds later...
ls.add('c');

// Does this iterate over 'a', 'b', 'c'
// or just 'b' and 'c'?
for (Character myChar : ls.get()) { ... }

1 Ответ

0 голосов
/ 31 августа 2018

Ответ 1

Ответ 1. Для ListState обрезка выполняется для myListState.get();.

Ответ 2

«TTL для каждой записи» означает, что тайм-аут применяется к одной записи, а не ко всей коллекции. Для вашего примера, предполагая, что в момент считывания прошло 10 секунд с момента вставки a, он будет перебирать b и c. a в этом случае будет сокращено.

...