Какой бэкэнд состояния вы используете?
Если я не ошибаюсь, боковые входы реализованы как состояние в Flink. Если вы используете MemoryStateBackend в качестве бэкэнда состояния, вы действительно можете оказать давление на потребление памяти.
Кроме того, обработка событий будет блокироваться до тех пор, пока ввод с этой стороны не будет готов, буферизируя события , Если подготовка бокового ввода занимает много времени или частота входящих событий высока, это может привести к увеличению нагрузки на память.
Можно попробовать альтернативный бэкэнд состояния? Предпочтительно RocksDBStateBackend , он хранит данные в полете в базе данных RocksDB, а не в памяти.
Трудно догадаться, в чем проблема. Я бы порекомендовал отслеживать показатели, связанные с памятью - см. Хороший пост по этому вопросу здесь .
Вы также можете запустить профилирование на диспетчерах задач и проанализировать дампы - см. здесь
Увеличивается ли память также, если вы только опубликуете sh первое сообщение о «тестировании» topi c?
Возможно, чтобы изолировать проблему, я бы использовал более простой побочный ввод. Удалите HTTP-вызов и установите данные c. Может быть, периоди c сработала вместо Кафки:
GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L))