Управление памятью Kafka Stream (Ktable, RocksDb) - PullRequest
0 голосов
/ 14 июня 2019

Привет, похоже, я не могу правильно масштабировать мой модуль для потокового приложения Kafka (работает на java 11 jre ) и продолжать иметь OOMKilled контейнеры.

топология потока kafka

Работа состоит в агрегировании довольно большого количества одновременных значений

  • Я использую KTable:
    KTable<String, MinuteValue> MinuteValuesKtable = builder.table(
                  "minuteTopicCompact",
                  Materialized.<String, MinuteValue, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteValueSerdes)
          .withLoggingEnabled(new HashMap<>()));
  • И вычислить агрегацию:
    KStream<String, MinuteAggreg> minuteAggByDay = MinuteValuesKtable
      // rekey each MinuteValue and group them
      .groupBy(
        (key, minuteValue) -> new KeyValue<>(getAggKey(minuteValue), billLine), Serialized.with(Serdes.String(), billLineSerdes))
      // aggregate to MinuteAggreg
      .aggregate(
            MinuteAggreg::new,
        (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.addLine(value),
        (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.removeLine(value),
            Materialized.with(Serdes.String(), minuteAggregSerdes))
        .toStream()
    // [...] send to another topic

Настройки памяти потока Кафки

Я попытался настроить эти значения:

    // memory sizing and caches
properties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 5 * 60 * 1000L);
// Enable record cache of size 8 MB.
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8 * 1024 * 1024L);
// Set commit interval to 1 second.
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

My java 11 Приложение запускается со следующими аргументами:

      -XX:+UseContainerSupport
      -XX:MaxRAMFraction=2

настройки памяти модуля

И модуль имеет некоторые ограничения памяти:


    Limits:
      cpu:     4
      memory:  2Gi
    Requests:
      cpu:      2
      memory:   1Gi

Но все равно получая ошибки модуля, kubernetes удаляет модуль с помощью «OOMKilled».

Может ли эксперт по потоку Кафки помочь мне настроить эти значения?

читать ресурсы

Я прочитал: https://docs.confluent.io/current/streams/sizing.html#troubleshooting и https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html

, но не смог найти всеобъемлющего и простогодостаточно ответа для настройки:

  • пределы порогов дБ,
  • пределы потока кафки,
  • пределы jmv
  • и ограничение контейнеров
...