Flink ReducingState влияние производительности - PullRequest
0 голосов
/ 29 мая 2018

выяснилось, когда мы использовали ReducingState RecordStore.add (r), производительность колеблется, как показано,

без ReducingState: график стабильной производительности

с ReducingState: график колебания производительности

Общая производительность (падение более чем на 100%!): без Reducingstate.add VS с ReducingState.add

Может бытьлегко воспроизводить с помощью простого приложения, без контрольной точки, просто сохраняйте запись, также с помощью простой функции «суммирования» (фактически, с пустой функцией вы увидите тот же результат).Любая идея будет оценена.Что за невероятно очевидная проблема.

В основном, приложение просто хранит записи в состоянии, и мы измеряем, сколько записей в секунду в "JsonTranslator", что показано на графике.Разница между ними - всего одна строка, комментарий / не-комментарий "recStore.add (r)".

Понимание состояния повлияет на производительность, но так ли это работает?

DataStream<String> stream = env.addSource(new GeneratorSource(loop);
DataStream<JSONObject> convert = stream.map(new JsonTranslator(statsdUrl))
                                       .keyBy(new KeySelector<JSONObject, AggregationKey>() {... ...})
                                       .process(new ProcessAggregation(aggrDuration, statsdUrl))
                                       .map(new PassthruFunction(statsdUrl));  


public class ProcessAggregation extends ProcessFunction<JSONObject, JSONObject> {
    private ReducingState<JSONObject> recStore;

    public void processElement(JSONObject r, Context ctx, Collector<JSONObject> out) {
        recStore.add(r); //this line make the difference
}

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Я провел несколько экспериментов с кодом, которым вы поделились.Я только запускал его на своем ноутбуке.Я оставил весь код statsd на месте, но я не запускаю statsd.Вместо этого я настроил web.refresh-interval на 1 секунду и заметил numRecordsOutPerSecond в Flink Web Dashboard.Единственное, что я изменил, это изменил GeneratorSource для непрерывной работы, чтобы я мог наблюдать поведение в стационарном режиме.

Это то, что я видел:

  1. За исключением случаев, когда работа начинается, я не вижу резких колебаний пропускной способности.Существует начальный период около 30 секунд, в течение которого пропускная способность постепенно увеличивается до значения, которое затем поддерживается достаточно стабильно (после начальной фазы запуска оно изменяется примерно на 10%, с ReducingState или без него).

  2. Обновление версии Flink с 1.3.2 до 1.5.0 улучшило общую пропускную способность почти в 2 раза.Это не очень удивительно, так как с сетевого стека Flink было проделано много работы с 1.3.

  3. Комментирование mergedRecordStore.add(r); также повышает пропускную способность примерно в 2 раза.

Глядя на код, я вижу одну вещь, которая вызывает некоторую боль.Вы выполняете кеинг, сериализацию / десериализацию и сокращение с помощью JSONObjects.Это дорогоБыло бы лучше преобразовать JSON в POJO или Tuples, с которыми будет дешевле работать.

0 голосов
/ 29 мая 2018

Если ваша задача может быть легко выполнена на одном компьютере с небольшим количеством потоков, то flink может оказаться для вас излишним, если влияние управляемого состояния на производительность слишком велико.

Тем не менее, вам не нужно напрямую использовать ReducingState таким образом, как правило, вы должны использовать функции aggregate и reduce для оконных операторов (также, какое у вас здесь окно?) Это не такясно, хотя, когда вы выводите свой результат.Вы постоянно излучаете совокупность?

Ваш источник генерирует данные, которые входят в несколько ключей?

Используете ли вы бэкэнд состояния по умолчанию или RocksDB?

Кроме того, вы можете воспользоваться удобной функцией sum, которую предоставляет Flink, которая позволит вам указать, к каким полям добавить.

...