Что это значит, когда вы получаете «слишком много открытых файлов» на Kafka Consumer / Client? - PullRequest
0 голосов
/ 20 февраля 2020

Я знаю, что это обычно означает, что нужно увеличить предел. Но что это на самом деле означает, когда это происходит на стороне потребителя?

Я использую Apache Flink, и я получил эту ошибку на своем узле задачи Flink. Когда я перезагружал свой узел Flink и перераспределял работу, она работала нормально. Брокеры тоже казались хорошими в то время.

У меня всего 9 задач, выполняемых на 3 узлах. Максимальный параллелизм для любого одного задания составляет от 1 до 2. Итак, давайте предположим, что наихудший случай 18 параллелизма / потоков на 3 узлах.

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
... 14 more

1 Ответ

1 голос
/ 20 февраля 2020

Каждый клиент Kafka (производитель, потребитель) поддерживает один сокет для каждого брокера в кластере, к которому он подключен (наихудший случай).

, поэтому при просмотре количества миганий клиентов в разы увеличивается количество брокеров в вашем кластер

сокеты считаются дескрипторами для целей ulimit.

Я не знаю, сколько клиентов kafka создает flink внутри - вы можете получить дамп кучи и посмотреть, сколько там клиентских объектов

...