OutOfMemoryError при перезапуске моего приложения Kafka Streams - PullRequest
0 голосов
/ 15 апреля 2019

У меня есть приложение Kafka Streams (Kafka Streams 2.1 + Kafka broker 2.0), которое выполняет агрегацию на основе TimeWindows, и я использую оператор подавления для подавления вывода результата.

Все работает хорошо, пока я не перезапущу свое приложение, оно сбросит смещение KTABLE-SUPPRESS-STATE-STORE на 0, чтобы восстановить состояние подавления, как и ожидалось. Но каждый раз, когда я перезапускаю его, он выдает OutOfMemoryError, я думал, что, возможно, размера кучи недостаточно, поэтому я использую больший Xmx/Xms, он работает один или два перезапуска, а затем OutOfMemoryError возвращается снова , Теперь Xmx составляет около 20G, я думаю, что-то здесь не так.

Фрагмент кода:

TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);

KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
                .windowedBy(windows)
                .aggregate(MyStatistics::new,
                    (sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
                    Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
                .suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));

И я обнаружил, что ключ записи в KTABLE-SUPPRESS-STATE-STORE - что-то вроде 1234567j P, который не читается, но я думаю, что он генерируется путем объединения SN и окна, я думаю, что это сделает KTABLE- SUPPRESS-STATE-STORE резервируется, потому что каждый SN будет иметь несколько записей для каждого окна.

У меня два вопроса:

  1. Если OutOfMemoryError указывает на небольшой размер кучи или нет, если да, как ограничить скорость, если нет, что это значит?
  2. Ключ для KTABLE-SUPPRESS-STATE-STORE определяется тем, каким API, как или я должен его контролировать?

Спасибо!

Редактировать в 2019/4/16

Ошибка трассировки стека:

java.lang.OutOfMemoryError: Java heap space        
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)        
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)        
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)        
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)        
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)        
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:467)        
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)        
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

1 Ответ

1 голос
/ 16 апреля 2019

Если OutOfMemoryError указывает небольшой размер кучи или нет, если да, как ограничить скорость, если нет, что это значит?

Да, кучи недостаточновыделить всю память, необходимую приложению для работы.Мы не видим этого очень часто, и оператор подавления является новым, поэтому я с подозрением отношусь к этому, но хорошо иметь в виду, что в принципе любая структура данных в вашем приложении может быть ответственной.

Лучший способ диагностировать нехватку памяти - сделать «дамп кучи».Это в основном копирует всю память вашей JVM в файл, так что вы можете анализировать его содержимое с помощью программы, такой как https://www.eclipse.org/mat/.Это будет немного кривой обучения, но я думаю, вы обнаружите, что какое-то средство с анализом использования памяти в целом очень удобно.

Вы можете запустить дамп кучи в любое время (естьнесколько способов сделать это, вам придется исследовать лучший способ для вас).Но я думаю, что вы захотите использовать изящную опцию Java, чтобы сделать дамп кучи, когда он получает ошибку нехватки памяти.Таким образом, вы с большей вероятностью определите виновника.См. https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr или аналогичный для вашей JVM.

Я могу предположить причину сброса кучи, но я боюсь, что могу просто сбить вас с толку и потратить ваше время.Как только у вас появятся результаты дампа, я думаю, вам следует открыть отчет об ошибках в трекере проблем Кафки: https://issues.apache.org/jira/projects/KAFKA.Затем мы можем помочь выяснить, как обойти эту ошибку, чтобы она снова заработала, а также как ее исправить в будущих выпусках.

На самом деле, я предложу одно предположение ... Возможно, вы 'Вы видите результат этой ошибки: https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895). Если ваш OOME пропадает, когда вы удаляете оператор подавления, вы можете оставить его на время. Как только мы объединяемИсправьте, я буду запрашивать выпуск исправления, и вы можете попробовать еще раз, чтобы увидеть, решена ли проблема.

Ключ для KTABLE-SUPPRESS-STATE-STORE определяется тем, каким API, каким образом илиДолжен ли я контролировать это?

К счастью, у этого есть более простой ответ. Ключ, на который вы смотрите - это бинарная версия ключа записи и метка времени окна. Этот ключявляется результатом использования windowBy. В Java вы можете видеть, что результатом агрегирования является KTable<Windowed<String>, ...> и что Suppress не меняет тип ключа или значения. Другими словами, вы смотрите насериализованная версияна ключ (Windowed<String>).

Оставляя подавление в стороне на секунду;Допустим, у вас есть два серийных номера, «asdf» и «zxcv».Допустим, ваш размер окна составляет один час.Ваше приложение группирует события для каждого этих серийных номеров (независимо) в каждый час дня.Таким образом, существует агрегация для всех записей «asdf» с 9:00 до 10:00, а также есть одна для всех записей «zxcv» с 9:00 до 10:00.Таким образом, общее количество ключей в оконной таблице KTable составляет key space x number of windows being retained.

Оператор подавления не влияет на количество ключей в таблице KTable.Его целью является подавление обновлений для этих клавиш в течение определенного времени (timeToWait).Например, без подавления, если вы получите 3 обновления записи «asdf» в период с 9:00 до 10:00, оконная агрегация будет выдавать обновленный результат для (asdf, 9:00) каждый раз, поэтому для 3 событий в, вы видите 3Результаты обновления выходят.Оператор Suppress просто запрещает эти обновления результатов до тех пор, пока не пройдет timeToWait, а когда он пройдет, он выдаст только самое последнее обновление.

Таким образом, количество ключей в буфере подавления в любое время меньшечем общее количество ключей в восходящем KTable.Он просто содержит ключи, которые были обновлены за последний timeToWait период времени.

Помогает ли это?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...