Flink повторно масштабируемая функция потока с отслеживанием состояния - PullRequest
3 голосов
/ 05 мая 2020
• 1000 1004 *
public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

В будущем я хотел бы изменить масштаб параллелизма (с 2 до 4), поэтому мой вопрос: как мне достичь повторно масштабируемых состояний с ключами, чтобы после изменения параллелизма я мог получить соответствующий кэшировать ключевые данные в соответствующий слот задачи. Я попытался изучить это, где нашел документацию здесь . В соответствии с этим масштабируемое состояние оператора может быть достигнуто с помощью интерфейса ListCheckPasted, который предоставляет для этого метод snapshotState / restoreState. Но не знаете, как можно достичь повторно масштабируемого ключевого состояния (MyRichMapFunction)? Нужно ли мне реализовывать интерфейс ListCheckPasted для моего класса MyRichMapFunction? Если да, то как я могу перераспределить кеш в соответствии с новым ключом параллелизма ha sh в методе restoreState (мой MapState будет содержать огромное количество ключей с включенным TTL, допустим, макс. Он будет содержать 1 миллиард ключей в любой момент времени)? Не мог бы кто-нибудь помочь мне в этом, или, если вы укажете мне какой-нибудь пример, который тоже был бы замечательным.

1 Ответ

3 голосов
/ 05 мая 2020

Написанный вами код уже масштабируется; Управляемое ключевое состояние Flink может масштабироваться по дизайну. Состояние с ключом масштабируется путем изменения баланса назначения ключей экземплярам. (Вы можете думать о ключевом состоянии как о сегментированном хранилище ключей / значений. Технически происходит то, что согласованное хеширование используется для сопоставления ключей с группами ключей , и каждый параллельный экземпляр отвечает за некоторые из ключевых групп. Изменение масштаба просто включает в себя перераспределение ключевых групп между экземплярами.)

Интерфейс ListCheckpointed предназначен для состояния, используемого в неключевом контексте, поэтому он не подходит для того, что вы делаете. Также обратите внимание, что ListCheckpointed будет устаревшим в Flink 1.11 в пользу более общего CheckpointedFunction.

Еще одна вещь: если MyKeyExtractor вводится с помощью value.getEventId(), вы можете использовать ValueState<Boolean> для вашего кеша, а не MapState<String, Boolean>. Это работает, потому что с ключевым состоянием существует отдельное значение ValueState для каждого ключа. Вам нужно использовать MapState только тогда, когда вам нужно сохранить несколько пар атрибут / значение для каждого ключа в вашем потоке.

Большая часть этого обсуждается в документации Flink в разделе Практическое обучение , который включает пример , который очень близок к тому, что вы делаете.

...