Почему мои запросы Flink SQL имеют очень разные размеры контрольных точек? - PullRequest
0 голосов
/ 19 марта 2019

При использовании Flink Table SQL в моем проекте я обнаружил, что если бы в моем SQL-запросе было какое-либо предложение GROUP BY, размер контрольной точки значительно увеличился бы.

Например,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name
FROM
    FCBOX_POST_COUNT_VIEW

Размер контрольной точки будет меньше 500 КБ.

Но при таком использовании

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name,
    sum(ed_post_count)
FROM
    FCBOX_POST_COUNT_VIEW
GROUP BY
    sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)

Размер контрольной точки будет превышать 70 МБ, даже если нет сообщенийобработанный.Например,

Image is here.

Но при использовании API DataStream и keyBy вместо таблицы SQL GROUP BY size размер контрольной точки будет нормальным, меньшечем 1 МБ.

Почему?

------- обновлено в 2019-03-25 --------

После выполнения некоторых тестов ичитая исходный код, мы обнаружили, что причиной этого была RocksDB.

При использовании RockDB в качестве бэкэнда состояния размер контрольной точки будет превышать примерно 5 МБ на ключ, а при использовании файловой системы в качестве бэкэнда состоянияразмер контрольной точки уменьшится до менее 100 КБ на ключ.

Почему RocksDB нужно так много места для хранения состояния?Когда мы должны выбрать RocksDB?

1 Ответ

0 голосов
/ 19 марта 2019

Прежде всего, я бы не стал считать 70 МБ огромным состоянием.Есть много рабочих мест Flink с несколькими ТБ государства.Что касается вопроса, почему размеры состояний обоих запросов различаются:

Первый запрос - это простой проекционный запрос, что означает, что каждая запись может обрабатываться независимо.Следовательно, запрос не должен «запоминать» какие-либо записи, а только смещения потока для восстановления.

Второй запрос выполняет агрегацию окон и должен помнить промежуточный результат (частичную сумму) для каждого окна довремя шло достаточно, чтобы результат был окончательным и мог быть передан.

Поскольку запросы Flink SQL переводятся в операторы DataStream, между запросом SQL и реализацией агрегации с помощью keyBy().window() нет большой разницы.Оба запускают в основном один и тот же код.

Обновление : Обнаружена причина повышенного состояния, вызванная RocksDBStateBackend.Эти издержки являются не накладными расходами для каждого ключа, а накладными расходами для оператора с состоянием.Поскольку RocksDBStateBackend предназначен для хранения размеров состояний от нескольких ГБ до ТБ, издержки в несколько МБ незначительны.

...