Я работаю над проектом flink, который записывает поток в реляционную базу данных.
В текущем решении мы написали специальную функцию приемника, которая открывает транзакцию, выполняет оператор вставки SQL и закрывает транзакцию.Он работает хорошо, пока объем данных не увеличится, и у нас не начнутся проблемы с тайм-аутом соединения.Мы попробовали несколько настроек конфигурации пула соединений, это не очень помогает.
Мы думаем о попытке «пакетной вставки», чтобы уменьшить количество «записей» в базу данных.Мы сталкиваемся с несколькими классами, которые делают почти то, что мы хотим: JDBCOutputFormat, JDBCSinkFunction.С помощью JDBCOutputFormat мы можем настроить размер пакета.
Мы также хотели бы принудительно «пакетную вставку» каждые 1 минуту, если количество записей не превышает «размер пакета».Как бы вы обычно справлялись с такими проблемами?Мои первые мысли - расширить JDBCOutputFormat, чтобы использовать запланированные задачи для принудительной очистки каждые 1 минуту, но было неочевидно, как это можно сделать.
Нужно ли нам вместе писать свой собственный приемник?