Flink SQL: не хватает памяти для объединения таблиц - PullRequest
0 голосов
/ 10 октября 2018

У меня часто обновляется таблица MySql.Я хочу сделать снимок для каждого идентификатора, который обновляется за последние 20 секунд, и записать значение в redis.Я использую binlog в качестве потокового ввода и преобразую поток данных в таблицу Flink.Я запускаю следующий sql.

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
   SELECT id, MAX(ts)
   FROM my_tbl
   GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)

Поскольку я знаю, что объединение таблиц приведет к чрезмерному размеру состояния, я устанавливаю StreamQueryConfig следующим образом:

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));

Я запускаю задачу в течение одного дня и получаюОшибка нехватки памяти.Как я могу решить эту проблему?

1 Ответ

0 голосов
/ 10 октября 2018

Эту проблему можно решить также с помощью соединения с временными окнами вместо обычного соединения с настроенным временем сохранения в состоянии ожидания.

Следующий запрос должен помочь.

SELECT id, ts, val
FROM my_tbl m1,
     (SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

оконный предикат соединения (BETWEEN) обеспечивает автоматическую очистку состояния.Поскольку вы используете не точное время обработки, я добавил 5 секунд времени простоя.

...