Flink потребляет больше памяти, чем ожидалось - PullRequest
0 голосов
/ 03 мая 2018

Я использую Flink 1.4.1 для обработки транзакционных событий и HDFS для хранения информации о контрольных точках для обеспечения отказоустойчивости.

Было создано задание для сбора информации о клиентах, днях недели и часах дня, создавая профиль, как показано в приведенном ниже коде.

val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    (openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)

В вышеприведенном коде поток имеет три поля: "транзакция транзакции", "идентификатор клиента" и "сумма". Мы создаем поток с ключами посредством clientId и скользящего окна, суммирующего сумму. В нашей базе данных около 100 000 уникальных активных идентификаторов клиентов.

Через некоторое время общая оперативная память, используемая заданием, стабилизируется на 36 ГБ , но сохраненная контрольная точка в HDFS использует только 3 ГБ . Есть ли способ уменьшить использование оперативной памяти заданием, возможно, путем настройки коэффициента репликации Flink или с помощью RocksDB?

1 Ответ

0 голосов
/ 03 мая 2018

Использование RocksDB - абсолютно то, что вы должны учитывать для этого размера состояния и, в зависимости от шаблонов использования, могут иметь гораздо меньшие контрольные точки, поскольку оно выполняет это постепенно, только копируя новые или обновленные SST.

Что нужно знать, имейте в виду:

  • Каждая параллельная подзадача с состоянием оператора будет иметь собственную RocksDB пример.
  • Если вы переключитесь на RocksDB для проверки и начинает работать медленнее, чем нужно, убедитесь, что Используемая вами сериализация максимально эффективна.
  • Flink предоставляет некоторые предопределенные опции, основанные на вашей файловой системе, убедитесь, что вы выбрали это соответствующим образом
  • Если предопределенные параметры не работают для вас, вы можете переопределить OptionsFactory для бэкэнда RocksDB и настроить отдельные параметры RocksDB

Еще одна вещь, которую следует отметить при использовании памяти во Flink с временными окнами с ключами, заключается в том, что «таймеры» могут израсходовать значительный объем памяти, если вы собираетесь использовать сотни тысяч или миллионы. Таймеры мерцания основаны на куче (на момент написания этой статьи) и синхронно проверяются независимо от состояния вашего сервера.

...