Я использую Flink 1.4.1
для обработки транзакционных событий и HDFS для хранения информации о контрольных точках для обеспечения отказоустойчивости.
Было создано задание для сбора информации о клиентах, днях недели и часах дня, создавая профиль, как показано в приведенном ниже коде.
val stream = env.addSource(consumer)
val result = stream
.map(openTransaction => {
val transactionDate = openTransaction.get("transactionDate")
val date = if (transactionDate.isTextual)
LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
else
transactionDate.asLong
(openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
})
.keyBy(0)
.window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
.sum(1)
В вышеприведенном коде поток имеет три поля: "транзакция транзакции", "идентификатор клиента" и "сумма". Мы создаем поток с ключами посредством clientId и скользящего окна, суммирующего сумму. В нашей базе данных около 100 000 уникальных активных идентификаторов клиентов.
Через некоторое время общая оперативная память, используемая заданием, стабилизируется на 36 ГБ , но сохраненная контрольная точка в HDFS использует только 3 ГБ . Есть ли способ уменьшить использование оперативной памяти заданием, возможно, путем настройки коэффициента репликации Flink или с помощью RocksDB?